MONGO_FAIL_POINT_DEFINE(rollbackHangBeforeFinish); // Failpoint which causes rollback to hang and then fail after minValid is written. MONGO_FAIL_POINT_DEFINE(rollbackHangThenFailAfterWritingMinValid); namespace { constexpr int kMaxConnectionAttempts = 3; OpTime getOpTime(const OplogInterface::Iterator::Value& oplogValue) { return fassert(40298, OpTime::parseFromOplogEntry(oplogValue.first)); } long long getTerm(const BSONObj& operation) { return operation["t"].numberLong(); } Timestamp getTimestamp(const BSONObj& operation) { return operation["ts"].timestamp(); } Timestamp getTimestamp(const OplogInterface::Iterator::Value& oplogValue) { return getTimestamp(oplogValue.first); } long long getTerm(const OplogInterface::Iterator::Value& oplogValue) { return getTerm(oplogValue.first); } } // namespace RollBackLocalOperations::RollBackLocalOperations(const OplogInterface& localOplog, const RollbackOperationFn& rollbackOperation) : _localOplogIterator(localOplog.makeIterator()), _rollbackOperation(rollbackOperation), _scanned(0) { uassert(ErrorCodes::BadValue, "invalid local oplog iterator", _localOplogIterator); uassert(ErrorCodes::BadValue, "null roll back operation function", rollbackOperation); } RollBackLocalOperations::RollbackCommonPoint::RollbackCommonPoint(BSONObj oplogBSON, RecordId recordId, BSONObj nextOplogBSON) : _recordId(std::move(recordId)) { auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogBSON)); _opTime = oplogEntry.getOpTime(); _wallClockTime = oplogEntry.getWallClockTime(); // nextOplogEntry holds the oplog entry immediately after the common point. auto nextOplogEntry = uassertStatusOK(repl::OplogEntry::parse(nextOplogBSON)); _firstWallClockTimeAfterCommonPoint = nextOplogEntry.getWallClockTime(); } StatusWith RollBackLocalOperations::onRemoteOperation( const BSONObj& operation) { if (_scanned == 0) { auto result = _localOplogIterator->next(); if (!result.isOK()) { return Status(ErrorCodes::OplogStartMissing, "no oplog during rollback"); } _localOplogValue = result.getValue(); } // As we iterate through the oplog in reverse, opAfterCurrentEntry holds the oplog entry // immediately after the entry stored in _localOplogValue. BSONObj opAfterCurrentEntry = _localOplogValue.first; while (getTimestamp(_localOplogValue) > getTimestamp(operation)) { _scanned++; LOG(2) << "Local oplog entry to roll back: " << redact(_localOplogValue.first); auto status = _rollbackOperation(_localOplogValue.first); if (!status.isOK()) { invariant(ErrorCodes::NoSuchKey != status.code()); return status; } auto result = _localOplogIterator->next(); if (!result.isOK()) { return Status(ErrorCodes::NoMatchingDocument, str::stream() << "reached beginning of local oplog: {" << "scanned: " << _scanned << ", theirTime: " << getTimestamp(operation).toString() << ", ourTime: " << getTimestamp(_localOplogValue).toString() << "}"); } opAfterCurrentEntry = _localOplogValue.first; _localOplogValue = result.getValue(); } if (getTimestamp(_localOplogValue) == getTimestamp(operation)) { _scanned++; if (getTerm(_localOplogValue) == getTerm(operation)) { return RollbackCommonPoint( _localOplogValue.first, _localOplogValue.second, opAfterCurrentEntry); } // We don't need to advance the localOplogIterator here because it is guaranteed to advance // during the next call to onRemoteOperation. This is because before the next call to // onRemoteOperation, the remote oplog iterator will advance and the new remote operation is // guaranteed to have a timestamp less than the current local operation, which will trigger // a call to get the next local operation. return Status(ErrorCodes::NoSuchKey, "Unable to determine common point - same timestamp but different terms. " "Need to process additional remote operations."); } invariant(getTimestamp(_localOplogValue) < getTimestamp(operation)); _scanned++; return Status(ErrorCodes::NoSuchKey, "Unable to determine common point. " "Need to process additional remote operations."); } StatusWith syncRollBackLocalOperations( const OplogInterface& localOplog, const OplogInterface& remoteOplog, const RollBackLocalOperations::RollbackOperationFn& rollbackOperation) { std::unique_ptr remoteIterator; // Retry in case of network errors. for (int attemptsLeft = kMaxConnectionAttempts - 1; attemptsLeft >= 0; attemptsLeft--) { try { remoteIterator = remoteOplog.makeIterator(); } catch (DBException&) { if (attemptsLeft == 0) { throw; } } } invariant(remoteIterator); auto remoteResult = remoteIterator->next(); if (!remoteResult.isOK()) { return Status(ErrorCodes::InvalidSyncSource, remoteResult.getStatus().reason()) .withContext("remote oplog empty or unreadable"); } RollBackLocalOperations finder(localOplog, rollbackOperation); Timestamp theirTime; while (remoteResult.isOK()) { BSONObj theirObj = remoteResult.getValue().first; theirTime = theirObj["ts"].timestamp(); auto result = finder.onRemoteOperation(theirObj); if (result.isOK()) { return result.getValue(); } else if (result.getStatus().code() != ErrorCodes::NoSuchKey) { return result; } remoteResult = remoteIterator->next(); } return Status(ErrorCodes::NoMatchingDocument, str::stream() << "reached beginning of remote oplog: {" << "them: " << remoteOplog.toString() << ", theirTime: " << theirTime.toString() << "}"); } } // namespace repl } // namespace mongo