diff options
author | Benety Goh <benety@mongodb.com> | 2016-11-17 12:05:26 -0500 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-12-06 11:41:52 -0500 |
commit | e30e39ce1a4a55c46db13ad85f6c1000297ea6ff (patch) | |
tree | 4125e5eee3f765fbd860338e6898f2345dd5fa32 | |
parent | 726cafd713c7333640f8458ec9808ed4f678e3a7 (diff) | |
download | mongo-e30e39ce1a4a55c46db13ad85f6c1000297ea6ff.tar.gz |
SERVER-27052 added asynchronous operation support to DataReplicator
-rw-r--r-- | src/mongo/db/repl/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.cpp | 1823 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.h | 541 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_external_state_mock.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_external_state_mock.h | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_test.cpp | 3813 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_sync_state.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 92 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_test_fixture.cpp | 5 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_test_fixture.h | 7 |
10 files changed, 4422 insertions, 1879 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index d197a03e068..b11f2900c45 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1047,6 +1047,7 @@ env.Library( 'databases_cloner', 'multiapplier', 'oplog_buffer_blocking_queue', + 'oplog_entry', 'oplog_fetcher', 'optime', 'rollback_checker', diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index 79cd4f1d4ac..6e22361a270 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -33,10 +33,12 @@ #include "data_replicator.h" #include <algorithm> +#include <utility> #include "mongo/base/counter.h" #include "mongo/base/status.h" #include "mongo/bson/simple_bsonobj_comparator.h" +#include "mongo/bson/util/bson_extract.h" #include "mongo/client/fetcher.h" #include "mongo/client/remote_command_retry_scheduler.h" #include "mongo/db/commands/server_status_metric.h" @@ -49,7 +51,6 @@ #include "mongo/db/repl/oplog_buffer.h" #include "mongo/db/repl/oplog_fetcher.h" #include "mongo/db/repl/optime.h" -#include "mongo/db/repl/rollback_checker.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/sync_source_selector.h" #include "mongo/db/server_parameters.h" @@ -103,9 +104,21 @@ MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncConnectAttempts, int, 10); // The number of attempts to call find on the remote oplog. MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncOplogFindAttempts, int, 3); +// The number of initial sync attempts that have failed since server startup. Each instance of +// DataReplicator may run multiple attempts to fulfill an initial sync request that is triggered +// when DataReplicator::startup() is called. Counter64 initialSyncFailedAttempts; + +// The number of initial sync requests that have been requested and failed. Each instance of +// DataReplicator (upon successful startup()) corresponds to a single initial sync request. +// This value does not include the number of times where a DataReplicator is created successfully +// but failed in startup(). Counter64 initialSyncFailures; + +// The number of initial sync requests that have been requested and completed successfully. Each +// instance of DataReplicator corresponds to a single initial sync request. Counter64 initialSyncCompletes; + ServerStatusMetricField<Counter64> displaySSInitialSyncFailedAttempts( "repl.initialSync.failedAttempts", &initialSyncFailedAttempts); ServerStatusMetricField<Counter64> displaySSInitialSyncFailures("repl.initialSync.failures", @@ -117,20 +130,6 @@ ServiceContext::UniqueOperationContext makeOpCtx() { return cc().makeOperationContext(); } -StatusWith<TaskExecutor::CallbackHandle> scheduleWork( - TaskExecutor* exec, - stdx::function<void(OperationContext* txn, const CallbackArgs& cbData)> func) { - - // Wrap 'func' with a lambda that checks for cancallation and creates an OperationContext*. - return exec->scheduleWork([func](const CallbackArgs& cbData) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } - auto txn = makeOpCtx(); - func(txn.get(), cbData); - }); -} - StatusWith<Timestamp> parseTimestampStatus(const QueryResponseStatus& fetchResult) { if (!fetchResult.isOK()) { return fetchResult.getStatus(); @@ -189,7 +188,12 @@ StatusWith<BSONObj> getLatestOplogEntry(executor::TaskExecutor* exec, } StatusWith<OpTimeWithHash> parseOpTimeWithHash(const BSONObj& oplogEntry) { - auto oplogEntryHash = oplogEntry["h"].Long(); + long long oplogEntryHash = 0LL; + auto status = bsonExtractIntegerField(oplogEntry, "h", &oplogEntryHash); + if (!status.isOK()) { + return status; + } + const auto lastOpTime = OpTime::parseFromOplogEntry(oplogEntry); if (!lastOpTime.isOK()) { return lastOpTime.getStatus(); @@ -206,96 +210,140 @@ StatusWith<OpTimeWithHash> parseOpTimeWithHash(const QueryResponseStatus& fetchR const auto hasDoc = docs.begin() != docs.end(); return hasDoc ? parseOpTimeWithHash(docs.front()) - : StatusWith<OpTimeWithHash>{ErrorCodes::NoMatchingDocument, "No document in batch."}; -} - -Timestamp findCommonPoint(HostAndPort host, Timestamp start) { - // TODO: walk back in the oplog looking for a known/shared optime. - return Timestamp(); -} - -template <typename T> -void swapAndJoin_inlock(UniqueLock* lock, T& uniquePtrToReset, const char* msg) { - if (!uniquePtrToReset) { - return; - } - T tempPtr = std::move(uniquePtrToReset); - lock->unlock(); - LOG(1) << msg << tempPtr->toString(); - tempPtr->join(); - lock->lock(); + : StatusWith<OpTimeWithHash>{ErrorCodes::NoMatchingDocument, "no oplog entry found"}; } } // namespace -std::string toString(DataReplicatorState s) { - switch (s) { - case DataReplicatorState::InitialSync: - return "InitialSync"; - case DataReplicatorState::Uninitialized: - return "Uninitialized"; - } - MONGO_UNREACHABLE; -} - // Data Replicator DataReplicator::DataReplicator( DataReplicatorOptions opts, std::unique_ptr<DataReplicatorExternalState> dataReplicatorExternalState, - StorageInterface* storage) + StorageInterface* storage, + const OnCompletionFn& onCompletion) : _fetchCount(0), _opts(opts), _dataReplicatorExternalState(std::move(dataReplicatorExternalState)), _exec(_dataReplicatorExternalState->getTaskExecutor()), - _dataReplicatorState(DataReplicatorState::Uninitialized), - _storage(storage) { + _storage(storage), + _onCompletion(onCompletion) { + uassert(ErrorCodes::BadValue, "task executor cannot be null", _exec); uassert(ErrorCodes::BadValue, "invalid storage interface", _storage); uassert(ErrorCodes::BadValue, "invalid getMyLastOptime function", _opts.getMyLastOptime); uassert(ErrorCodes::BadValue, "invalid setMyLastOptime function", _opts.setMyLastOptime); uassert(ErrorCodes::BadValue, "invalid getSlaveDelay function", _opts.getSlaveDelay); uassert(ErrorCodes::BadValue, "invalid sync source selector", _opts.syncSourceSelector); + uassert(ErrorCodes::BadValue, "callback function cannot be null", _onCompletion); } DataReplicator::~DataReplicator() { DESTRUCTOR_GUARD({ - UniqueLock lk(_mutex); - _cancelAllHandles_inlock(); - _waitOnAndResetAll_inlock(&lk); + shutdown(); + join(); }); } +bool DataReplicator::isActive() const { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _isActive_inlock(); +} + +bool DataReplicator::_isActive_inlock() const { + return State::kRunning == _state || State::kShuttingDown == _state; +} + +Status DataReplicator::startup(OperationContext* txn, + std::uint32_t initialSyncMaxAttempts) noexcept { + invariant(txn); + invariant(initialSyncMaxAttempts >= 1U); + + stdx::lock_guard<stdx::mutex> lock(_mutex); + switch (_state) { + case State::kPreStart: + _state = State::kRunning; + break; + case State::kRunning: + return Status(ErrorCodes::IllegalOperation, "data replicator already started"); + case State::kShuttingDown: + return Status(ErrorCodes::ShutdownInProgress, "data replicator shutting down"); + case State::kComplete: + return Status(ErrorCodes::ShutdownInProgress, "data replicator completed"); + } + + _setUp_inlock(txn, initialSyncMaxAttempts); + + // Start first initial sync attempt. + std::uint32_t initialSyncAttempt = 0; + auto status = _scheduleWorkAndSaveHandle_inlock( + stdx::bind(&DataReplicator::_startInitialSyncAttemptCallback, + this, + stdx::placeholders::_1, + initialSyncAttempt, + initialSyncMaxAttempts), + &_startInitialSyncAttemptHandle, + str::stream() << "_startInitialSyncAttemptCallback-" << initialSyncAttempt); + + if (!status.isOK()) { + _state = State::kComplete; + return status; + } + + return Status::OK(); +} + Status DataReplicator::shutdown() { - auto status = scheduleShutdown(); - if (status.isOK()) { - log() << "Waiting for shutdown of DataReplicator."; - waitForShutdown(); + stdx::lock_guard<stdx::mutex> lock(_mutex); + switch (_state) { + case State::kPreStart: + // Transition directly from PreStart to Complete if not started yet. + _state = State::kComplete; + return Status::OK(); + case State::kRunning: + _state = State::kShuttingDown; + break; + case State::kShuttingDown: + case State::kComplete: + // Nothing to do if we are already in ShuttingDown or Complete state. + return Status::OK(); } - return status; + + _cancelRemainingWork_inlock(); + + return Status::OK(); } -DataReplicatorState DataReplicator::getState() const { - LockGuard lk(_mutex); - return _dataReplicatorState; +void DataReplicator::_cancelRemainingWork_inlock() { + _cancelHandle_inlock(_startInitialSyncAttemptHandle); + _cancelHandle_inlock(_chooseSyncSourceHandle); + _cancelHandle_inlock(_getBaseRollbackIdHandle); + _cancelHandle_inlock(_getLastRollbackIdHandle); + _cancelHandle_inlock(_getNextApplierBatchHandle); + + _shutdownComponent_inlock(_oplogFetcher); + if (_initialSyncState) { + _shutdownComponent_inlock(_initialSyncState->dbsCloner); + } + _shutdownComponent_inlock(_applier); + _shutdownComponent_inlock(_lastOplogEntryFetcher); } -HostAndPort DataReplicator::getSyncSource() const { - LockGuard lk(_mutex); - return _syncSource; +void DataReplicator::join() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _stateCondition.wait(lk, [this]() { return !_isActive_inlock(); }); } -OpTimeWithHash DataReplicator::getLastFetched() const { - LockGuard lk(_mutex); - return _lastFetched; +DataReplicator::State DataReplicator::getState_forTest() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _state; } -OpTimeWithHash DataReplicator::getLastApplied() const { - LockGuard lk(_mutex); - return _lastApplied; +bool DataReplicator::_isShuttingDown() const { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _isShuttingDown_inlock(); } -size_t DataReplicator::getOplogBufferCount() const { - // Oplog buffer is internally synchronized. - return _oplogBuffer->getCount(); +bool DataReplicator::_isShuttingDown_inlock() const { + return State::kShuttingDown == _state; } std::string DataReplicator::getDiagnosticString() const { @@ -303,11 +351,10 @@ std::string DataReplicator::getDiagnosticString() const { str::stream out; out << "DataReplicator -" << " opts: " << _opts.toString() << " oplogFetcher: " << _oplogFetcher->toString() - << " opsBuffered: " << _oplogBuffer->getSize() - << " state: " << toString(_dataReplicatorState); + << " opsBuffered: " << _oplogBuffer->getSize() << " active: " << _isActive_inlock() + << " shutting down: " << _isShuttingDown_inlock(); if (_initialSyncState) { - out << " opsAppied: " << _initialSyncState->appliedOps - << " status: " << _initialSyncState->status.toString(); + out << " opsAppied: " << _initialSyncState->appliedOps; } return out; @@ -345,445 +392,789 @@ BSONObj DataReplicator::_getInitialSyncProgress_inlock() const { return bob.obj(); } -void DataReplicator::_resetState_inlock(OperationContext* txn, OpTimeWithHash lastAppliedOpTime) { - invariant(!_anyActiveHandles_inlock()); - _lastApplied = _lastFetched = lastAppliedOpTime; - if (_oplogBuffer) { - _oplogBuffer->clear(txn); - } -} - void DataReplicator::setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& work) { LockGuard lk(_mutex); _scheduleDbWorkFn = work; } -Status DataReplicator::_runInitialSyncAttempt_inlock(OperationContext* txn, - UniqueLock& lk, - HostAndPort syncSource) { - RollbackChecker rollbackChecker(_exec, syncSource); - invariant(lk.owns_lock()); - Status statusFromWrites(ErrorCodes::NotYetInitialized, "About to run Initial Sync Attempt."); +void DataReplicator::_setUp_inlock(OperationContext* txn, std::uint32_t initialSyncMaxAttempts) { + // This will call through to the storageInterfaceImpl to ReplicationCoordinatorImpl. + // 'txn' is passed through from startup(). + _storage->setInitialSyncFlag(txn); + + LOG(1) << "Creating oplogBuffer."; + _oplogBuffer = _dataReplicatorExternalState->makeInitialSyncOplogBuffer(txn); + _oplogBuffer->startup(txn); + + _stats.initialSyncStart = _exec->now(); + _stats.maxFailedInitialSyncAttempts = initialSyncMaxAttempts; + _stats.failedInitialSyncAttempts = 0; +} + +void DataReplicator::_tearDown_inlock(OperationContext* txn, + const StatusWith<OpTimeWithHash>& lastApplied) { + _stats.initialSyncEnd = _exec->now(); + + // This might not be necessary if we failed initial sync. + invariant(_oplogBuffer); + _oplogBuffer->shutdown(txn); + + if (!lastApplied.isOK()) { + return; + } + _storage->clearInitialSyncFlag(txn); + _opts.setMyLastOptime(lastApplied.getValue().opTime); + log() << "initial sync done; took " + << duration_cast<Seconds>(_stats.initialSyncEnd - _stats.initialSyncStart) << "."; + initialSyncCompletes.increment(); +} + +void DataReplicator::_startInitialSyncAttemptCallback( + const executor::TaskExecutor::CallbackArgs& callbackArgs, + std::uint32_t initialSyncAttempt, + std::uint32_t initialSyncMaxAttempts) { + auto status = _checkForShutdownAndConvertStatus_inlock( + callbackArgs, + str::stream() << "error while starting initial sync attempt " << (initialSyncAttempt + 1) + << " of " + << initialSyncMaxAttempts); + if (!status.isOK()) { + _finishInitialSyncAttempt(status); + return; + } + + log() << "Starting initial sync (attempt " << (initialSyncAttempt + 1) << " of " + << initialSyncMaxAttempts << ")"; + + // This completion guard invokes _finishInitialSyncAttempt on destruction. + auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); }; + auto finishInitialSyncAttemptFn = [this](const StatusWith<OpTimeWithHash>& lastApplied) { + _finishInitialSyncAttempt(lastApplied); + }; + auto onCompletionGuard = + std::make_shared<OnCompletionGuard>(cancelRemainingWorkInLock, finishInitialSyncAttemptFn); + + // Lock guard must be declared after completion guard because completion guard destructor + // has to run outside lock. + stdx::lock_guard<stdx::mutex> lock(_mutex); + + LOG(2) << "Resetting sync source so a new one can be chosen for this initial sync attempt."; + _syncSource = HostAndPort(); + + _lastApplied = {}; + _lastFetched = {}; + _oplogBuffer->clear(makeOpCtx().get()); + + // Get sync source. + std::uint32_t chooseSyncSourceAttempt = 0; + std::uint32_t chooseSyncSourceMaxAttempts = + static_cast<std::uint32_t>(numInitialSyncConnectAttempts.load()); + + // _scheduleWorkAndSaveHandle_inlock() is shutdown-aware. + status = _scheduleWorkAndSaveHandle_inlock( + stdx::bind(&DataReplicator::_chooseSyncSourceCallback, + this, + stdx::placeholders::_1, + chooseSyncSourceAttempt, + chooseSyncSourceMaxAttempts, + onCompletionGuard), + &_chooseSyncSourceHandle, + str::stream() << "_chooseSyncSourceCallback-" << chooseSyncSourceAttempt); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } +} + +void DataReplicator::_chooseSyncSourceCallback( + const executor::TaskExecutor::CallbackArgs& callbackArgs, + std::uint32_t chooseSyncSourceAttempt, + std::uint32_t chooseSyncSourceMaxAttempts, + std::shared_ptr<OnCompletionGuard> onCompletionGuard) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + // Cancellation should be treated the same as other errors. In this case, the most likely cause + // of a failed _chooseSyncSourceCallback() task is a cancellation triggered by + // DataReplicator::shutdown() or the task executor shutting down. + auto status = + _checkForShutdownAndConvertStatus_inlock(callbackArgs, "error while choosing sync source"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + if (MONGO_FAIL_POINT(failInitialSyncWithBadHost)) { + status = Status(ErrorCodes::InvalidSyncSource, + "no sync source avail(failInitialSyncWithBadHost failpoint is set)."); + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + auto syncSource = _chooseSyncSource_inlock(); + if (!syncSource.isOK()) { + if (chooseSyncSourceAttempt + 1 >= chooseSyncSourceMaxAttempts) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock( + lock, + Status(ErrorCodes::InitialSyncOplogSourceMissing, + "No valid sync source found in current replica set to do an initial sync.")); + return; + } + + auto when = _exec->now() + _opts.syncSourceRetryWait; + LOG(1) << "Error getting sync source: '" << syncSource.getStatus() << "', trying again in " + << _opts.syncSourceRetryWait << " at " << when.toString() << ". Attempt " + << (chooseSyncSourceAttempt + 1) << " of " << numInitialSyncConnectAttempts.load(); + auto status = _scheduleWorkAtAndSaveHandle_inlock( + when, + stdx::bind(&DataReplicator::_chooseSyncSourceCallback, + this, + stdx::placeholders::_1, + chooseSyncSourceAttempt + 1, + chooseSyncSourceMaxAttempts, + onCompletionGuard), + &_chooseSyncSourceHandle, + str::stream() << "_chooseSyncSourceCallback-" << (chooseSyncSourceAttempt + 1)); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + return; + } + + // There is no need to schedule separate task to create oplog collection since we are already in + // a callback and we are certain there's no existing operation context (required for creating + // collections and dropping user databases) attached to the current thread. + status = _recreateOplogAndDropReplicatedDatabases(); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + // Schedule rollback ID checker. + _syncSource = syncSource.getValue(); + _rollbackChecker = stdx::make_unique<RollbackChecker>(_exec, _syncSource); + auto scheduleResult = + _rollbackChecker->reset(stdx::bind(&DataReplicator::_rollbackCheckerResetCallback, + this, + stdx::placeholders::_1, + onCompletionGuard)); + status = scheduleResult.getStatus(); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + _getBaseRollbackIdHandle = scheduleResult.getValue(); +} + +Status DataReplicator::_recreateOplogAndDropReplicatedDatabases() { // drop/create oplog; drop user databases. LOG(1) << "About to drop+create the oplog, if it exists, ns:" << _opts.localOplogNS << ", and drop all user databases (so that we can clone them)."; - const auto schedStatus = scheduleWork( - _exec, [&statusFromWrites, this](OperationContext* txn, const CallbackArgs& cd) { - /** - * This functions does the following: - * 1.) Drop oplog - * 2.) Drop user databases (replicated dbs) - * 3.) Create oplog - */ - if (!cd.status.isOK()) { - error() << "Error while being called to drop/create oplog and drop users " - << "databases, oplogNS: " << _opts.localOplogNS - // REDACT cd?? - << " with status:" << cd.status.toString(); - statusFromWrites = cd.status; - return; - } - invariant(txn); - // We are not replicating nor validating these writes. - txn->setReplicatedWrites(false); + auto txn = makeOpCtx(); - // 1.) Drop the oplog. - LOG(2) << "Dropping the existing oplog: " << _opts.localOplogNS; - statusFromWrites = _storage->dropCollection(txn, _opts.localOplogNS); + // We are not replicating nor validating these writes. + UnreplicatedWritesBlock unreplicatedWritesBlock(txn.get()); + // 1.) Drop the oplog. + LOG(2) << "Dropping the existing oplog: " << _opts.localOplogNS; + auto status = _storage->dropCollection(txn.get(), _opts.localOplogNS); + if (!status.isOK()) { + return status; + } - // 2.) Drop user databases. - if (statusFromWrites.isOK()) { - LOG(2) << "Dropping user databases"; - statusFromWrites = _storage->dropReplicatedDatabases(txn); - } + // 2.) Drop user databases. + LOG(2) << "Dropping user databases"; + status = _storage->dropReplicatedDatabases(txn.get()); + if (!status.isOK()) { + return status; + } - // 3.) Create the oplog. - if (statusFromWrites.isOK()) { - LOG(2) << "Creating the oplog: " << _opts.localOplogNS; - statusFromWrites = _storage->createOplog(txn, _opts.localOplogNS); - } + // 3.) Create the oplog. + LOG(2) << "Creating the oplog: " << _opts.localOplogNS; + return _storage->createOplog(txn.get(), _opts.localOplogNS); +} - }); +void DataReplicator::_rollbackCheckerResetCallback( + const RollbackChecker::Result& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + auto status = _checkForShutdownAndConvertStatus_inlock(result.getStatus(), + "error while getting base rollback ID"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } - if (!schedStatus.isOK()) - return schedStatus.getStatus(); - - lk.unlock(); - _exec->wait(schedStatus.getValue()); - if (!statusFromWrites.isOK()) { - lk.lock(); - return statusFromWrites; - } - - auto rollbackStatus = rollbackChecker.reset_sync(); - lk.lock(); - if (!rollbackStatus.isOK()) - return rollbackStatus; - - Event initialSyncFinishEvent; - StatusWith<Event> eventStatus = _exec->makeEvent(); - if (!eventStatus.isOK()) { - return eventStatus.getStatus(); - } - initialSyncFinishEvent = eventStatus.getValue(); - - if (_inShutdown) { - // Signal shutdown event. - _doNextActions_inlock(); - return Status(ErrorCodes::ShutdownInProgress, - "initial sync terminated before creating cloner"); - } - - invariant(initialSyncFinishEvent.isValid()); - _initialSyncState.reset(new InitialSyncState( - stdx::make_unique<DatabasesCloner>( - _storage, - _exec, - _dataReplicatorExternalState->getDbWorkThreadPool(), - syncSource, - [](BSONObj dbInfo) { - const std::string name = dbInfo["name"].str(); - return (name != "local"); - }, - stdx::bind( - &DataReplicator::_onDataClonerFinish, this, stdx::placeholders::_1, syncSource)), - initialSyncFinishEvent)); - - const NamespaceString ns(_opts.remoteOplogNS); - lk.unlock(); - // get the latest oplog entry, and parse out the optime + hash. - const auto lastOplogEntry = getLatestOplogEntry(_exec, syncSource, ns); - const auto lastOplogEntryOpTimeWithHashStatus = lastOplogEntry.isOK() - ? parseOpTimeWithHash(lastOplogEntry.getValue()) - : StatusWith<OpTimeWithHash>{lastOplogEntry.getStatus()}; - - lk.lock(); - - if (!lastOplogEntryOpTimeWithHashStatus.isOK()) { - _initialSyncState->status = lastOplogEntryOpTimeWithHashStatus.getStatus(); - return _initialSyncState->status; - } - - _initialSyncState->oplogSeedDoc = lastOplogEntry.getValue().getOwned(); - const auto lastOpTimeWithHash = lastOplogEntryOpTimeWithHashStatus.getValue(); - _initialSyncState->beginTimestamp = lastOpTimeWithHash.opTime.getTimestamp(); + status = _scheduleLastOplogEntryFetcher_inlock( + stdx::bind(&DataReplicator::_lastOplogEntryFetcherCallbackForBeginTimestamp, + this, + stdx::placeholders::_1, + onCompletionGuard)); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } +} + +void DataReplicator::_lastOplogEntryFetcherCallbackForBeginTimestamp( + const StatusWith<Fetcher::QueryResponse>& result, + std::shared_ptr<OnCompletionGuard> onCompletionGuard) { + stdx::unique_lock<stdx::mutex> lock(_mutex); + auto status = _checkForShutdownAndConvertStatus_inlock( + result.getStatus(), "error while getting last oplog entry for begin timestamp"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } - if (_oplogFetcher) { - if (_oplogFetcher->isActive()) { - LOG(3) << "Fetcher is active, stopping it."; - _oplogFetcher->shutdown(); + const auto opTimeWithHashResult = parseOpTimeWithHash(result); + status = opTimeWithHashResult.getStatus(); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + // This is where the flow of control starts to split into two parallel tracks: + // - oplog fetcher + // - data cloning and applier + auto listDatabasesFilter = [](BSONObj dbInfo) { + std::string name; + auto status = mongo::bsonExtractStringField(dbInfo, "name", &name); + if (!status.isOK()) { + error() << "listDatabases filter failed to parse database name from " << redact(dbInfo) + << ": " << redact(status); + return false; } + return (name != "local"); + }; + _initialSyncState = stdx::make_unique<InitialSyncState>( + stdx::make_unique<DatabasesCloner>(_storage, + _exec, + _dataReplicatorExternalState->getDbWorkThreadPool(), + _syncSource, + listDatabasesFilter, + stdx::bind(&DataReplicator::_databasesClonerCallback, + this, + stdx::placeholders::_1, + onCompletionGuard))); + + const auto& lastOpTimeWithHash = opTimeWithHashResult.getValue(); + _initialSyncState->beginTimestamp = lastOpTimeWithHash.opTime.getTimestamp(); + + invariant(!result.getValue().documents.empty()); + LOG(2) << "Setting begin timestamp to " << _initialSyncState->beginTimestamp + << " using last oplog entry: " << redact(result.getValue().documents.front()) + << ", ns: " << _opts.localOplogNS; + + + const auto configResult = _dataReplicatorExternalState->getCurrentConfig(); + status = configResult.getStatus(); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + _initialSyncState.reset(); + return; } - _oplogFetcher.reset(); - - const auto config = uassertStatusOK(_dataReplicatorExternalState->getCurrentConfig()); - _oplogFetcher = stdx::make_unique<OplogFetcher>(_exec, - lastOpTimeWithHash, - syncSource, - _opts.remoteOplogNS, - config, - _opts.oplogFetcherMaxFetcherRestarts, - _dataReplicatorExternalState.get(), - stdx::bind(&DataReplicator::_enqueueDocuments, - this, - stdx::placeholders::_1, - stdx::placeholders::_2, - stdx::placeholders::_3), - stdx::bind(&DataReplicator::_onOplogFetchFinish, - this, - stdx::placeholders::_1, - stdx::placeholders::_2)); + + const auto& config = configResult.getValue(); + _oplogFetcher = + stdx::make_unique<OplogFetcher>(_exec, + lastOpTimeWithHash, + _syncSource, + _opts.remoteOplogNS, + config, + _opts.oplogFetcherMaxFetcherRestarts, + _dataReplicatorExternalState.get(), + stdx::bind(&DataReplicator::_enqueueDocuments, + this, + stdx::placeholders::_1, + stdx::placeholders::_2, + stdx::placeholders::_3), + stdx::bind(&DataReplicator::_oplogFetcherCallback, + this, + stdx::placeholders::_1, + stdx::placeholders::_2, + onCompletionGuard)); + LOG(2) << "Starting OplogFetcher: " << _oplogFetcher->toString(); - auto oplogFetcherStartupStatus = _oplogFetcher->startup(); - if (!oplogFetcherStartupStatus.isOK()) { - return oplogFetcherStartupStatus; - } - DatabasesCloner* cloner = _initialSyncState->dbsCloner.get(); - if (_scheduleDbWorkFn) { - cloner->setScheduleDbWorkFn_forTest(_scheduleDbWorkFn); + // _startupComponent_inlock is shutdown-aware. + status = _startupComponent_inlock(_oplogFetcher); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + _initialSyncState->dbsCloner.reset(); + return; } - lk.unlock(); if (MONGO_FAIL_POINT(initialSyncHangBeforeCopyingDatabases)) { + lock.unlock(); + // This could have been done with a scheduleWorkAt but this is used only by JS tests where + // we run with multiple threads so it's fine to spin on this thread. // This log output is used in js tests so please leave it. log() << "initial sync - initialSyncHangBeforeCopyingDatabases fail point " "enabled. Blocking until fail point is disabled."; - while (MONGO_FAIL_POINT(initialSyncHangBeforeCopyingDatabases)) { - lk.lock(); - if (!_initialSyncState->status.isOK()) { - lk.unlock(); - break; - } - lk.unlock(); + while (MONGO_FAIL_POINT(initialSyncHangBeforeCopyingDatabases) && !_isShuttingDown()) { mongo::sleepsecs(1); } + lock.lock(); } - auto clonerStartupStatus = cloner->startup(); // When the cloner is done applier starts. - if (!clonerStartupStatus.isOK()) { - return clonerStartupStatus; + if (_scheduleDbWorkFn) { + // '_scheduleDbWorkFn' is passed through (DatabasesCloner->DatabaseCloner->CollectionCloner) + // to the CollectionCloner so that CollectionCloner's default TaskRunner can be disabled to + // facilitate testing. + _initialSyncState->dbsCloner->setScheduleDbWorkFn_forTest(_scheduleDbWorkFn); } - _exec->waitForEvent(initialSyncFinishEvent); + LOG(2) << "Starting DatabasesCloner: " << _initialSyncState->dbsCloner->toString(); - log() << "Initial sync attempt finishing up."; - lk.lock(); - if (!_initialSyncState->status.isOK()) { - return _initialSyncState->status; + // _startupComponent_inlock() is shutdown-aware. Additionally, if the component fails to + // startup, _startupComponent_inlock() resets the unique_ptr to the component (in this case, + // DatabasesCloner). + status = _startupComponent_inlock(_initialSyncState->dbsCloner); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; } - lk.unlock(); +} + +void DataReplicator::_oplogFetcherCallback(const Status& oplogFetcherFinishStatus, + const OpTimeWithHash& lastFetched, + std::shared_ptr<OnCompletionGuard> onCompletionGuard) { + log() << "Finished fetching oplog during initial sync: " << redact(oplogFetcherFinishStatus) + << ". Last fetched optime and hash: " << lastFetched; - // Check for roll back, and fail if so. - auto hasHadRollbackResponse = rollbackChecker.hasHadRollback(); - lk.lock(); - if (!hasHadRollbackResponse.isOK()) { - _initialSyncState->status = hasHadRollbackResponse.getStatus(); - } else if (hasHadRollbackResponse.getValue()) { - _initialSyncState->status = {ErrorCodes::UnrecoverableRollbackError, - "Rollback occurred during initial sync"}; + stdx::lock_guard<stdx::mutex> lock(_mutex); + auto status = _checkForShutdownAndConvertStatus_inlock( + oplogFetcherFinishStatus, "error fetching oplog during initial sync"); + + // When the OplogFetcher completes early (instead of being canceled at shutdown), we log and let + // our reference to 'onCompletionGuard' go out of scope. Since we know the + // DatabasesCloner/MultiApplier will still have a reference to it, the actual function within + // the guard won't be fired yet. + // It is up to the DatabasesCloner and MultiApplier to determine if they can proceed without any + // additional data going into the oplog buffer. + // It is not common for the OplogFetcher to return with an OK status. The only time it returns + // an OK status is when the 'stopOplogFetcher' fail point is enabled, which causes the + // OplogFetcher to ignore the current sync source response and return early. + if (status.isOK()) { + log() << "Finished fetching oplog fetching early. Last fetched optime and hash: " + << lastFetched.toString(); + _lastFetched = lastFetched; + return; } - if (!_initialSyncState->status.isOK()) { - return _initialSyncState->status; + // During normal operation, this call to onCompletion->setResultAndCancelRemainingWork_inlock + // is a no-op because the other thread running the DatabasesCloner or MultiApplier will already + // have called it with the success/failed status. + // The OplogFetcher does not finish on its own because of the oplog tailing query it runs on the + // sync source. The most common OplogFetcher completion status is CallbackCanceled due to either + // a shutdown request or completion of the data cloning and oplog application phases. + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); +} + +void DataReplicator::_databasesClonerCallback( + const Status& databaseClonerFinishStatus, + std::shared_ptr<OnCompletionGuard> onCompletionGuard) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + auto status = _checkForShutdownAndConvertStatus_inlock(databaseClonerFinishStatus, + "error cloning databases"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; } - // If no oplog entries were applied, then we need to store the document that we fetched before - // we began cloning. - if (_initialSyncState->appliedOps == 0) { - auto oplogSeedDoc = _initialSyncState->oplogSeedDoc; - lk.unlock(); + status = _scheduleLastOplogEntryFetcher_inlock( + stdx::bind(&DataReplicator::_lastOplogEntryFetcherCallbackForStopTimestamp, + this, + stdx::placeholders::_1, + onCompletionGuard)); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } +} + +void DataReplicator::_lastOplogEntryFetcherCallbackForStopTimestamp( + const StatusWith<Fetcher::QueryResponse>& result, + std::shared_ptr<OnCompletionGuard> onCompletionGuard) { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + auto status = _checkForShutdownAndConvertStatus_inlock( + result.getStatus(), "error fetching last oplog entry for stop timestamp"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + auto&& optimeWithHashStatus = parseOpTimeWithHash(result); + if (!optimeWithHashStatus.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock( + lock, optimeWithHashStatus.getStatus()); + return; + } + auto&& optimeWithHash = optimeWithHashStatus.getValue(); + _initialSyncState->stopTimestamp = optimeWithHash.opTime.getTimestamp(); - LOG(1) << "inserting oplog seed document: " << _initialSyncState->oplogSeedDoc; + if (_initialSyncState->beginTimestamp == _initialSyncState->stopTimestamp) { + _lastApplied = optimeWithHash; + log() << "No need to apply operations. (currently at " + << _initialSyncState->stopTimestamp.toBSON() << ")"; + } else { + invariant(_lastApplied.opTime.isNull()); + _checkApplierProgressAndScheduleGetNextApplierBatch_inlock(lock, onCompletionGuard); + return; + } + } - // Store the first oplog entry, after initial sync completes. - const auto insertStatus = - _storage->insertDocuments(txn, _opts.localOplogNS, {oplogSeedDoc}); - lk.lock(); + // Oplog at sync source has not advanced since we started cloning databases, so we use the last + // oplog entry to seed the oplog before checking the rollback ID. + { + const auto& documents = result.getValue().documents; + invariant(!documents.empty()); + const auto& oplogSeedDoc = documents.front(); + LOG(1) << "inserting oplog seed document: " << oplogSeedDoc; - if (!insertStatus.isOK()) { - _initialSyncState->status = insertStatus; - return _initialSyncState->status; + auto txn = makeOpCtx(); + // StorageInterface::insertDocument() has to be called outside the lock because we may + // override its behavior in tests. See DataReplicatorReturnsCallbackCanceledAndDoesNot- + // ScheduleRollbackCheckerIfShutdownAfterInsertingInsertOplogSeedDocument in + // data_replicator_test.cpp + auto status = _storage->insertDocument(txn.get(), _opts.localOplogNS, oplogSeedDoc); + if (!status.isOK()) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; } } - return Status::OK(); // success + stdx::lock_guard<stdx::mutex> lock(_mutex); + // This sets the error in 'onCompletionGuard' and shuts down the OplogFetcher on error. + _scheduleRollbackCheckerCheckForRollback_inlock(lock, onCompletionGuard); } -StatusWith<OpTimeWithHash> DataReplicator::doInitialSync(OperationContext* txn, - std::size_t maxAttempts) { - const Status shutdownStatus{ErrorCodes::ShutdownInProgress, - "Shutting down while in doInitialSync."}; - if (!txn) { - std::string msg = "Initial Sync attempted but no OperationContext*, so aborting."; - error() << msg; - return Status{ErrorCodes::InitialSyncFailure, msg}; - } - UniqueLock lk(_mutex); - if (_inShutdown || (_initialSyncState && !_initialSyncState->status.isOK())) { - const auto retStatus = (_initialSyncState && !_initialSyncState->status.isOK()) - ? _initialSyncState->status - : shutdownStatus; - return retStatus; +void DataReplicator::_getNextApplierBatchCallback( + const executor::TaskExecutor::CallbackArgs& callbackArgs, + std::shared_ptr<OnCompletionGuard> onCompletionGuard) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + auto status = + _checkForShutdownAndConvertStatus_inlock(callbackArgs, "error getting next applier batch"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; } - _stats.initialSyncStart = _exec->now(); - if (_dataReplicatorState == DataReplicatorState::InitialSync) { - return {ErrorCodes::InitialSyncActive, - (str::stream() << "Initial sync in progress; try resync to start anew.")}; + + auto batchResult = _getNextApplierBatch_inlock(); + if (!batchResult.isOK()) { + warning() << "Failure creating next apply batch: " << redact(batchResult.getStatus()); + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, batchResult.getStatus()); + return; } - LOG(1) << "Creating oplogBuffer."; - _oplogBuffer = _dataReplicatorExternalState->makeInitialSyncOplogBuffer(txn); - _oplogBuffer->startup(txn); - ON_BLOCK_EXIT([this, txn, &lk]() { - if (!lk.owns_lock()) { - lk.lock(); + // Schedule MultiApplier if we have operations to apply. + const auto& ops = batchResult.getValue(); + if (!ops.empty()) { + _fetchCount.store(0); + // "_syncSource" has to be copied to stdx::bind result. + HostAndPort source = _syncSource; + auto applyOperationsForEachReplicationWorkerThreadFn = + stdx::bind(&DataReplicatorExternalState::_multiInitialSyncApply, + _dataReplicatorExternalState.get(), + stdx::placeholders::_1, + source, + &_fetchCount); + auto applyBatchOfOperationsFn = stdx::bind(&DataReplicatorExternalState::_multiApply, + _dataReplicatorExternalState.get(), + stdx::placeholders::_1, + stdx::placeholders::_2, + stdx::placeholders::_3); + + const auto lastEntry = ops.back().raw; + const auto opTimeWithHashStatus = parseOpTimeWithHash(lastEntry); + status = opTimeWithHashStatus.getStatus(); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; } - invariant(_oplogBuffer); - _oplogBuffer->shutdown(txn); - }); - - lk.unlock(); - // This will call through to the storageInterfaceImpl to ReplicationCoordinatorImpl. - _storage->setInitialSyncFlag(txn); - lk.lock(); - _stats.maxFailedInitialSyncAttempts = maxAttempts; - _stats.failedInitialSyncAttempts = 0; - while (_stats.failedInitialSyncAttempts < _stats.maxFailedInitialSyncAttempts) { - if (_inShutdown) { - return shutdownStatus; + auto lastApplied = opTimeWithHashStatus.getValue(); + auto numApplied = ops.size(); + _applier = + stdx::make_unique<MultiApplier>(_exec, + ops, + applyOperationsForEachReplicationWorkerThreadFn, + applyBatchOfOperationsFn, + stdx::bind(&DataReplicator::_multiApplierCallback, + this, + stdx::placeholders::_1, + lastApplied, + numApplied, + onCompletionGuard)); + status = _startupComponent_inlock(_applier); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; } + return; + } - Status attemptErrorStatus(Status::OK()); - - ON_BLOCK_EXIT([this, txn, &lk, &attemptErrorStatus]() { - if (!lk.owns_lock()) { - lk.lock(); - } - if (_anyActiveHandles_inlock()) { - _cancelAllHandles_inlock(); - _waitOnAndResetAll_inlock(&lk); - if (!attemptErrorStatus.isOK()) { - _initialSyncState.reset(); - } - } - }); + // If the oplog fetcher is no longer running (completed successfully) and the oplog buffer is + // empty, we are not going to make any more progress with this initial sync. Report progress so + // far and return a RemoteResultsUnavailable error. + if (!_oplogFetcher->isActive()) { + std::string msg = str::stream() + << "The oplog fetcher is no longer running and we have applied all the oplog entries " + "in the oplog buffer. Aborting this initial sync attempt. Last applied: " + << _lastApplied.toString() << ". Last fetched: " << _lastFetched.toString() + << ". Number of operations applied: " << _initialSyncState->appliedOps; + log() << msg; + status = Status(ErrorCodes::RemoteResultsUnavailable, msg); + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } - _setState_inlock(DataReplicatorState::InitialSync); - _applierPaused = true; + // If there are no operations at the moment to apply and the oplog fetcher is still waiting on + // the sync source, we'll check the oplog buffer again in + // '_opts.getApplierBatchCallbackRetryWait' ms. + auto when = _exec->now() + _opts.getApplierBatchCallbackRetryWait; + status = _scheduleWorkAtAndSaveHandle_inlock( + when, + stdx::bind(&DataReplicator::_getNextApplierBatchCallback, + this, + stdx::placeholders::_1, + onCompletionGuard), + &_getNextApplierBatchHandle, + "_getNextApplierBatchCallback"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } +} - LOG(2) << "Resetting sync source so a new one can be chosen for this initial sync attempt."; - _syncSource = HostAndPort(); +void DataReplicator::_multiApplierCallback(const Status& multiApplierStatus, + OpTimeWithHash lastApplied, + std::uint32_t numApplied, + std::shared_ptr<OnCompletionGuard> onCompletionGuard) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + auto status = + _checkForShutdownAndConvertStatus_inlock(multiApplierStatus, "error applying batch"); + if (!status.isOK()) { + error() << "Failed to apply batch due to '" << redact(status) << "'"; + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } - _resetState_inlock(txn, OpTimeWithHash()); + _initialSyncState->appliedOps += numApplied; + _lastApplied = lastApplied; + _opts.setMyLastOptime(_lastApplied.opTime); - // For testing, we may want to fail if we receive a getmore. - if (MONGO_FAIL_POINT(failInitialSyncWithBadHost)) { - attemptErrorStatus = - Status(ErrorCodes::InvalidSyncSource, - "no sync source avail(failInitialSyncWithBadHost failpoint is set)."); + auto fetchCount = _fetchCount.load(); + if (fetchCount > 0) { + _initialSyncState->fetchedMissingDocs += fetchCount; + _fetchCount.store(0); + status = _scheduleLastOplogEntryFetcher_inlock( + stdx::bind(&DataReplicator::_lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments, + this, + stdx::placeholders::_1, + onCompletionGuard)); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; } + return; + } - if (attemptErrorStatus.isOK()) { - invariant(_syncSource.empty()); - for (int i = 0; i < numInitialSyncConnectAttempts; ++i) { - auto syncSource = _chooseSyncSource_inlock(); - if (syncSource.isOK()) { - _syncSource = syncSource.getValue(); - break; - } - attemptErrorStatus = syncSource.getStatus(); - LOG(1) << "Error getting sync source: '" << attemptErrorStatus.toString() - << "', trying again in " << _opts.syncSourceRetryWait << ". Attempt " - << i + 1 << " of " << numInitialSyncConnectAttempts.load(); - sleepmillis(durationCount<Milliseconds>(_opts.syncSourceRetryWait)); - } + _checkApplierProgressAndScheduleGetNextApplierBatch_inlock(lock, onCompletionGuard); +} - if (_syncSource.empty()) { - attemptErrorStatus = Status( - ErrorCodes::InitialSyncOplogSourceMissing, - "No valid sync source found in current replica set to do an initial sync."); - } else { - attemptErrorStatus = _runInitialSyncAttempt_inlock(txn, lk, _syncSource); - LOG(1) << "initial sync attempt returned with status: " << attemptErrorStatus; - } - } +void DataReplicator::_lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments( + const StatusWith<Fetcher::QueryResponse>& result, + std::shared_ptr<OnCompletionGuard> onCompletionGuard) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + auto status = _checkForShutdownAndConvertStatus_inlock( + result.getStatus(), "error getting last oplog entry after fetching missing documents"); + if (!status.isOK()) { + error() << "Failed to get new minValid from source " << _syncSource << " due to '" + << redact(status) << "'"; + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } - auto runTime = _initialSyncState ? _initialSyncState->timer.millis() : 0; - _stats.initialSyncAttemptInfos.emplace_back( - DataReplicator::InitialSyncAttemptInfo{runTime, attemptErrorStatus, _syncSource}); - - // If the status is ok now then initial sync is over. We must do this before we reset - // _initialSyncState and lose the DatabasesCloner's stats. - if (attemptErrorStatus.isOK()) { - _stats.initialSyncEnd = _exec->now(); - log() << "Initial Sync Statistics: " << _getInitialSyncProgress_inlock(); - if (MONGO_FAIL_POINT(initialSyncHangBeforeFinish)) { - lk.unlock(); - // This log output is used in js tests so please leave it. - log() << "initial sync - initialSyncHangBeforeFinish fail point " - "enabled. Blocking until fail point is disabled."; - while (MONGO_FAIL_POINT(initialSyncHangBeforeFinish)) { - lk.lock(); - if (!_initialSyncState->status.isOK()) { - lk.unlock(); - break; - } - lk.unlock(); - mongo::sleepsecs(1); - } - lk.lock(); - } - } - if (_inShutdown) { - const auto retStatus = (_initialSyncState && !_initialSyncState->status.isOK()) - ? _initialSyncState->status - : shutdownStatus; - error() << "Initial sync attempt terminated due to shutdown: " << shutdownStatus; - return retStatus; - } + auto&& optimeWithHashStatus = parseOpTimeWithHash(result); + if (!optimeWithHashStatus.isOK()) { + error() << "Failed to parse new minValid from source " << _syncSource << " due to '" + << redact(optimeWithHashStatus.getStatus()) << "'"; + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, + optimeWithHashStatus.getStatus()); + return; + } + auto&& optimeWithHash = optimeWithHashStatus.getValue(); - // Cleanup - _cancelAllHandles_inlock(); - _waitOnAndResetAll_inlock(&lk); - invariant(!_anyActiveHandles_inlock()); + const auto newOplogEnd = optimeWithHash.opTime.getTimestamp(); + LOG(1) << "Pushing back minValid from " << _initialSyncState->stopTimestamp << " to " + << newOplogEnd; + _initialSyncState->stopTimestamp = newOplogEnd; - if (attemptErrorStatus.isOK()) { - break; - } + // Get another batch to apply. + _checkApplierProgressAndScheduleGetNextApplierBatch_inlock(lock, onCompletionGuard); +} - ++_stats.failedInitialSyncAttempts; - initialSyncFailedAttempts.increment(); - - error() << "Initial sync attempt failed -- attempts left: " - << (_stats.maxFailedInitialSyncAttempts - _stats.failedInitialSyncAttempts) - << " cause: " << attemptErrorStatus; - - // Check if need to do more retries. - if (_stats.failedInitialSyncAttempts >= _stats.maxFailedInitialSyncAttempts) { - const std::string err = - "The maximum number of retries" - " have been exhausted for initial sync."; - severe() << err; - - initialSyncFailures.increment(); - _setState_inlock(DataReplicatorState::Uninitialized); - _stats.initialSyncEnd = _exec->now(); - log() << "Initial Sync Statistics: " << _getInitialSyncProgress_inlock(); - return attemptErrorStatus; - } +void DataReplicator::_rollbackCheckerCheckForRollbackCallback( + const RollbackChecker::Result& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + auto status = _checkForShutdownAndConvertStatus_inlock(result.getStatus(), + "error while getting last rollback ID"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } - // Sleep for retry time - lk.unlock(); - sleepmillis(durationCount<Milliseconds>(_opts.initialSyncRetryWait)); - lk.lock(); + auto hasHadRollback = result.getValue(); + if (hasHadRollback) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock( + lock, + Status(ErrorCodes::UnrecoverableRollbackError, + str::stream() << "Rollback occurred on our sync source " << _syncSource + << " during initial sync")); + return; } - _applierPaused = false; + // Success! + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, _lastApplied); +} - _lastFetched = _lastApplied; +void DataReplicator::_finishInitialSyncAttempt(const StatusWith<OpTimeWithHash>& lastApplied) { + // Since _finishInitialSyncAttempt can be called from any component's callback function or + // scheduled task, it is possible that we may not be in a TaskExecutor-managed thread when this + // function is invoked. + // For example, if CollectionCloner fails while inserting documents into the + // CollectionBulkLoader, we will get here via one of CollectionCloner's TaskRunner callbacks + // which has an active OperationContext bound to the current Client. This would lead to an + // invariant when we attempt to create a new OperationContext for _tearDown(txn). + // To avoid this, we schedule _finishCallback against the TaskExecutor rather than calling it + // here synchronously. + + // Unless dismissed, a scope guard will schedule _finishCallback() upon exiting this function. + // Since it is a requirement that _finishCallback be called outside the lock (which is possible + // if the task scheduling fails and we have to invoke _finishCallback() synchronously), we + // declare the scope guard before the lock guard. + auto result = lastApplied; + auto finishCallbackGuard = MakeGuard([this, &result] { + auto scheduleResult = + _exec->scheduleWork(stdx::bind(&DataReplicator::_finishCallback, this, result)); + if (!scheduleResult.isOK()) { + warning() << "Unable to schedule data replicator completion task due to " + << redact(scheduleResult.getStatus()) + << ". Running callback on current thread."; + _finishCallback(result); + } + }); - _storage->clearInitialSyncFlag(txn); - _opts.setMyLastOptime(_lastApplied.opTime); - log() << "initial sync done; took " - << duration_cast<Seconds>(_stats.initialSyncEnd - _stats.initialSyncStart) << "."; - initialSyncCompletes.increment(); - return _lastApplied; -} + log() << "Initial sync attempt finishing up."; + + stdx::lock_guard<stdx::mutex> lock(_mutex); + log() << "Initial Sync Attempt Statistics: " << redact(_getInitialSyncProgress_inlock()); -void DataReplicator::_onDataClonerFinish(const Status& status, HostAndPort syncSource) { - log() << "data clone finished, status: " << redact(status); + auto runTime = _initialSyncState ? _initialSyncState->timer.millis() : 0; + _stats.initialSyncAttemptInfos.emplace_back( + DataReplicator::InitialSyncAttemptInfo{runTime, result.getStatus(), _syncSource}); - if (status.code() == ErrorCodes::CallbackCanceled) { + if (result.isOK()) { + // Scope guard will invoke _finishCallback(). return; } - LockGuard lk(_mutex); - if (_inShutdown) { - // Signal shutdown event. - _doNextActions_inlock(); + // This increments the number of failed attempts for the current initial sync request. + ++_stats.failedInitialSyncAttempts; + + // This increments the number of failed attempts across all initial sync attempts since process + // startup. + initialSyncFailedAttempts.increment(); + + error() << "Initial sync attempt failed -- attempts left: " + << (_stats.maxFailedInitialSyncAttempts - _stats.failedInitialSyncAttempts) + << " cause: " << redact(result.getStatus()); + + // Check if need to do more retries. + if (_stats.failedInitialSyncAttempts >= _stats.maxFailedInitialSyncAttempts) { + const std::string err = + "The maximum number of retries have been exhausted for initial sync."; + severe() << err; + + initialSyncFailures.increment(); + + // Scope guard will invoke _finishCallback(). return; } + auto when = _exec->now() + _opts.initialSyncRetryWait; + auto status = _scheduleWorkAtAndSaveHandle_inlock( + when, + stdx::bind(&DataReplicator::_startInitialSyncAttemptCallback, + this, + stdx::placeholders::_1, + _stats.failedInitialSyncAttempts, + _stats.maxFailedInitialSyncAttempts), + &_startInitialSyncAttemptHandle, + str::stream() << "_startInitialSyncAttemptCallback-" << _stats.failedInitialSyncAttempts); + if (!status.isOK()) { - // Initial sync failed during cloning of databases - error() << "Failed to clone data due to '" << redact(status) << "'"; - invariant(_initialSyncState); - _initialSyncState->status = status; - _exec->signalEvent(_initialSyncState->finishEvent); + result = status; + + // Scope guard will invoke _finishCallback(). return; } - _scheduleLastOplogEntryFetcher_inlock( - stdx::bind(&DataReplicator::_onApplierReadyStart, this, stdx::placeholders::_1)); + // Next initial sync attempt scheduled successfully and we do not need to call _finishCallback() + // until the next initial sync attempt finishes. + finishCallbackGuard.Dismiss(); +} + +void DataReplicator::_finishCallback(StatusWith<OpTimeWithHash> lastApplied) { + // After running callback function, clear '_onCompletion' to release any resources that might be + // held by this function object. + // '_onCompletion' must be moved to a temporary copy and destroyed outside the lock in case + // there is any logic that's invoked at the function object's destruction that might call into + // this DataReplicator. 'onCompletion' must be destroyed outside the lock and this should happen + // before we transition the state to Complete. + decltype(_onCompletion) onCompletion; + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + auto txn = makeOpCtx(); + _tearDown_inlock(txn.get(), lastApplied); + + invariant(_onCompletion); + std::swap(_onCompletion, onCompletion); + } + + if (MONGO_FAIL_POINT(initialSyncHangBeforeFinish)) { + // This log output is used in js tests so please leave it. + log() << "initial sync - initialSyncHangBeforeFinish fail point " + "enabled. Blocking until fail point is disabled."; + while (MONGO_FAIL_POINT(initialSyncHangBeforeFinish) && !_isShuttingDown()) { + mongo::sleepsecs(1); + } + } + + // Completion callback must be invoked outside mutex. + try { + onCompletion(lastApplied); + } catch (...) { + warning() << "data replicator finish callback threw exception: " + << redact(exceptionToStatus()); + } + + // Destroy the remaining reference to the completion callback before we transition the state to + // Complete so that callers can expect any resources bound to '_onCompletion' to be released + // before DataReplicator::join() returns. + onCompletion = {}; + + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_state != State::kComplete); + _state = State::kComplete; + _stateCondition.notify_all(); } -void DataReplicator::_scheduleLastOplogEntryFetcher_inlock(Fetcher::CallbackFn callback) { +Status DataReplicator::_scheduleLastOplogEntryFetcher_inlock(Fetcher::CallbackFn callback) { BSONObj query = BSON( "find" << _opts.remoteOplogNS.coll() << "sort" << BSON("$natural" << -1) << "limit" << 1); @@ -801,199 +1192,197 @@ void DataReplicator::_scheduleLastOplogEntryFetcher_inlock(Fetcher::CallbackFn c RemoteCommandRetryScheduler::kAllRetriableErrors)); Status scheduleStatus = _lastOplogEntryFetcher->schedule(); if (!scheduleStatus.isOK()) { - _initialSyncState->status = scheduleStatus; - _exec->signalEvent(_initialSyncState->finishEvent); + _lastOplogEntryFetcher.reset(); } + + return scheduleStatus; } -void DataReplicator::_onApplierReadyStart(const QueryResponseStatus& fetchResult) { - if (ErrorCodes::CallbackCanceled == fetchResult.getStatus()) { +void DataReplicator::_checkApplierProgressAndScheduleGetNextApplierBatch_inlock( + const std::lock_guard<std::mutex>& lock, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { + // We should check our current state because shutdown() could have been called before + // we re-acquired the lock. + if (_isShuttingDown_inlock()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock( + lock, + Status(ErrorCodes::CallbackCanceled, + "failed to schedule applier to check for " + "rollback: data replicator is shutting down")); return; } - // Data clone done, move onto apply. - LockGuard lk(_mutex); - if (_inShutdown) { - // Signal shutdown event. - _doNextActions_inlock(); + // Basic sanity check on begin/stop timestamps. + if (_initialSyncState->beginTimestamp > _initialSyncState->stopTimestamp) { + std::string msg = str::stream() + << "Possible rollback on sync source " << _syncSource.toString() << ". Currently at " + << _initialSyncState->stopTimestamp.toBSON() << ". Started at " + << _initialSyncState->beginTimestamp.toBSON(); + error() << msg; + onCompletionGuard->setResultAndCancelRemainingWork_inlock( + lock, Status(ErrorCodes::OplogOutOfOrder, msg)); return; } - auto&& optimeWithHashStatus = parseOpTimeWithHash(fetchResult); - if (optimeWithHashStatus.isOK()) { - auto&& optimeWithHash = optimeWithHashStatus.getValue(); - _initialSyncState->stopTimestamp = optimeWithHash.opTime.getTimestamp(); - - // Check if applied to/past our stopTimestamp. - if (_initialSyncState->beginTimestamp < _initialSyncState->stopTimestamp) { - invariant(_applierPaused); - log() << "Applying operations until " << _initialSyncState->stopTimestamp.toBSON() - << " before initial sync can complete. (starting at " - << _initialSyncState->beginTimestamp.toBSON() << ")"; - _applierPaused = false; - } else { - log() << "No need to apply operations. (currently at " - << _initialSyncState->stopTimestamp.toBSON() << ")"; - if (_lastApplied.opTime.getTimestamp() < _initialSyncState->stopTimestamp) { - _lastApplied = optimeWithHash; - } - } - } else { - _initialSyncState->status = optimeWithHashStatus.getStatus(); + if (_lastApplied.opTime.isNull()) { + // Check if any ops occurred while cloning. + invariant(_initialSyncState->beginTimestamp < _initialSyncState->stopTimestamp); + log() << "Applying operations until " << _initialSyncState->stopTimestamp.toBSON() + << " before initial sync can complete. (starting at " + << _initialSyncState->beginTimestamp.toBSON() << ")"; + // Fall through to scheduling _getNextApplierBatchCallback(). + } else if (_lastApplied.opTime.getTimestamp() >= _initialSyncState->stopTimestamp) { + // Check for rollback if we have applied far enough to be consistent. + invariant(!_lastApplied.opTime.getTimestamp().isNull()); + _scheduleRollbackCheckerCheckForRollback_inlock(lock, onCompletionGuard); + return; } - // Ensure that the DatabasesCloner has reached an inactive state because this callback is - // scheduled by the DatabasesCloner callback. This will avoid a race in _doNextActions() where - // we mistakenly think the cloner is still active. - if (_initialSyncState->dbsCloner) { - _initialSyncState->dbsCloner->join(); + // Get another batch to apply. + // _scheduleWorkAndSaveHandle_inlock() is shutdown-aware. + auto status = + _scheduleWorkAndSaveHandle_inlock(stdx::bind(&DataReplicator::_getNextApplierBatchCallback, + this, + stdx::placeholders::_1, + onCompletionGuard), + &_getNextApplierBatchHandle, + "_getNextApplierBatchCallback"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; } - - _doNextActions_inlock(); } -bool DataReplicator::_anyActiveHandles_inlock() const { - // If any component is active then retVal will be set to true. - bool retVal = false; +void DataReplicator::_scheduleRollbackCheckerCheckForRollback_inlock( + const std::lock_guard<std::mutex>& lock, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { + // We should check our current state because shutdown() could have been called before + // we re-acquired the lock. + if (_isShuttingDown_inlock()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock( + lock, + Status(ErrorCodes::CallbackCanceled, + "failed to schedule rollback checker to check " + "for rollback: data replicator is shutting " + "down")); + return; + } - // For diagnostic reasons, do not return early once an active component is found, but instead - // log each active component. + auto scheduleResult = _rollbackChecker->checkForRollback( + stdx::bind(&DataReplicator::_rollbackCheckerCheckForRollbackCallback, + this, + stdx::placeholders::_1, + onCompletionGuard)); - if (_oplogFetcher && _oplogFetcher->isActive()) { - LOG(0 /*1*/) << "_oplogFetcher is active (_anyActiveHandles_inlock): " - << _oplogFetcher->toString(); - retVal = true; + auto status = scheduleResult.getStatus(); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; } - if (_initialSyncState && _initialSyncState->dbsCloner && - _initialSyncState->dbsCloner->isActive()) { - LOG(0 /*1*/) << "_initialSyncState::dbsCloner is active (_anyActiveHandles_inlock): " - << _initialSyncState->dbsCloner->toString(); - retVal = true; - } + _getLastRollbackIdHandle = scheduleResult.getValue(); + return; +} - if (_applier && _applier->isActive()) { - LOG(0 /*1*/) << "_applier is active (_anyActiveHandles_inlock): " << _applier->toString(); - retVal = true; - } +Status DataReplicator::_checkForShutdownAndConvertStatus_inlock( + const executor::TaskExecutor::CallbackArgs& callbackArgs, const std::string& message) { + return _checkForShutdownAndConvertStatus_inlock(callbackArgs.status, message); +} - if (_shuttingDownApplier && _shuttingDownApplier->isActive()) { - LOG(0 /*1*/) << "_shuttingDownApplier is active (_anyActiveHandles_inlock): " - << _shuttingDownApplier->toString(); - retVal = true; - } +Status DataReplicator::_checkForShutdownAndConvertStatus_inlock(const Status& status, + const std::string& message) { - if (_lastOplogEntryFetcher && _lastOplogEntryFetcher->isActive()) { - LOG(0 /*1*/) << "_lastOplogEntryFetcher is active (_anyActiveHandles_inlock): " - << _lastOplogEntryFetcher->toString(); - retVal = true; + if (_isShuttingDown_inlock()) { + return Status(ErrorCodes::CallbackCanceled, message + ": data replicator is shutting down"); } - if (!retVal) { - LOG(0 /*2*/) - << "DataReplicator::_anyActiveHandles_inlock returned false as nothing is active."; + if (!status.isOK()) { + return Status(status.code(), message + ": " + status.reason()); } - return retVal; -} -void DataReplicator::_cancelAllHandles_inlock() { - if (_oplogFetcher) - _oplogFetcher->shutdown(); - if (_lastOplogEntryFetcher) { - _lastOplogEntryFetcher->shutdown(); - } - if (_applier) - _applier->shutdown(); - // No need to call shutdown() on _shuttingdownApplier. This applier is assigned when the most - // recent applier's finish callback has been invoked. Note that isActive() will still return - // true if the callback is still in progress. - if (_initialSyncState && _initialSyncState->dbsCloner && - _initialSyncState->dbsCloner->isActive()) { - _initialSyncState->dbsCloner->shutdown(); - } + return Status::OK(); } -void DataReplicator::_waitOnAndResetAll_inlock(UniqueLock* lk) { - swapAndJoin_inlock(lk, _oplogFetcher, "Waiting on oplog fetcher: "); - swapAndJoin_inlock(lk, _applier, "Waiting on applier: "); - swapAndJoin_inlock(lk, _shuttingDownApplier, "Waiting on most recently completed applier: "); - if (_initialSyncState) { - swapAndJoin_inlock(lk, _initialSyncState->dbsCloner, "Waiting on databases cloner: "); - } - // A new _lastOplogEntryFetcher may be scheduled on completion of the DatabasesCloner and - // MultiApplier so we wait on the fetcher after the DatabasesCloner and MultiApplier are - // destroyed. - swapAndJoin_inlock(lk, _lastOplogEntryFetcher, "Waiting on fetcher (last oplog entry): "); +Status DataReplicator::_scheduleWorkAndSaveHandle_inlock( + const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackHandle* handle, + const std::string& name) { + invariant(handle); + if (_isShuttingDown_inlock()) { + return Status(ErrorCodes::CallbackCanceled, + str::stream() << "failed to schedule work " << name + << ": data replicator is shutting down"); + } + auto result = _exec->scheduleWork(work); + if (!result.isOK()) { + return Status(result.getStatus().code(), + str::stream() << "failed to schedule work " << name << ": " + << result.getStatus().reason()); + } + *handle = result.getValue(); + return Status::OK(); } -void DataReplicator::_doNextActions() { - LockGuard lk(_mutex); - _doNextActions_inlock(); +Status DataReplicator::_scheduleWorkAtAndSaveHandle_inlock( + Date_t when, + const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackHandle* handle, + const std::string& name) { + invariant(handle); + if (_isShuttingDown_inlock()) { + return Status(ErrorCodes::CallbackCanceled, + str::stream() << "failed to schedule work " << name << " at " + << when.toString() + << ": data replicator is shutting down"); + } + auto result = _exec->scheduleWorkAt(when, work); + if (!result.isOK()) { + return Status( + result.getStatus().code(), + str::stream() << "failed to schedule work " << name << " at " << when.toString() << ": " + << result.getStatus().reason()); + } + *handle = result.getValue(); + return Status::OK(); } -void DataReplicator::_doNextActions_inlock() { - // Can be in one of 2 main states/modes (DataReplicatorState): - // 1.) Initial Sync - // 2.) Uninitialized - - // Check for shutdown flag, signal event - if (_onShutdown.isValid()) { - if (!_onShutdownSignaled) { - _exec->signalEvent(_onShutdown); - _setState_inlock(DataReplicatorState::Uninitialized); - _onShutdownSignaled = true; - } +void DataReplicator::_cancelHandle_inlock(executor::TaskExecutor::CallbackHandle handle) { + if (!handle) { return; } + _exec->cancel(handle); +} - if (DataReplicatorState::Uninitialized == _dataReplicatorState) { - return; +template <typename Component> +Status DataReplicator::_startupComponent_inlock(Component& component) { + if (_isShuttingDown_inlock()) { + return Status(ErrorCodes::CallbackCanceled, + "data replicator shutdown while trying to call startup() on component"); } - - invariant(_initialSyncState); - - if (!_initialSyncState->status.isOK()) { - return; - } - - if (_initialSyncState->dbsCloner) { - if (_initialSyncState->dbsCloner->isActive() || - !_initialSyncState->dbsCloner->getStatus().isOK()) { - return; - } + auto status = component->startup(); + if (!status.isOK()) { + component.reset(); } + return status; +} - // The DatabasesCloner has completed so make sure we apply far enough to be consistent. - const auto lastAppliedTS = _lastApplied.opTime.getTimestamp(); - if (!lastAppliedTS.isNull() && lastAppliedTS >= _initialSyncState->stopTimestamp) { - invariant(_initialSyncState->finishEvent.isValid()); - invariant(_initialSyncState->status.isOK()); - _setState_inlock(DataReplicatorState::Uninitialized); - _exec->signalEvent(_initialSyncState->finishEvent); +template <typename Component> +void DataReplicator::_shutdownComponent_inlock(Component& component) { + if (!component) { return; } - - // Check if no active apply and ops to apply - if (!_applier || !_applier->isActive()) { - if (_oplogBuffer && _oplogBuffer->getSize() > 0) { - const auto scheduleStatus = _scheduleApplyBatch_inlock(); - if (!scheduleStatus.isOK()) { - if (scheduleStatus != ErrorCodes::ShutdownInProgress) { - error() << "Error scheduling apply batch '" << scheduleStatus << "'."; - _applier.reset(); - _scheduleDoNextActions(); - } - } - } else { - LOG(3) << "Cannot apply a batch since we have nothing buffered."; - } - } + component->shutdown(); } StatusWith<Operations> DataReplicator::_getNextApplierBatch_inlock() { + // If the fail-point is active, delay the apply batch by returning an empty batch so that + // _getNextApplierBatchCallback() will reschedule itself at a later time. + // See DataReplicatorOptions::getApplierBatchCallbackRetryWait. + if (MONGO_FAIL_POINT(rsSyncApplyStop)) { + return Operations(); + } + const int slaveDelaySecs = durationCount<Seconds>(_opts.getSlaveDelay()); - size_t totalBytes = 0; + std::uint32_t totalBytes = 0; Operations ops; BSONObj op; @@ -1008,6 +1397,15 @@ StatusWith<Operations> DataReplicator::_getNextApplierBatch_inlock() { while (_oplogBuffer->peek(txn.get(), &op)) { auto entry = OplogEntry(std::move(op)); + // Check for oplog version change. If it is absent, its value is one. + if (entry.getVersion() != OplogEntry::kOplogVersion) { + std::string message = str::stream() + << "expected oplog version " << OplogEntry::kOplogVersion << " but found version " + << entry.getVersion() << " in oplog entry: " << redact(entry.raw); + severe() << message; + return {ErrorCodes::BadValue, message}; + } + // Check for ops that must be processed one at a time. if (entry.isCommand() || // Index builds are achieved through the use of an insert op, not a command op. @@ -1025,15 +1423,6 @@ StatusWith<Operations> DataReplicator::_getNextApplierBatch_inlock() { return std::move(ops); } - // Check for oplog version change. If it is absent, its value is one. - if (entry.getVersion() != OplogEntry::kOplogVersion) { - std::string message = str::stream() - << "expected oplog version " << OplogEntry::kOplogVersion << " but found version " - << entry.getVersion() << " in oplog entry: " << redact(entry.raw); - severe() << message; - return {ErrorCodes::BadValue, message}; - } - // Apply replication batch limits. if (ops.size() >= _opts.replBatchLimitOperations) { return std::move(ops); @@ -1066,166 +1455,6 @@ StatusWith<Operations> DataReplicator::_getNextApplierBatch_inlock() { return std::move(ops); } -void DataReplicator::_onApplyBatchFinish(const Status& status, - OpTimeWithHash lastApplied, - std::size_t numApplied) { - if (ErrorCodes::CallbackCanceled == status) { - return; - } - - UniqueLock lk(_mutex); - - if (_inShutdown) { - // Signal shutdown event. - _doNextActions_inlock(); - return; - } - - // This might block in _shuttingDownApplier's destructor if it is still active here. - _shuttingDownApplier = std::move(_applier); - - if (!status.isOK()) { - invariant(DataReplicatorState::InitialSync == _dataReplicatorState); - error() << "Failed to apply batch due to '" << redact(status) << "'"; - _initialSyncState->status = status; - _exec->signalEvent(_initialSyncState->finishEvent); - return; - } - - auto fetchCount = _fetchCount.load(); - if (fetchCount > 0) { - _initialSyncState->fetchedMissingDocs += fetchCount; - _fetchCount.store(0); - _onFetchMissingDocument_inlock(lastApplied, numApplied); - // TODO (SERVER-25662): Remove this line. - _applierPaused = true; - return; - } - // TODO (SERVER-25662): Remove this line. - _applierPaused = false; - - - if (_initialSyncState) { - _initialSyncState->appliedOps += numApplied; - } - - _lastApplied = lastApplied; - lk.unlock(); - - _opts.setMyLastOptime(_lastApplied.opTime); - - _doNextActions(); -} - -void DataReplicator::_onFetchMissingDocument_inlock(OpTimeWithHash lastApplied, - std::size_t numApplied) { - _scheduleLastOplogEntryFetcher_inlock([this, lastApplied, numApplied]( - const QueryResponseStatus& fetchResult, Fetcher::NextAction*, BSONObjBuilder*) { - auto&& lastOplogEntryOpTimeWithHashStatus = parseOpTimeWithHash(fetchResult); - - if (!lastOplogEntryOpTimeWithHashStatus.isOK()) { - { - LockGuard lk(_mutex); - error() << "Failed to get new minValid from source " << _syncSource << " due to '" - << redact(lastOplogEntryOpTimeWithHashStatus.getStatus()) << "'"; - _initialSyncState->status = lastOplogEntryOpTimeWithHashStatus.getStatus(); - } - _exec->signalEvent(_initialSyncState->finishEvent); - return; - } - - const auto newOplogEnd = - lastOplogEntryOpTimeWithHashStatus.getValue().opTime.getTimestamp(); - { - LockGuard lk(_mutex); - LOG(1) << "Pushing back minValid from " << _initialSyncState->stopTimestamp << " to " - << newOplogEnd; - _initialSyncState->stopTimestamp = newOplogEnd; - } - _onApplyBatchFinish(Status::OK(), lastApplied, numApplied); - }); -} - -Status DataReplicator::_scheduleDoNextActions() { - auto status = _exec->scheduleWork([this](const CallbackArgs& cbData) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } - _doNextActions(); - }); - return status.getStatus(); -} - -Status DataReplicator::_scheduleApplyBatch_inlock() { - if (_applierPaused) { - return Status::OK(); - } - - if (_applier && _applier->isActive()) { - return Status::OK(); - } - - // If the fail-point is active, delay the apply batch. - if (MONGO_FAIL_POINT(rsSyncApplyStop)) { - auto status = _exec->scheduleWorkAt(_exec->now() + Milliseconds(10), - [this](const CallbackArgs& cbData) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } - _doNextActions(); - }); - return status.getStatus(); - } - - auto batchStatus = _getNextApplierBatch_inlock(); - if (!batchStatus.isOK()) { - warning() << "Failure creating next apply batch: " << redact(batchStatus.getStatus()); - return batchStatus.getStatus(); - } - const Operations& ops = batchStatus.getValue(); - if (ops.empty()) { - return _scheduleDoNextActions(); - } - - invariant(_dataReplicatorState == DataReplicatorState::InitialSync); - _fetchCount.store(0); - // "_syncSource" has to be copied to stdx::bind result. - HostAndPort source = _syncSource; - auto applierFn = stdx::bind(&DataReplicatorExternalState::_multiInitialSyncApply, - _dataReplicatorExternalState.get(), - stdx::placeholders::_1, - source, - &_fetchCount); - auto multiApplyFn = stdx::bind(&DataReplicatorExternalState::_multiApply, - _dataReplicatorExternalState.get(), - stdx::placeholders::_1, - stdx::placeholders::_2, - stdx::placeholders::_3); - - const auto lastEntry = ops.back().raw; - const auto opTimeWithHashStatus = parseOpTimeWithHash(lastEntry); - auto lastApplied = uassertStatusOK(opTimeWithHashStatus); - auto numApplied = ops.size(); - auto lambda = stdx::bind(&DataReplicator::_onApplyBatchFinish, - this, - stdx::placeholders::_1, - lastApplied, - numApplied); - - invariant(!(_applier && _applier->isActive())); - _applier = stdx::make_unique<MultiApplier>(_exec, ops, applierFn, multiApplyFn, lambda); - return _applier->startup(); -} - -void DataReplicator::_setState(const DataReplicatorState& newState) { - LockGuard lk(_mutex); - _setState_inlock(newState); -} - -void DataReplicator::_setState_inlock(const DataReplicatorState& newState) { - _dataReplicatorState = newState; -} - StatusWith<HostAndPort> DataReplicator::_chooseSyncSource_inlock() { auto syncSource = _opts.syncSourceSelector->chooseNewSyncSource(_lastFetched.opTime); if (syncSource.empty()) { @@ -1236,40 +1465,6 @@ StatusWith<HostAndPort> DataReplicator::_chooseSyncSource_inlock() { return syncSource; } -Status DataReplicator::scheduleShutdown() { - auto eventStatus = _exec->makeEvent(); - if (!eventStatus.isOK()) { - return eventStatus.getStatus(); - } - - { - LockGuard lk(_mutex); - invariant(!_onShutdown.isValid()); - _inShutdown = true; - _onShutdown = eventStatus.getValue(); - if (DataReplicatorState::InitialSync == _dataReplicatorState && _initialSyncState && - _initialSyncState->status.isOK()) { - _initialSyncState->status = {ErrorCodes::ShutdownInProgress, - "Shutdown issued for the operation."}; - _exec->signalEvent(_initialSyncState->finishEvent); - } - _cancelAllHandles_inlock(); - } - - // Schedule _doNextActions in case nothing is active to trigger the _onShutdown event. - return _scheduleDoNextActions(); -} - -void DataReplicator::waitForShutdown() { - Event onShutdown; - { - LockGuard lk(_mutex); - invariant(_onShutdown.isValid()); - onShutdown = _onShutdown; - } - _exec->waitForEvent(onShutdown); -} - void DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin, Fetcher::Documents::const_iterator end, const OplogFetcher::DocumentsInfo& info) { @@ -1277,12 +1472,10 @@ void DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin, return; } - { - LockGuard lk{_mutex}; - if (_inShutdown) { - return; - } + if (_isShuttingDown()) { + return; } + invariant(_oplogBuffer); // Wait for enough space. @@ -1299,38 +1492,46 @@ void DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin, _lastFetched = info.lastDocument; // TODO: updates metrics with "info". - - _doNextActions(); } -void DataReplicator::_onOplogFetchFinish(const Status& status, const OpTimeWithHash& lastFetched) { - log() << "Finished fetching oplog during initial sync: " << redact(status) - << ". Last fetched optime and hash: " << lastFetched; +DataReplicator::OnCompletionGuard::OnCompletionGuard( + const CancelRemainingWorkInLockFn& cancelRemainingWorkInLock, + const OnCompletionFn& onCompletion) + : _cancelRemainingWorkInLock(cancelRemainingWorkInLock), _onCompletion(onCompletion) {} - if (status.code() == ErrorCodes::CallbackCanceled) { - return; - } +DataReplicator::OnCompletionGuard::~OnCompletionGuard() { + MONGO_DESTRUCTOR_GUARD({ + if (!_lastAppliedSet) { + severe() << "It is a programming error to destroy this initial sync attempt completion " + "guard without the caller providing a result for '_lastApplied'"; + } + invariant(_lastAppliedSet); + // _onCompletion() must be called outside the DataReplicator's lock to avoid a deadlock. + _onCompletion(_lastApplied); + }); +} - LockGuard lk(_mutex); - if (_inShutdown) { - // Signal shutdown event. - _doNextActions_inlock(); - return; - } +void DataReplicator::OnCompletionGuard::setResultAndCancelRemainingWork_inlock( + const std::lock_guard<std::mutex>&, const StatusWith<OpTimeWithHash>& lastApplied) { + _setResultAndCancelRemainingWork_inlock(lastApplied); +} - if (!status.isOK()) { - invariant(_dataReplicatorState == DataReplicatorState::InitialSync); - // Do not change sync source, just log. - error() << "Error fetching oplog during initial sync: " << redact(status); - invariant(_initialSyncState); - _initialSyncState->status = status; - _exec->signalEvent(_initialSyncState->finishEvent); +void DataReplicator::OnCompletionGuard::setResultAndCancelRemainingWork_inlock( + const std::unique_lock<std::mutex>& lock, const StatusWith<OpTimeWithHash>& lastApplied) { + invariant(lock.owns_lock()); + _setResultAndCancelRemainingWork_inlock(lastApplied); +} + +void DataReplicator::OnCompletionGuard::_setResultAndCancelRemainingWork_inlock( + const StatusWith<OpTimeWithHash>& lastApplied) { + if (_lastAppliedSet) { return; } + _lastApplied = lastApplied; + _lastAppliedSet = true; - _lastFetched = lastFetched; - - _doNextActions_inlock(); + // It is fine to call this multiple times. + _cancelRemainingWorkInLock(); } std::string DataReplicator::Stats::toString() const { @@ -1344,8 +1545,10 @@ BSONObj DataReplicator::Stats::toBSON() const { } void DataReplicator::Stats::append(BSONObjBuilder* builder) const { - builder->appendNumber("failedInitialSyncAttempts", failedInitialSyncAttempts); - builder->appendNumber("maxFailedInitialSyncAttempts", maxFailedInitialSyncAttempts); + builder->appendNumber("failedInitialSyncAttempts", + static_cast<long long>(failedInitialSyncAttempts)); + builder->appendNumber("maxFailedInitialSyncAttempts", + static_cast<long long>(maxFailedInitialSyncAttempts)); if (initialSyncStart != Date_t()) { builder->appendDate("initialSyncStart", initialSyncStart); if (initialSyncEnd != Date_t()) { diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h index 6316db8e3c1..ae2a541030a 100644 --- a/src/mongo/db/repl/data_replicator.h +++ b/src/mongo/db/repl/data_replicator.h @@ -29,6 +29,8 @@ #pragma once +#include <cstdint> +#include <iosfwd> #include <memory> #include "mongo/base/status.h" @@ -41,21 +43,19 @@ #include "mongo/db/repl/oplog_buffer.h" #include "mongo/db/repl/oplog_fetcher.h" #include "mongo/db/repl/optime.h" +#include "mongo/db/repl/rollback_checker.h" #include "mongo/db/repl/sync_source_selector.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" -#include "mongo/stdx/thread.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/net/hostandport.h" namespace mongo { - -class QueryFetcher; - namespace repl { namespace { + using CallbackArgs = executor::TaskExecutor::CallbackArgs; using Event = executor::TaskExecutor::EventHandle; using Handle = executor::TaskExecutor::CallbackHandle; @@ -81,23 +81,9 @@ MONGO_FP_FORWARD_DECLARE(rsSyncApplyStop); struct InitialSyncState; struct MemberState; -class RollbackChecker; class StorageInterface; -/** State for decision tree */ -enum class DataReplicatorState { - InitialSync, - Uninitialized, -}; - - -// Helper to convert enum to a string. -std::string toString(DataReplicatorState s); - -// TBD -- ignore for now -enum class DataReplicatorScope { ReplicateAll, ReplicateDB, ReplicateCollection }; - struct DataReplicatorOptions { /** Function to return optime of last operation applied on this node */ using GetMyLastOptimeFn = stdx::function<OpTime()>; @@ -117,19 +103,20 @@ struct DataReplicatorOptions { Seconds blacklistSyncSourcePenaltyForNetworkConnectionError{10}; Minutes blacklistSyncSourcePenaltyForOplogStartMissing{10}; + // DataReplicator waits this long before retrying getApplierBatchCallback() if there are + // currently no operations available to apply or if the 'rsSyncApplyStop' failpoint is active. + // This default value is based on the duration in BackgroundSync::waitForMore() and + // SyncTail::tryPopAndWaitForMore(). + Milliseconds getApplierBatchCallbackRetryWait{1000}; + // Batching settings. - size_t replBatchLimitBytes = 512 * 1024 * 1024; - size_t replBatchLimitOperations = 5000; + std::uint32_t replBatchLimitBytes = 512 * 1024 * 1024; + std::uint32_t replBatchLimitOperations = 5000; // Replication settings NamespaceString localOplogNS = NamespaceString("local.oplog.rs"); NamespaceString remoteOplogNS = NamespaceString("local.oplog.rs"); - // TBD -- ignore below for now - DataReplicatorScope scope = DataReplicatorScope::ReplicateAll; - std::string scopeNS; - BSONObj filterCriteria; - GetMyLastOptimeFn getMyLastOptime; SetMyLastOptimeFn setMyLastOptime; GetSlaveDelayFn getSlaveDelay; @@ -138,7 +125,7 @@ struct DataReplicatorOptions { // The oplog fetcher will restart the oplog tailing query this many times on non-cancellation // failures. - std::size_t oplogFetcherMaxFetcherRestarts = 0; + std::uint32_t oplogFetcherMaxFetcherRestarts = 0; std::string toString() const { return str::stream() << "DataReplicatorOptions -- " @@ -156,11 +143,65 @@ struct DataReplicatorOptions { * * * Entry Points: - * -- doInitialSync: Will drop all data and copy to a consistent state of data (via the oplog). - * -- startup: Start data replication from existing data. + * -- startup: Start initial sync. */ class DataReplicator { + MONGO_DISALLOW_COPYING(DataReplicator); + public: + /** + * Callback function to report last applied optime (with hash) of initial sync. + */ + typedef stdx::function<void(const StatusWith<OpTimeWithHash>& lastApplied)> OnCompletionFn; + + /** + * RAII type that stores the result of a single initial sync attempt. + * Only the first result passed to setResultAndCancelRemainingWork_inlock() is saved. + * Calls '_onCompletion' on destruction with result. + * We use an invariant to ensure that a result has been provided by the caller at destruction. + */ + class OnCompletionGuard { + MONGO_DISALLOW_COPYING(OnCompletionGuard); + + public: + // Function to invoke DataReplicator::_cancelRemainingWork_inlock(). + using CancelRemainingWorkInLockFn = stdx::function<void()>; + + OnCompletionGuard(const CancelRemainingWorkInLockFn& cancelRemainingWorkInLock, + const OnCompletionFn& onCompletion); + ~OnCompletionGuard(); + + /** + * Sets result if called for the first time. + * Cancels remaining work in DataReplicator. + * Requires either a unique_lock or lock_guard to be passed in to ensure that we call + * DataReplicator::_cancelRemainingWork_inlock()) while we have a lock on the data + * replicator's mutex. + */ + void setResultAndCancelRemainingWork_inlock(const std::lock_guard<std::mutex>& lock, + const StatusWith<OpTimeWithHash>& lastApplied); + void setResultAndCancelRemainingWork_inlock(const std::unique_lock<std::mutex>& lock, + const StatusWith<OpTimeWithHash>& lastApplied); + + private: + /** + * Once we verified that we have the data replicator lock, this function is called by both + * versions of setResultAndCancelRemainingWork_inlock() to set the result and cancel any + * remaining work in the data replicator. + */ + void _setResultAndCancelRemainingWork_inlock(const StatusWith<OpTimeWithHash>& lastApplied); + + const CancelRemainingWorkInLockFn _cancelRemainingWorkInLock; + const OnCompletionFn _onCompletion; + + // _lastAppliedSet and _lastApplied are guarded by the mutex of the DataReplicator instance + // that owns this guard object. + bool _lastAppliedSet = false; + StatusWith<OpTimeWithHash> _lastApplied = + Status(ErrorCodes::InternalError, + "This initial sync attempt finished without an explicit result."); + }; + struct InitialSyncAttemptInfo { int durationMillis; Status status; @@ -172,8 +213,8 @@ public: }; struct Stats { - size_t failedInitialSyncAttempts{0}; - size_t maxFailedInitialSyncAttempts{0}; + std::uint32_t failedInitialSyncAttempts{0}; + std::uint32_t maxFailedInitialSyncAttempts{0}; Date_t initialSyncStart; Date_t initialSyncEnd; std::vector<DataReplicator::InitialSyncAttemptInfo> initialSyncAttemptInfos; @@ -185,42 +226,34 @@ public: DataReplicator(DataReplicatorOptions opts, std::unique_ptr<DataReplicatorExternalState> dataReplicatorExternalState, - StorageInterface* storage); + StorageInterface* storage, + const OnCompletionFn& onCompletion); virtual ~DataReplicator(); - // Shuts down replication if "start" has been called, and blocks until shutdown has completed. - Status shutdown(); - /** - * Cancels outstanding work and begins shutting down. + * Returns true if an initial sync is currently running or in the process of shutting down. */ - Status scheduleShutdown(); + bool isActive() const; /** - * Waits for data replicator to finish shutting down. - * Data replicator will go into uninitialized state. + * Starts initial sync process, with the provided number of attempts */ - void waitForShutdown(); + Status startup(OperationContext* txn, std::uint32_t maxAttempts) noexcept; /** - * Does an initial sync, with the provided number of attempts. - * - * This should be the first method called after construction (see class comment). + * Shuts down replication if "start" has been called, and blocks until shutdown has completed. */ - StatusWith<OpTimeWithHash> doInitialSync(OperationContext* txn, std::size_t maxAttempts); - - DataReplicatorState getState() const; - - HostAndPort getSyncSource() const; - OpTimeWithHash getLastFetched() const; - OpTimeWithHash getLastApplied() const; + Status shutdown(); /** - * Number of operations in the oplog buffer. + * Block until inactive. */ - size_t getOplogBufferCount() const; + void join(); + /** + * Returns internal state in a loggable format. + */ std::string getDiagnosticString() const; /** @@ -229,10 +262,6 @@ public: */ BSONObj getInitialSyncProgress() const; - // For testing only - - void _resetState_inlock(OperationContext* txn, OpTimeWithHash lastAppliedOpTime); - /** * Overrides how executor schedules database work. * @@ -240,14 +269,239 @@ public: */ void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn); + // State transitions: + // PreStart --> Running --> ShuttingDown --> Complete + // It is possible to skip intermediate states. For example, calling shutdown() when the data + // replicator has not started will transition from PreStart directly to Complete. + enum class State { kPreStart, kRunning, kShuttingDown, kComplete }; + + /** + * Returns current data replicator state. + * For testing only. + */ + State getState_forTest() const; + private: - // Runs a single initial sync attempt. - Status _runInitialSyncAttempt_inlock(OperationContext* txn, - UniqueLock& lk, - HostAndPort syncSource); + /** + * Returns true if we are still processing initial sync tasks (_state is either Running or + * Shutdown). + */ + bool _isActive_inlock() const; + + /** + * Cancels all outstanding work. + * Used by shutdown() and CompletionGuard::setResultAndCancelRemainingWork(). + */ + void _cancelRemainingWork_inlock(); + + /** + * Returns true if the data replicator has received a shutdown request (_state is ShuttingDown). + */ + bool _isShuttingDown() const; + bool _isShuttingDown_inlock() const; + + /** + * Initial sync flowchart: + * + * start() + * | + * | + * V + * _setUp_inlock() + * | + * | + * V + * _startInitialSyncAttemptCallback() + * | + * | + * |<-------+ + * | | + * | | (bad sync source) + * | | + * V | + * _chooseSyncSourceCallback() + * | + * | + * | (good sync source found) + * | + * | + * V + * _recreateOplogAndDropReplicatedDatabases() + * | + * | + * V + * _rollbackCheckerResetCallback() + * | + * | + * V + * _lastOplogEntryFetcherCallbackForBeginTimestamp() + * | + * | + * +------------------------------+ + * | | + * | | + * V V + * _oplogFetcherCallback() _databasesClonerCallback + * | | + * | | + * | V + * | _lastOplogEntryFetcherCallbackForStopTimestamp() + * | | | + * | | | + * | (no ops to apply) | | (have ops to apply) + * | | | + * | | V + * | | _getNextApplierBatchCallback()<-----+ + * | | | ^ | + * | | | | | + * | | | (no docs fetched | | + * | | | and end ts not | | + * | | | reached) | | + * | | | | | + * | | V | | + * | | _multiApplierCallback()-----+ | + * | | | | | + * | | | | | + * | | | | (docs fetched) | (end ts not + * | | | | | reached) + * | | | V | + * | | | _lastOplogEntryFetcherCallbackAfter- + * | | | FetchingMissingDocuments() + * | | | | + * | | | | + * | (reached end timestamp) + * | | | | + * | V V V + * | _rollbackCheckerCheckForRollbackCallback() + * | | + * | | + * +------------------------------+ + * | + * | + * V + * _finishInitialSyncAttempt() + * | + * | + * V + * _finishCallback() + */ + + /** + * Sets up internal state to begin initial sync. + */ + void _setUp_inlock(OperationContext* txn, std::uint32_t initialSyncMaxAttempts); - void _setState(const DataReplicatorState& newState); - void _setState_inlock(const DataReplicatorState& newState); + /** + * Tears down internal state before reporting final status to caller. + */ + void _tearDown_inlock(OperationContext* txn, const StatusWith<OpTimeWithHash>& lastApplied); + + /** + * Callback to start a single initial sync attempt. + */ + void _startInitialSyncAttemptCallback(const executor::TaskExecutor::CallbackArgs& callbackArgs, + std::uint32_t initialSyncAttempt, + std::uint32_t initialSyncMaxAttempts); + + /** + * Callback to obtain sync source from sync source selector. + * For every initial sync attempt, we will try up to 'numInitialSyncConnectAttempts' times (at + * an interval of '_opts.syncSourceRetryWait' ms) to obtain a valid sync source before giving up + * and returning ErrorCodes::InitialSyncOplogSourceMissing. + */ + void _chooseSyncSourceCallback(const executor::TaskExecutor::CallbackArgs& callbackArgs, + std::uint32_t chooseSyncSourceAttempt, + std::uint32_t chooseSyncSourceMaxAttempts, + std::shared_ptr<OnCompletionGuard> onCompletionGuard); + + /** + * This function does the following: + * 1.) Drop oplog. + * 2.) Drop user databases (replicated dbs). + * 3.) Create oplog. + */ + Status _recreateOplogAndDropReplicatedDatabases(); + + /** + * Callback for rollback checker's first replSetGetRBID command before starting data cloning. + */ + void _rollbackCheckerResetCallback(const RollbackChecker::Result& result, + std::shared_ptr<OnCompletionGuard> onCompletionGuard); + + /** + * Callback for first '_lastOplogEntryFetcher' callback. A successful response lets us + * determine the starting point for tailing the oplog using the OplogFetcher as well as + * setting a reference point for the state of the sync source's oplog when data cloning + * completes. + */ + void _lastOplogEntryFetcherCallbackForBeginTimestamp( + const StatusWith<Fetcher::QueryResponse>& result, + std::shared_ptr<OnCompletionGuard> onCompletionGuard); + + /** + * Callback for oplog fetcher. + */ + void _oplogFetcherCallback(const Status& status, + const OpTimeWithHash& lastFetched, + std::shared_ptr<OnCompletionGuard> onCompletionGuard); + + /** + * Callback for DatabasesCloner. + */ + void _databasesClonerCallback(const Status& status, + std::shared_ptr<OnCompletionGuard> onCompletionGuard); + + /** + * Callback for second '_lastOplogEntryFetcher' callback. This is scheduled to obtain the stop + * timestamp after DatabasesCloner has completed and enables us to determine if the oplog on + * the sync source has advanced since we started cloning the databases. + */ + void _lastOplogEntryFetcherCallbackForStopTimestamp( + const StatusWith<Fetcher::QueryResponse>& result, + std::shared_ptr<OnCompletionGuard> onCompletionGuard); + + /** + * Callback to obtain next batch of operations to apply. + */ + void _getNextApplierBatchCallback(const executor::TaskExecutor::CallbackArgs& callbackArgs, + std::shared_ptr<OnCompletionGuard> onCompletionGuard); + + /** + * Callback for MultiApplier completion. + */ + void _multiApplierCallback(const Status& status, + OpTimeWithHash lastApplied, + std::uint32_t numApplied, + std::shared_ptr<OnCompletionGuard> onCompletionGuard); + + /** + * Callback for third '_lastOplogEntryFetcher' callback. This is scheduled after MultiApplier + * completed successfully and missing documents were fetched from the sync source while + * DataReplicatorExternalState::_multiInitialSyncApply() was processing operations. + * This callback will update InitialSyncState::stopTimestamp on success. + */ + void _lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments( + const StatusWith<Fetcher::QueryResponse>& result, + std::shared_ptr<OnCompletionGuard> onCompletionGuard); + + /** + * Callback for rollback checker's last replSetGetRBID command after cloning data and applying + * operations. + */ + void _rollbackCheckerCheckForRollbackCallback( + const RollbackChecker::Result& result, + std::shared_ptr<OnCompletionGuard> onCompletionGuard); + + /** + * Reports result of current initial sync attempt. May schedule another initial sync attempt + * depending on shutdown state and whether we've exhausted all initial sync retries. + */ + void _finishInitialSyncAttempt(const StatusWith<OpTimeWithHash>& lastApplied); + + /** + * Invokes completion callback and transitions state to State::kComplete. + */ + void _finishCallback(StatusWith<OpTimeWithHash> lastApplied); // Obtains a valid sync source from the sync source selector. // Returns error if a sync source cannot be found. @@ -260,32 +514,81 @@ private: void _enqueueDocuments(Fetcher::Documents::const_iterator begin, Fetcher::Documents::const_iterator end, const OplogFetcher::DocumentsInfo& info); - void _onOplogFetchFinish(const Status& status, const OpTimeWithHash& lastFetched); - void _doNextActions(); - void _doNextActions_inlock(); BSONObj _getInitialSyncProgress_inlock() const; StatusWith<Operations> _getNextApplierBatch_inlock(); - void _onApplyBatchFinish(const Status& status, - OpTimeWithHash lastApplied, - std::size_t numApplied); - - // Called when the DatabasesCloner finishes. - void _onDataClonerFinish(const Status& status, HostAndPort syncSource); - // Called after _onDataClonerFinish when the new Timestamp is avail, to use for minvalid. - void _onApplierReadyStart(const QueryResponseStatus& fetchResult); - // Called during _onApplyBatchFinish when we fetched a missing document and must reset minValid. - void _onFetchMissingDocument_inlock(OpTimeWithHash lastApplied, std::size_t numApplied); - // Schedules a fetcher to get the last oplog entry from the sync source. - void _scheduleLastOplogEntryFetcher_inlock(Fetcher::CallbackFn callback); - - Status _scheduleDoNextActions(); - Status _scheduleApplyBatch_inlock(); - - void _cancelAllHandles_inlock(); - void _waitOnAndResetAll_inlock(UniqueLock* lk); - bool _anyActiveHandles_inlock() const; + + /** + * Schedules a fetcher to get the last oplog entry from the sync source. + */ + Status _scheduleLastOplogEntryFetcher_inlock(Fetcher::CallbackFn callback); + + /** + * Checks the current oplog application progress (begin and end timestamps). + * If necessary, schedules a _getNextApplierBatchCallback() task. + * If the stop and end timestamps are inconsistent or if there is an issue scheduling the task, + * we set the error status in 'onCompletionGuard' and shut down the OplogFetcher. + * Passes 'lock' through to completion guard. + */ + void _checkApplierProgressAndScheduleGetNextApplierBatch_inlock( + const std::lock_guard<std::mutex>& lock, + std::shared_ptr<OnCompletionGuard> onCompletionGuard); + + /** + * Schedules a rollback checker to get the rollback ID after data cloning or applying. This + * helps us check if a rollback occurred on the sync source. + * If we fail to schedule the rollback checker, we set the error status in 'onCompletionGuard' + * and shut down the OplogFetcher. + * Passes 'lock' through to completion guard. + */ + void _scheduleRollbackCheckerCheckForRollback_inlock( + const std::lock_guard<std::mutex>& lock, + std::shared_ptr<OnCompletionGuard> onCompletionGuard); + + /** + * Checks the given status (or embedded status inside the callback args) and current data + * replicator shutdown state. If the given status is not OK or if we are shutting down, returns + * a new error status that should be passed to _finishCallback. The reason in the new error + * status will include 'message'. + * Otherwise, returns Status::OK(). + */ + Status _checkForShutdownAndConvertStatus_inlock( + const executor::TaskExecutor::CallbackArgs& callbackArgs, const std::string& message); + Status _checkForShutdownAndConvertStatus_inlock(const Status& status, + const std::string& message); + + /** + * Schedules work to be run by the task executor. + * Saves handle if work was successfully scheduled. + * Returns scheduleWork status (without the handle). + */ + Status _scheduleWorkAndSaveHandle_inlock(const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackHandle* handle, + const std::string& name); + Status _scheduleWorkAtAndSaveHandle_inlock(Date_t when, + const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackHandle* handle, + const std::string& name); + + /** + * Cancels task executor callback handle if not null. + */ + void _cancelHandle_inlock(executor::TaskExecutor::CallbackHandle handle); + + /** + * Starts up component and checks data replicator's shutdown state at the same time. + * If component's startup() fails, resets 'component' (which is assumed to be a unique_ptr + * to the component type). + */ + template <typename Component> + Status _startupComponent_inlock(Component& component); + + /** + * Shuts down component if not null. + */ + template <typename Component> + void _shutdownComponent_inlock(Component& component); // Counts how many documents have been refetched from the source in the current batch. AtomicUInt32 _fetchCount; @@ -305,34 +608,54 @@ private: const DataReplicatorOptions _opts; // (R) std::unique_ptr<DataReplicatorExternalState> _dataReplicatorExternalState; // (R) executor::TaskExecutor* _exec; // (R) - DataReplicatorState _dataReplicatorState; // (MX) - std::unique_ptr<InitialSyncState> _initialSyncState; // (M) - StorageInterface* _storage; // (M) - std::unique_ptr<OplogFetcher> _oplogFetcher; // (S) - std::unique_ptr<Fetcher> _lastOplogEntryFetcher; // (S) - bool _applierPaused = false; // (X) - std::unique_ptr<MultiApplier> _applier; // (M) - std::unique_ptr<MultiApplier> _shuttingDownApplier; // (M) - HostAndPort _syncSource; // (M) - OpTimeWithHash _lastFetched; // (MX) - OpTimeWithHash _lastApplied; // (MX) - std::unique_ptr<OplogBuffer> _oplogBuffer; // (M) - - // Set to true when shutdown is requested. This flag should be checked by - // the data replicator during initial sync so that it can interrupt the - // the current operation and gracefully transition to completion with - // a shutdown status. - bool _inShutdown = false; // (M) - // Set to true when the _onShutdown event is signaled for the first time. - // Ensures that we do not signal the shutdown event more than once (which - // is disallowed by the task executor. - bool _onShutdownSignaled = false; // (M) - // Created when shutdown is requested. Signaled at most once when the data - // replicator is determining its next steps between task executor callbacks. - Event _onShutdown; // (M) + StorageInterface* _storage; // (R) + + // This is invoked with the final status of the initial sync. If startup() fails, this callback + // is never invoked. The caller gets the last applied optime with hash when the initial sync + // completes successfully or an error status. + // '_onCompletion' is cleared on completion (in _finishCallback()) in order to release any + // resources that might be held by the callback function object. + OnCompletionFn _onCompletion; // (M) + + // Handle to currently scheduled _startInitialSyncAttemptCallback() task. + executor::TaskExecutor::CallbackHandle _startInitialSyncAttemptHandle; // (M) + + // Handle to currently scheduled _chooseSyncSourceCallback() task. + executor::TaskExecutor::CallbackHandle _chooseSyncSourceHandle; // (M) + + // RollbackChecker to get rollback ID before and after each initial sync attempt. + std::unique_ptr<RollbackChecker> _rollbackChecker; // (M) + // Handle returned from RollbackChecker::reset(). + RollbackChecker::CallbackHandle _getBaseRollbackIdHandle; // (M) + + // Handle returned from RollbackChecker::checkForRollback(). + RollbackChecker::CallbackHandle _getLastRollbackIdHandle; // (M) + + // Handle to currently scheduled _getNextApplierBatchCallback() task. + executor::TaskExecutor::CallbackHandle _getNextApplierBatchHandle; // (M) + + std::unique_ptr<InitialSyncState> _initialSyncState; // (M) + std::unique_ptr<OplogFetcher> _oplogFetcher; // (S) + std::unique_ptr<Fetcher> _lastOplogEntryFetcher; // (S) + std::unique_ptr<MultiApplier> _applier; // (M) + HostAndPort _syncSource; // (M) + OpTimeWithHash _lastFetched; // (MX) + OpTimeWithHash _lastApplied; // (MX) + std::unique_ptr<OplogBuffer> _oplogBuffer; // (M) + + // Used to signal changes in _state. + mutable stdx::condition_variable _stateCondition; + + // Current data replicator state. See comments for State enum class for details. + State _state = State::kPreStart; // (M) + + // Passed to CollectionCloner via DatabasesCloner. CollectionCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M) - Stats _stats; // (M) + + // Contains stats on the current initial sync request (includes all attempts). + // To access these stats in a user-readable format, use getInitialSyncProgress(). + Stats _stats; // (M) }; } // namespace repl diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp index aad8fd0843e..8e6c521c71d 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp @@ -76,7 +76,7 @@ std::unique_ptr<OplogBuffer> DataReplicatorExternalStateMock::makeSteadyStateOpl } StatusWith<ReplicaSetConfig> DataReplicatorExternalStateMock::getCurrentConfig() const { - return replSetConfig; + return replSetConfigResult; } StatusWith<OpTime> DataReplicatorExternalStateMock::_multiApply( @@ -93,7 +93,8 @@ Status DataReplicatorExternalStateMock::_multiSyncApply(MultiApplier::OperationP Status DataReplicatorExternalStateMock::_multiInitialSyncApply(MultiApplier::OperationPtrs* ops, const HostAndPort& source, AtomicUInt32* fetchCount) { - return Status::OK(); + + return multiInitialSyncApplyFn(ops, source, fetchCount); } } // namespace repl diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h index b85332bb750..45b755d525a 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.h +++ b/src/mongo/db/repl/data_replicator_external_state_mock.h @@ -84,7 +84,13 @@ public: // Override to change multiApply behavior. MultiApplier::MultiApplyFn multiApplyFn; - ReplicaSetConfig replSetConfig; + // Override to change _multiInitialSyncApply behavior. + using MultiInitialSyncApplyFn = stdx::function<Status( + MultiApplier::OperationPtrs* ops, const HostAndPort& source, AtomicUInt32* fetchCount)>; + MultiInitialSyncApplyFn multiInitialSyncApplyFn = []( + MultiApplier::OperationPtrs*, const HostAndPort&, AtomicUInt32*) { return Status::OK(); }; + + StatusWith<ReplicaSetConfig> replSetConfigResult = ReplicaSetConfig(); private: StatusWith<OpTime> _multiApply(OperationContext* txn, diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index 821a3a12a42..524c95125b7 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -28,7 +28,9 @@ #include "mongo/platform/basic.h" +#include <iosfwd> #include <memory> +#include <ostream> #include "mongo/client/fetcher.h" #include "mongo/db/client.h" @@ -38,6 +40,7 @@ #include "mongo/db/repl/data_replicator.h" #include "mongo/db/repl/data_replicator_external_state_mock.h" #include "mongo/db/repl/member_state.h" +#include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/oplog_fetcher.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/reporter.h" @@ -60,6 +63,30 @@ #include "mongo/unittest/barrier.h" #include "mongo/unittest/unittest.h" +namespace mongo { +namespace repl { + +/** + * Insertion operator for DataReplicator::State. Formats data replicator state for output stream. + */ +std::ostream& operator<<(std::ostream& os, const DataReplicator::State& state) { + switch (state) { + case DataReplicator::State::kPreStart: + return os << "PreStart"; + case DataReplicator::State::kRunning: + return os << "Running"; + case DataReplicator::State::kShuttingDown: + return os << "ShuttingDown"; + case DataReplicator::State::kComplete: + return os << "Complete"; + } + MONGO_UNREACHABLE; +} + +} // namespace repl +} // namespace mongo + + namespace { using namespace mongo; @@ -88,6 +115,13 @@ public: TaskExecutorMock(executor::TaskExecutor* executor, ShouldFailRequestFn shouldFailRequest) : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {} + StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) { + if (shouldFailScheduleWork) { + return Status(ErrorCodes::OperationFailed, "failed to schedule work"); + } + return getExecutor()->scheduleWork(work); + } + StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) { if (shouldFailScheduleWorkAt) { return Status(ErrorCodes::OperationFailed, @@ -104,6 +138,7 @@ public: return getExecutor()->scheduleRemoteCommand(request, cb); } + bool shouldFailScheduleWork = false; bool shouldFailScheduleWorkAt = false; private: @@ -181,6 +216,12 @@ public: finishProcessingNetworkResponse(); } + /** + * Schedules and processes a successful response to the network request sent by DataReplicator's + * last oplog entry fetcher. Also validates the find command arguments in the request. + */ + void processSuccessfulLastOplogEntryFetcherResponse(std::vector<BSONObj> docs); + void finishProcessingNetworkResponse() { getNet()->runReadyNetworkOperations(); if (getNet()->hasReadyRequests()) { @@ -285,12 +326,14 @@ protected: _myLastOpTime = OpTime({3, 0}, 1); DataReplicatorOptions options; - options.initialSyncRetryWait = Milliseconds(0); + options.initialSyncRetryWait = Milliseconds(1); options.getMyLastOptime = [this]() { return _myLastOpTime; }; options.setMyLastOptime = [this](const OpTime& opTime) { _setMyLastOptime(opTime); }; options.getSlaveDelay = [this]() { return Seconds(0); }; options.syncSourceSelector = this; + _options = options; + ThreadPool::Options threadPoolOptions; threadPoolOptions.poolName = "replication"; threadPoolOptions.minThreads = 1U; @@ -317,14 +360,26 @@ protected: << "localhost:12345")) << "settings" << BSON("electionTimeoutMillis" << 10000)))); - dataReplicatorExternalState->replSetConfig = config; + dataReplicatorExternalState->replSetConfigResult = config; } _externalState = dataReplicatorExternalState.get(); + _lastApplied = getDetectableErrorStatus(); + _onCompletion = [this](const StatusWith<OpTimeWithHash>& lastApplied) { + _lastApplied = lastApplied; + }; try { - _dr.reset(new DataReplicator( - options, std::move(dataReplicatorExternalState), _storageInterface.get())); + // When creating DataReplicator, we wrap _onCompletion so that we can override the + // DataReplicator's callback behavior post-construction. + // See DataReplicatorTransitionsToCompleteWhenFinishCallbackThrowsException. + _dr = stdx::make_unique<DataReplicator>( + options, + std::move(dataReplicatorExternalState), + _storageInterface.get(), + [this](const StatusWith<OpTimeWithHash>& lastApplied) { + _onCompletion(lastApplied); + }); _dr->setScheduleDbWorkFn_forTest( [this](const executor::TaskExecutor::CallbackFn& work) { return getExecutor().scheduleWork(work); @@ -375,14 +430,18 @@ protected: TaskExecutorMock::ShouldFailRequestFn _shouldFailRequest; std::unique_ptr<TaskExecutorMock> _executorProxy; + DataReplicatorOptions _options; DataReplicatorOptions::SetMyLastOptimeFn _setMyLastOptime; OpTime _myLastOpTime; - std::unique_ptr<SyncSourceSelector> _syncSourceSelector; + std::unique_ptr<SyncSourceSelectorMock> _syncSourceSelector; std::unique_ptr<StorageInterfaceMock> _storageInterface; std::unique_ptr<OldThreadPool> _dbWorkThreadPool; std::map<NamespaceString, CollectionMockStats> _collectionStats; std::map<NamespaceString, CollectionCloneInfo> _collections; + StatusWith<OpTimeWithHash> _lastApplied = Status(ErrorCodes::NotYetInitialized, ""); + DataReplicator::OnCompletionFn _onCompletion; + private: DataReplicatorExternalStateMock* _externalState; std::unique_ptr<DataReplicator> _dr; @@ -395,577 +454,1612 @@ executor::ThreadPoolMock::Options DataReplicatorTest::makeThreadPoolMockOptions( return options; } +void advanceClock(NetworkInterfaceMock* net, Milliseconds duration) { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + auto when = net->now() + duration; + ASSERT_EQUALS(when, net->runUntil(when)); +} + ServiceContext::UniqueOperationContext makeOpCtx() { return cc().makeOperationContext(); } -TEST_F(DataReplicatorTest, CreateDestroy) {} - -// Used to run a Initial Sync in a separate thread, to avoid blocking test execution. -class InitialSyncBackgroundRunner { -public: - InitialSyncBackgroundRunner(DataReplicator* dr, std::size_t maxAttempts) - : _dr(dr), _maxAttempts(maxAttempts) {} +/** + * Generates a replSetGetRBID response. + */ +BSONObj makeRollbackCheckerResponse(int rollbackId) { + return BSON("ok" << 1 << "rbid" << rollbackId); +} - ~InitialSyncBackgroundRunner() { - if (_thread) { - _thread->join(); +/** + * Generates a cursor response for a Fetcher to consume. + */ +BSONObj makeCursorResponse(CursorId cursorId, + const NamespaceString& nss, + std::vector<BSONObj> docs, + bool isFirstBatch = true) { + BSONObjBuilder bob; + { + BSONObjBuilder cursorBob(bob.subobjStart("cursor")); + cursorBob.append("id", cursorId); + cursorBob.append("ns", nss.toString()); + { + BSONArrayBuilder batchBob( + cursorBob.subarrayStart(isFirstBatch ? "firstBatch" : "nextBatch")); + for (const auto& doc : docs) { + batchBob.append(doc); + } } } + bob.append("ok", 1); + return bob.obj(); +} - // Could block if initial sync has not finished. - StatusWith<OpTimeWithHash> getResult(NetworkInterfaceMock* net) { - while (!isDone()) { - NetworkGuard guard(net); - // if (net->hasReadyRequests()) { - net->runReadyNetworkOperations(); - // } +/** + * Generates a listDatabases response for a DatabasesCloner to consume. + */ +BSONObj makeListDatabasesResponse(std::vector<std::string> databaseNames) { + BSONObjBuilder bob; + { + BSONArrayBuilder databasesBob(bob.subarrayStart("databases")); + for (const auto& name : databaseNames) { + BSONObjBuilder nameBob(databasesBob.subobjStart()); + nameBob.append("name", name); } - _thread->join(); - _thread.reset(); + } + bob.append("ok", 1); + return bob.obj(); +} + +/** + * Generates oplog entries with the given number used for the timestamp. + */ +BSONObj makeOplogEntry(int t, const char* opType = "i", int version = OplogEntry::kOplogVersion) { + return BSON("ts" << Timestamp(t, 1) << "h" << static_cast<long long>(t) << "ns" + << "a.a" + << "v" + << version + << "op" + << opType + << "o" + << BSON("_id" << t << "a" << t)); +} - LockGuard lk(_mutex); - return _result; +void DataReplicatorTest::processSuccessfulLastOplogEntryFetcherResponse(std::vector<BSONObj> docs) { + auto net = getNet(); + auto request = assertRemoteCommandNameEquals( + "find", + net->scheduleSuccessfulResponse(makeCursorResponse(0LL, _options.localOplogNS, docs))); + ASSERT_EQUALS(1, request.cmdObj.getIntField("limit")); + ASSERT_TRUE(request.cmdObj.hasField("sort")); + ASSERT_EQUALS(mongo::BSONType::Object, request.cmdObj["sort"].type()); + ASSERT_BSONOBJ_EQ(BSON("$natural" << -1), request.cmdObj.getObjectField("sort")); + net->runReadyNetworkOperations(); +} + +TEST_F(DataReplicatorTest, InvalidConstruction) { + DataReplicatorOptions options; + options.getMyLastOptime = []() { return OpTime(); }; + options.setMyLastOptime = [](const OpTime&) {}; + options.getSlaveDelay = []() { return Seconds(0); }; + options.syncSourceSelector = this; + auto callback = [](const StatusWith<OpTimeWithHash>&) {}; + + // Null task executor in external state. + { + auto dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>(); + ASSERT_THROWS_CODE_AND_WHAT( + DataReplicator( + options, std::move(dataReplicatorExternalState), _storageInterface.get(), callback), + UserException, + ErrorCodes::BadValue, + "task executor cannot be null"); } - bool isDone() { - LockGuard lk(_mutex); - return (_result.getStatus().code() != ErrorCodes::NotYetInitialized); + // Null callback function. + { + auto dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>(); + dataReplicatorExternalState->taskExecutor = &getExecutor(); + ASSERT_THROWS_CODE_AND_WHAT(DataReplicator(options, + std::move(dataReplicatorExternalState), + _storageInterface.get(), + DataReplicator::OnCompletionFn()), + UserException, + ErrorCodes::BadValue, + "callback function cannot be null"); } +} + +TEST_F(DataReplicatorTest, CreateDestroy) {} + +const std::uint32_t maxAttempts = 1U; + +TEST_F(DataReplicatorTest, StartupReturnsIllegalOperationIfAlreadyActive) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + ASSERT_FALSE(dr->isActive()); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + ASSERT_TRUE(dr->isActive()); + ASSERT_EQUALS(ErrorCodes::IllegalOperation, dr->startup(txn.get(), maxAttempts)); + ASSERT_TRUE(dr->isActive()); +} + +TEST_F(DataReplicatorTest, StartupReturnsShutdownInProgressIfDataReplicatorIsShuttingDown) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + ASSERT_FALSE(dr->isActive()); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + ASSERT_TRUE(dr->isActive()); + ASSERT_OK(dr->shutdown()); + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, dr->startup(txn.get(), maxAttempts)); +} + +TEST_F(DataReplicatorTest, StartupReturnsShutdownInProgressIfExecutorIsShutdown) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + getExecutor().shutdown(); + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, dr->startup(txn.get(), maxAttempts)); + ASSERT_FALSE(dr->isActive()); + + // Cannot startup data replicator again since it's in the Complete state. + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, dr->startup(txn.get(), maxAttempts)); +} - bool isActive() { - return (_dr && _dr->getState() == DataReplicatorState::InitialSync) && !isDone(); +TEST_F(DataReplicatorTest, ShutdownTransitionsStateToCompleteIfCalledBeforeStartup) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + ASSERT_OK(dr->shutdown()); + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, dr->startup(txn.get(), maxAttempts)); + // Data replicator is inactive when it's in the Complete state. + ASSERT_FALSE(dr->isActive()); +} + +TEST_F(DataReplicatorTest, StartupSetsInitialSyncFlagOnSuccess) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + // Initial sync flag should not be set before starting. + ASSERT_FALSE(getStorage().getInitialSyncFlag(txn.get())); + + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + ASSERT_TRUE(dr->isActive()); + + // Initial sync flag should be set. + ASSERT_TRUE(getStorage().getInitialSyncFlag(txn.get())); +} + +TEST_F(DataReplicatorTest, DataReplicatorReturnsCallbackCanceledIfShutdownImmediatelyAfterStartup) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + // This will cancel the _startInitialSyncAttemptCallback() task scheduled by startup(). + ASSERT_OK(dr->shutdown()); + + dr->join(); + + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied); +} + +const std::uint32_t chooseSyncSourceMaxAttempts = 10U; + +/** + * Advances executor clock so that DataReplicator exhausts all 'chooseSyncSourceMaxAttempts' (server + * parameter numInitialSyncConnectAttempts) sync source selection attempts. + * If SyncSourceSelectorMock keeps returning an invalid sync source, DataReplicator will retry every + * '_options.syncSourceRetryWait' ms up to a maximum of 'chooseSyncSourceMaxAttempts' attempts. + */ +void _simulateChooseSyncSourceFailure(executor::NetworkInterfaceMock* net, + Milliseconds syncSourceRetryWait) { + advanceClock(net, int(chooseSyncSourceMaxAttempts - 1) * syncSourceRetryWait); +} + +TEST_F( + DataReplicatorTest, + DataReplicatorReturnsInitialSyncOplogSourceMissingIfNoValidSyncSourceCanBeFoundAfterTenFailedChooseSyncSourceAttempts) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + // Override chooseNewSyncSource() result in SyncSourceSelectorMock before calling startup() + // because DataReplicator will look for a valid sync source immediately after startup. + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort()); + + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + _simulateChooseSyncSourceFailure(getNet(), _options.syncSourceRetryWait); + + dr->join(); + + ASSERT_EQUALS(ErrorCodes::InitialSyncOplogSourceMissing, _lastApplied); +} + +// Confirms that DataReplicator keeps retrying initial sync. +// Make every initial sync attempt fail early by having the sync source selector always return an +// invalid sync source. +TEST_F(DataReplicatorTest, + DataReplicatorRetriesInitialSyncUpToMaxAttemptsAndReturnsLastAttemptError) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort()); + + const std::uint32_t initialSyncMaxAttempts = 3U; + ASSERT_OK(dr->startup(txn.get(), initialSyncMaxAttempts)); + + auto net = getNet(); + for (std::uint32_t i = 0; i < initialSyncMaxAttempts; ++i) { + _simulateChooseSyncSourceFailure(net, _options.syncSourceRetryWait); + advanceClock(net, _options.initialSyncRetryWait); } - void run() { - UniqueLock lk(_mutex); - _thread.reset(new stdx::thread(stdx::bind(&InitialSyncBackgroundRunner::_run, this))); - _condVar.wait(lk); + dr->join(); + + ASSERT_EQUALS(ErrorCodes::InitialSyncOplogSourceMissing, _lastApplied); + + // Check number of failed attempts in stats. + auto progress = dr->getInitialSyncProgress(); + unittest::log() << "Progress after " << initialSyncMaxAttempts + << " failed attempts: " << progress; + ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), int(initialSyncMaxAttempts)) + << progress; + ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), int(initialSyncMaxAttempts)) + << progress; +} + +TEST_F(DataReplicatorTest, + DataReplicatorReturnsCallbackCanceledIfShutdownWhileRetryingSyncSourceSelection) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort()); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + auto when = net->now() + _options.syncSourceRetryWait / 2; + ASSERT_GREATER_THAN(when, net->now()); + ASSERT_EQUALS(when, net->runUntil(when)); } - BSONObj getInitialSyncProgress() { - return _dr->getInitialSyncProgress(); + // This will cancel the _chooseSyncSourceCallback() task scheduled at getNet()->now() + + // '_options.syncSourceRetryWait'. + ASSERT_OK(dr->shutdown()); + + dr->join(); + + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied); +} + +TEST_F( + DataReplicatorTest, + DataReplicatorReturnsScheduleErrorIfTaskExecutorFailsToScheduleNextChooseSyncSourceCallback) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort()); + _executorProxy->shouldFailScheduleWorkAt = true; + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + dr->join(); + + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); +} + +TEST_F(DataReplicatorTest, + DataReplicatorReturnsScheduleErrorIfTaskExecutorFailsToScheduleNextInitialSyncAttempt) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort()); + + ASSERT_EQUALS(DataReplicator::State::kPreStart, dr->getState_forTest()); + + ASSERT_OK(dr->startup(txn.get(), 2U)); + ASSERT_EQUALS(DataReplicator::State::kRunning, dr->getState_forTest()); + + // Advance clock so that we run all but the last sync source callback. + auto net = getNet(); + advanceClock(net, int(chooseSyncSourceMaxAttempts - 2) * _options.syncSourceRetryWait); + + // Last choose sync source attempt should now be scheduled. Advance clock so we fail last + // choose sync source attempt which cause the next initial sync attempt to be scheduled. + _executorProxy->shouldFailScheduleWorkAt = true; + advanceClock(net, _options.syncSourceRetryWait); + + dr->join(); + + ASSERT_EQUALS(DataReplicator::State::kComplete, dr->getState_forTest()); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); +} + +// This test verifies that the data replication will still transition to a complete state even if +// the completion callback function throws an exception. +TEST_F(DataReplicatorTest, DataReplicatorTransitionsToCompleteWhenFinishCallbackThrowsException) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _onCompletion = [this](const StatusWith<OpTimeWithHash>& lastApplied) { + _lastApplied = lastApplied; + uassert(ErrorCodes::InternalError, "", false); + }; + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort()); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + ASSERT_OK(dr->shutdown()); + dr->join(); + + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied); +} + +class SharedCallbackState { + MONGO_DISALLOW_COPYING(SharedCallbackState); + +public: + explicit SharedCallbackState(bool* sharedCallbackStateDestroyed) + : _sharedCallbackStateDestroyed(sharedCallbackStateDestroyed) {} + ~SharedCallbackState() { + *_sharedCallbackStateDestroyed = true; } private: - void _run() { - setThreadName("InitialSyncRunner"); - Client::initThreadIfNotAlready(); - auto txn = getGlobalServiceContext()->makeOperationContext(&cc()); + bool* _sharedCallbackStateDestroyed; +}; + +TEST_F(DataReplicatorTest, DataReplicatorResetsOnCompletionCallbackFunctionPointerUponCompletion) { + bool sharedCallbackStateDestroyed = false; + auto sharedCallbackData = std::make_shared<SharedCallbackState>(&sharedCallbackStateDestroyed); + decltype(_lastApplied) lastApplied = getDetectableErrorStatus(); + + auto dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>(); + dataReplicatorExternalState->taskExecutor = &getExecutor(); + auto dr = stdx::make_unique<DataReplicator>( + _options, + std::move(dataReplicatorExternalState), + _storageInterface.get(), + [&lastApplied, sharedCallbackData](const StatusWith<OpTimeWithHash>& result) { + lastApplied = result; + }); + ON_BLOCK_EXIT([this]() { getExecutor().shutdown(); }); + + auto txn = makeOpCtx(); + + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + sharedCallbackData.reset(); + ASSERT_FALSE(sharedCallbackStateDestroyed); + + ASSERT_OK(dr->shutdown()); + dr->join(); + + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, lastApplied); + + // DataReplicator should reset 'DataReplicator::_onCompletion' after running callback function + // for the last time before becoming inactive. + // This ensures that we release resources associated with 'DataReplicator::_onCompletion'. + ASSERT_TRUE(sharedCallbackStateDestroyed); +} + +TEST_F(DataReplicatorTest, DataReplicatorRecreatesOplogAndDropsReplicatedDatabases) { + // We are not interested in proceeding beyond the oplog creation stage so we inject a failure + // after setting '_storageInterfaceWorkDone.createOplogCalled' to true. + auto oldCreateOplogFn = _storageInterface->createOplogFn; + _storageInterface->createOplogFn = [oldCreateOplogFn](OperationContext* txn, + const NamespaceString& nss) { + oldCreateOplogFn(txn, nss); + return Status(ErrorCodes::OperationFailed, "oplog creation failed"); + }; + + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + dr->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); + + LockGuard lock(_storageInterfaceWorkDoneMutex); + ASSERT_TRUE(_storageInterfaceWorkDone.droppedUserDBs); + ASSERT_TRUE(_storageInterfaceWorkDone.createOplogCalled); +} + +TEST_F(DataReplicatorTest, DataReplicatorPassesThroughGetRollbackIdScheduleError) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + // replSetGetRBID is the first remote command to be scheduled by the data replicator after + // creating the oplog collection. + executor::RemoteCommandRequest request; + _shouldFailRequest = [&request](const executor::RemoteCommandRequest& requestToSend) { + request = requestToSend; + return true; + }; + + HostAndPort syncSource("localhost", 12345); + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + dr->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); + + ASSERT_EQUALS("admin", request.dbname); + assertRemoteCommandNameEquals("replSetGetRBID", request); + ASSERT_EQUALS(syncSource, request.target); +} + +TEST_F( + DataReplicatorTest, + DataReplicatorReturnsShutdownInProgressIfSchedulingRollbackCheckerFailedDueToExecutorShutdown) { + // The rollback id request is sent immediately after oplog creation. We shut the task executor + // down before returning from createOplog() to make the scheduleRemoteCommand() call for + // replSetGetRBID fail. + auto oldCreateOplogFn = _storageInterface->createOplogFn; + _storageInterface->createOplogFn = [oldCreateOplogFn, this](OperationContext* txn, + const NamespaceString& nss) { + auto status = oldCreateOplogFn(txn, nss); + getExecutor().shutdown(); + return status; + }; + + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + dr->join(); + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, _lastApplied); + + LockGuard lock(_storageInterfaceWorkDoneMutex); + ASSERT_TRUE(_storageInterfaceWorkDone.createOplogCalled); +} + +TEST_F(DataReplicatorTest, DataReplicatorCancelsRollbackCheckerOnShutdown) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + HostAndPort syncSource("localhost", 12345); + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource); - // Synchonize this thread starting with the call in run() above. - UniqueLock lk(_mutex); - _condVar.notify_all(); - lk.unlock(); + ASSERT_EQUALS(DataReplicator::State::kPreStart, dr->getState_forTest()); - auto result = _dr->doInitialSync(txn.get(), _maxAttempts); // blocking + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + ASSERT_EQUALS(DataReplicator::State::kRunning, dr->getState_forTest()); - lk.lock(); - _result = result; + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + ASSERT_TRUE(net->hasReadyRequests()); + auto noi = net->getNextReadyRequest(); + const auto& request = assertRemoteCommandNameEquals("replSetGetRBID", noi->getRequest()); + ASSERT_EQUALS("admin", request.dbname); + ASSERT_EQUALS(syncSource, request.target); + net->blackHole(noi); } - stdx::mutex _mutex; // protects _result. - StatusWith<OpTimeWithHash> _result{ErrorCodes::NotYetInitialized, "InitialSync not started."}; + ASSERT_OK(dr->shutdown()); + // Since we need to request the NetworkInterfaceMock to deliver the cancellation event, + // the DataReplicator has to be in a pre-completion state (ie. ShuttingDown). + ASSERT_EQUALS(DataReplicator::State::kShuttingDown, dr->getState_forTest()); - DataReplicator* _dr; - const std::size_t _maxAttempts; - std::unique_ptr<stdx::thread> _thread; - stdx::condition_variable _condVar; -}; + executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations(); -bool isOplogGetMore(const NetworkInterfaceMock::NetworkOperationIterator& noi) { - const RemoteCommandRequest& req = noi->getRequest(); - const auto parsedGetMoreStatus = GetMoreRequest::parseFromBSON(req.dbname, req.cmdObj); - if (!parsedGetMoreStatus.isOK()) { - return false; + dr->join(); + ASSERT_EQUALS(DataReplicator::State::kComplete, dr->getState_forTest()); + + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied); +} + +TEST_F(DataReplicatorTest, DataReplicatorPassesThroughRollbackCheckerCallbackError) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + assertRemoteCommandNameEquals( + "replSetGetRBID", + net->scheduleErrorResponse( + Status(ErrorCodes::OperationFailed, "replSetGetRBID failed at sync source"))); + net->runReadyNetworkOperations(); } - const auto getMoreReq = parsedGetMoreStatus.getValue(); - return (getMoreReq.nss.isOplog() && getMoreReq.cursorid == 1LL); -} - -// Should match this: { killCursors: "oplog.rs", cursors: [ 1 ] } -bool isOplogKillCursor(const NetworkInterfaceMock::NetworkOperationIterator& noi) { - const BSONObj reqBSON = noi->getRequest().cmdObj; - const auto nsElem = reqBSON["killCursors"]; - const auto isOplogNS = - nsElem && NamespaceString{"local.oplog.rs"}.coll().equalCaseInsensitive(nsElem.str()); - if (isOplogNS) { - const auto cursorsVector = reqBSON["cursors"].Array(); - auto hasCursorId = false; - std::for_each( - cursorsVector.begin(), cursorsVector.end(), [&hasCursorId](const BSONElement& elem) { - if (elem.safeNumberLong() == 1LL) { - hasCursorId = true; - } - }); - return isOplogNS && hasCursorId; + + dr->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); +} + +TEST_F(DataReplicatorTest, DataReplicatorPassesThroughLastOplogEntryFetcherScheduleError) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + // The last oplog entry fetcher is the first component that sends a find command so we reject + // any find commands and save the request for inspection at the end of this test case. + executor::RemoteCommandRequest request; + _shouldFailRequest = [&request](const executor::RemoteCommandRequest& requestToSend) { + request = requestToSend; + return "find" == requestToSend.cmdObj.firstElement().fieldNameStringData(); + }; + + HostAndPort syncSource("localhost", 12345); + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); } - return false; + + dr->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); + + ASSERT_EQUALS(syncSource, request.target); + ASSERT_EQUALS(_options.localOplogNS.db(), request.dbname); + assertRemoteCommandNameEquals("find", request); + ASSERT_BSONOBJ_EQ(BSON("$natural" << -1), request.cmdObj.getObjectField("sort")); + ASSERT_EQUALS(1, request.cmdObj.getIntField("limit")); } -class InitialSyncTest : public DataReplicatorTest { -public: - using Responses = std::vector<std::pair<std::string, BSONObj>>; - InitialSyncTest(){}; +TEST_F(DataReplicatorTest, DataReplicatorPassesThroughLastOplogEntryFetcherCallbackError) { + auto dr = &getDR(); + auto txn = makeOpCtx(); -protected: - void setResponses(Responses resps) { - _responses = resps; + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + assertRemoteCommandNameEquals( + "find", + net->scheduleErrorResponse( + Status(ErrorCodes::OperationFailed, "find command failed at sync source"))); + net->runReadyNetworkOperations(); } - void startSync(std::size_t maxAttempts) { - DataReplicator* dr = &(getDR()); - _isbr.reset(new InitialSyncBackgroundRunner(dr, maxAttempts)); - _isbr->run(); + dr->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); +} + +TEST_F(DataReplicatorTest, DataReplicatorCancelsLastOplogEntryFetcherOnShutdown) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + ASSERT_TRUE(net->hasReadyRequests()); } - void playResponses() { - NetworkInterfaceMock* net = getNet(); - int processedRequests(0); - const int expectedResponses(_responses.size()); - - Date_t lastLog{Date_t::now()}; - while (true) { - if (_isbr && _isbr->isDone()) { - log() << "There are " << (expectedResponses - processedRequests) - << " responses left which were unprocessed."; - return; - } + ASSERT_OK(dr->shutdown()); + executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations(); - NetworkGuard guard(net); + dr->join(); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied); +} - if (!net->hasReadyRequests()) { - net->runReadyNetworkOperations(); - continue; - } +TEST_F(DataReplicatorTest, + DataReplicatorReturnsNoMatchingDocumentIfLastOplogEntryFetcherReturnsEmptyBatchOfDocuments) { + auto dr = &getDR(); + auto txn = makeOpCtx(); - auto noi = net->getNextReadyRequest(); - if (isOplogGetMore(noi)) { - // process getmore requests from the oplog fetcher - int c = int(numGetMoreOplogEntries + 2); - lastGetMoreOplogEntry = BSON("ts" << Timestamp(Seconds(c), 1) << "h" << 1LL << "ns" - << "test.a" - << "v" - << OplogEntry::kOplogVersion - << "op" - << "i" - << "o" - << BSON("_id" << c)); - ++numGetMoreOplogEntries; - mongo::CursorId cursorId = - numGetMoreOplogEntries == numGetMoreOplogEntriesMax ? 0 : 1LL; - auto respBSON = - BSON("ok" << 1 << "cursor" << BSON("id" << cursorId << "ns" - << "local.oplog.rs" - << "nextBatch" - << BSON_ARRAY(lastGetMoreOplogEntry))); - net->scheduleResponse( - noi, - net->now(), - ResponseStatus(RemoteCommandResponse(respBSON, BSONObj(), Milliseconds(10)))); - - log() << "Sending response for getMore network request:"; - log() << " req: " << noi->getRequest().dbname << "." - << noi->getRequest().cmdObj; - log() << " resp:" << respBSON; - - if ((Date_t::now() - lastLog) > Seconds(1)) { - lastLog = Date_t::now(); - log() << "processing oplog getmore, net:" << net->getDiagnosticString(); - net->logQueues(); - } - net->runReadyNetworkOperations(); - continue; - } else if (isOplogKillCursor(noi)) { - auto respBSON = BSON("ok" << 1.0); - log() << "processing oplog killcursors req, net:" << net->getDiagnosticString(); - net->scheduleResponse( - noi, - net->now(), - ResponseStatus(RemoteCommandResponse(respBSON, BSONObj(), Milliseconds(10)))); - net->runReadyNetworkOperations(); - continue; - } + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); - const BSONObj reqBSON = noi->getRequest().cmdObj; - const BSONElement cmdElem = reqBSON.firstElement(); - auto cmdName = cmdElem.fieldNameStringData(); - auto expectedName = _responses[processedRequests].first; - auto response = _responses[processedRequests].second; - ASSERT(_responses[processedRequests].first == "" || - cmdName.equalCaseInsensitive(expectedName)) - << "ERROR: response #" << processedRequests + 1 << ", expected '" << expectedName - << "' command but the request was actually: '" << noi->getRequest().cmdObj - << "' for resp: " << response; - - // process fixed set of responses - log() << "Sending response for network request:"; - log() << " req: " << noi->getRequest().dbname << "." << noi->getRequest().cmdObj; - log() << " resp:" << response; - net->scheduleResponse( - noi, - net->now(), - ResponseStatus(RemoteCommandResponse(response, BSONObj(), Milliseconds(10)))); - - if ((Date_t::now() - lastLog) > Seconds(1)) { - lastLog = Date_t(); - log() << net->getDiagnosticString(); - net->logQueues(); - } - net->runReadyNetworkOperations(); + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); - guard.dismiss(); - if (++processedRequests >= expectedResponses) { - log() << "done processing expected requests "; - break; // once we have processed all requests, continue; - } - } + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({}); } - void verifySync(NetworkInterfaceMock* net, Status s = Status::OK()) { - verifySync(net, s.code()); + dr->join(); + ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, _lastApplied); +} + +TEST_F(DataReplicatorTest, + DataReplicatorReturnsNoSuchKeyIfLastOplogEntryFetcherReturnsEntryWithMissingHash) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({BSONObj()}); } - void verifySync(NetworkInterfaceMock* net, ErrorCodes::Error code) { - // Check result - const auto status = _isbr->getResult(net).getStatus(); - ASSERT_EQ(status.code(), code) << "status codes differ, status: " << status; + dr->join(); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, _lastApplied); +} + +TEST_F(DataReplicatorTest, + DataReplicatorReturnsNoSuchKeyIfLastOplogEntryFetcherReturnsEntryWithMissingTimestamp) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({BSON("h" << 1LL)}); } - BSONObj getInitialSyncProgress() { - return _isbr->getInitialSyncProgress(); + dr->join(); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, _lastApplied); +} + +TEST_F(DataReplicatorTest, + DataReplicatorPassesThroughErrorFromDataReplicatorExternalStateGetCurrentConfig) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + getExternalState()->replSetConfigResult = Status(ErrorCodes::OperationFailed, ""); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); } - // Generate at least one getMore response. - std::size_t numGetMoreOplogEntries = 0; - std::size_t numGetMoreOplogEntriesMax = 1; - BSONObj lastGetMoreOplogEntry; + dr->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); +} -private: - void tearDown() override; +TEST_F(DataReplicatorTest, DataReplicatorPassesThroughOplogFetcherScheduleError) { + auto dr = &getDR(); + auto txn = makeOpCtx(); - Responses _responses; - std::unique_ptr<InitialSyncBackgroundRunner> _isbr{nullptr}; -}; + // Make the tailable oplog query fail. Allow all other requests to be scheduled. + executor::RemoteCommandRequest request; + _shouldFailRequest = [&request](const executor::RemoteCommandRequest& requestToSend) { + if ("find" == requestToSend.cmdObj.firstElement().fieldNameStringData() && + requestToSend.cmdObj.getBoolField("tailable")) { + request = requestToSend; + return true; + } + return false; + }; + + HostAndPort syncSource("localhost", 12345); + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); -void InitialSyncTest::tearDown() { - DataReplicatorTest::tearDownExecutorThread(); - _isbr.reset(); - DataReplicatorTest::tearDown(); + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + } + + dr->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); + + ASSERT_EQUALS(syncSource, request.target); + ASSERT_EQUALS(_options.localOplogNS.db(), request.dbname); + assertRemoteCommandNameEquals("find", request); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); } -TEST_F(InitialSyncTest, ShutdownImmediatelyAfterStartup) { - startSync(1); +TEST_F(DataReplicatorTest, DataReplicatorPassesThroughOplogFetcherCallbackError) { + auto dr = &getDR(); auto txn = makeOpCtx(); - ASSERT_OK(getDR().shutdown()); - getExecutor().shutdown(); - verifySync(getNet(), ErrorCodes::ShutdownInProgress); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + net->scheduleSuccessfulResponse( + makeCursorResponse(0LL, _options.localOplogNS, {makeOplogEntry(1)})); + net->runReadyNetworkOperations(); + + // Oplog tailing query. + auto request = assertRemoteCommandNameEquals( + "find", net->scheduleErrorResponse(Status(ErrorCodes::OperationFailed, "dead cursor"))); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->runReadyNetworkOperations(); + + + // OplogFetcher will shut down DatabasesCloner on error after setting the completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _databasesClonerCallback(). + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); } -TEST_F(InitialSyncTest, Complete) { - /** - * Initial Sync will issue these query/commands - * - replSetGetRBID - * - startTS = oplog.rs->find().sort({$natural:-1}).limit(-1).next()["ts"] - * - listDatabases (foreach db do below) - * -- cloneDatabase (see DatabaseCloner tests). - * - endTS = oplog.rs->find().sort({$natural:-1}).limit(-1).next()["ts"] - * - ops = oplog.rs->find({ts:{$gte: startTS}}) (foreach op) - * -- if local doc is missing, getCollection(op.ns).findOne(_id:op.o2._id) - * - if any retries were done in the previous loop, endTS query again for minvalid - * - replSetGetRBID - * - */ +TEST_F(DataReplicatorTest, + DataReplicatorSucceedsOnEarlyOplogFetcherCompletionIfThereAreNoOperationsToApply) { + auto dr = &getDR(); + auto txn = makeOpCtx(); - auto lastOpAfterClone = BSON( - "ts" << Timestamp(Seconds(8), 1U) << "h" << 1LL << "v" << OplogEntry::kOplogVersion << "ns" - << "" - << "op" - << "i" - << "o" - << BSON("_id" << 5 << "a" << 2)); - - const Responses responses = { - {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")}, - // get latest oplog ts - {"find", - fromjson( - str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - // oplog fetcher find - {"find", - fromjson( - str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - // Clone Start - // listDatabases - {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")}, - // listCollections for "a" - {"listCollections", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" - "{name:'a', options:{}} " - "]}}")}, - // count:a - {"count", BSON("n" << 1 << "ok" << 1)}, - // listIndexes:a - { - "listIndexes", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:[" - "{v:" - << OplogEntry::kOplogVersion - << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")}, - // find:a - {"find", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[" - "{_id:1, a:1} " - "]}}")}, - // Clone Done - // get latest oplog ts - {"find", BaseClonerTest::createCursorResponse(0, BSON_ARRAY(lastOpAfterClone))}, - {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")}, - // Applier starts ... + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + auto request = + assertRemoteCommandNameEquals("find", + net->scheduleSuccessfulResponse(makeCursorResponse( + 0LL, _options.localOplogNS, {makeOplogEntry(1)}))); + ASSERT_EQUALS(1, request.cmdObj.getIntField("limit")); + net->runReadyNetworkOperations(); + + // Oplog tailing query. + // Simulate cursor closing on sync source. + request = + assertRemoteCommandNameEquals("find", + net->scheduleSuccessfulResponse(makeCursorResponse( + 0LL, _options.localOplogNS, {makeOplogEntry(1)}))); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->runReadyNetworkOperations(); + + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // Last rollback checker replSetGetRBID command. + assertRemoteCommandNameEquals( + "replSetGetRBID", net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1))); + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(OplogEntry(makeOplogEntry(1)).getOpTime(), + unittest::assertGet(_lastApplied).opTime); +} + +TEST_F( + DataReplicatorTest, + DataReplicatorSucceedsOnEarlyOplogFetcherCompletionIfThereAreEnoughOperationsInTheOplogBufferToReachEndTimestamp) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // Oplog tailing query. + // Simulate cursor closing on sync source. + auto request = assertRemoteCommandNameEquals( + "find", + net->scheduleSuccessfulResponse(makeCursorResponse( + 0LL, + _options.localOplogNS, + {makeOplogEntry(1), makeOplogEntry(2, "c"), makeOplogEntry(3, "c")}))); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->runReadyNetworkOperations(); + + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(3)}); + + // Last rollback checker replSetGetRBID command. + assertRemoteCommandNameEquals( + "replSetGetRBID", net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1))); + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(OplogEntry(makeOplogEntry(3)).getOpTime(), + unittest::assertGet(_lastApplied).opTime); +} + +TEST_F( + DataReplicatorTest, + DataReplicatorReturnsRemoteResultsUnavailableOnEarlyOplogFetcherCompletionIfThereAreNotEnoughOperationsInTheOplogBufferToReachEndTimestamp) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // Oplog tailing query. + // Simulate cursor closing on sync source. + auto request = assertRemoteCommandNameEquals( + "find", + net->scheduleSuccessfulResponse(makeCursorResponse( + 0LL, + _options.localOplogNS, + {makeOplogEntry(1), makeOplogEntry(2, "c"), makeOplogEntry(3, "c")}))); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->runReadyNetworkOperations(); + + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Second last oplog entry fetcher. + // Return an oplog entry with an optime that is more recent than what the completed + // OplogFetcher has read from the sync source. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(4)}); + } + + dr->join(); + ASSERT_EQUALS(ErrorCodes::RemoteResultsUnavailable, _lastApplied); +} + +TEST_F(DataReplicatorTest, + DataReplicatorPassesThroughDatabasesClonerScheduleErrorAndCancelsOplogFetcher) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + // Make the listDatabases command fail. Allow all other requests to be scheduled. + executor::RemoteCommandRequest request; + _shouldFailRequest = [&request](const executor::RemoteCommandRequest& requestToSend) { + if ("listDatabases" == requestToSend.cmdObj.firstElement().fieldNameStringData()) { + request = requestToSend; + return true; + } + return false; }; - // Initial sync flag should not be set before starting. + HostAndPort syncSource("localhost", 12345); + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // DataReplicator shuts down OplogFetcher when it fails to schedule DatabasesCloner + // so we should not expect any network requests in the queue. + ASSERT_FALSE(net->hasReadyRequests()); + + // OplogFetcher is shutting down but we still need to call runReadyNetworkOperations() + // to deliver the cancellation status to the 'DataReplicator::_oplogFetcherCallback' + // callback. + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); + + ASSERT_EQUALS(syncSource, request.target); + ASSERT_EQUALS("admin", request.dbname); + assertRemoteCommandNameEquals("listDatabases", request); +} + +TEST_F(DataReplicatorTest, + DataReplicatorPassesThroughDatabasesClonerCallbackErrorAndCancelsOplogFetcher) { + auto dr = &getDR(); auto txn = makeOpCtx(); - ASSERT_FALSE(getStorage().getInitialSyncFlag(txn.get())); - startSync(1); + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); - // Play first response to ensure data replicator has entered initial sync state. - setResponses({responses.begin(), responses.begin() + 1}); - numGetMoreOplogEntriesMax = responses.size(); - playResponses(); + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); - // Initial sync flag should be set. - ASSERT_TRUE(getStorage().getInitialSyncFlag(txn.get())); + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); - // Play rest of the responses after checking initial sync flag. - setResponses({responses.begin() + 1, responses.end()}); - playResponses(); - log() << "done playing last responses"; + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); - log() << "waiting for initial sync to verify it completed OK"; - verifySync(getNet()); + // Oplog tailing query. + auto noi = net->getNextReadyRequest(); + auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->blackHole(noi); + + // DatabasesCloner's first remote command - listDatabases + assertRemoteCommandNameEquals( + "listDatabases", + net->scheduleErrorResponse(Status(ErrorCodes::FailedToParse, "listDatabases failed"))); + net->runReadyNetworkOperations(); + + // DatabasesCloner will shut down OplogFetcher on error after setting the completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _oplogFetcherCallback(). + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(ErrorCodes::FailedToParse, _lastApplied); +} + +TEST_F(DataReplicatorTest, DataReplicatorIgnoresLocalDatabasesWhenCloningDatabases) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); - log() << "doing asserts"; + auto net = getNet(); { - LockGuard lock(_storageInterfaceWorkDoneMutex); - ASSERT_TRUE(_storageInterfaceWorkDone.droppedUserDBs); - ASSERT_TRUE(_storageInterfaceWorkDone.createOplogCalled); - ASSERT_EQ(0, _storageInterfaceWorkDone.oplogEntriesInserted); + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // Oplog tailing query. + auto noi = net->getNextReadyRequest(); + auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->blackHole(noi); + + // DatabasesCloner's first remote command - listDatabases + assertRemoteCommandNameEquals( + "listDatabases", + net->scheduleSuccessfulResponse(makeListDatabasesResponse({"a", "local", "b"}))); + net->runReadyNetworkOperations(); + + // DatabasesCloner should only send listCollections requests for databases 'a' and 'b'. + request = assertRemoteCommandNameEquals( + "listCollections", + net->scheduleSuccessfulResponse( + makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("a"), {}))); + ASSERT_EQUALS("a", request.dbname); + + request = assertRemoteCommandNameEquals( + "listCollections", + net->scheduleSuccessfulResponse( + makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("b"), {}))); + ASSERT_EQUALS("b", request.dbname); + + // After processing all the database names and returning empty lists of collections for each + // database, data cloning should run to completion and we should expect to see a last oplog + // entry fetcher request. + request = assertRemoteCommandNameEquals( + "find", + net->scheduleSuccessfulResponse( + makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("b"), {}))); + ASSERT_EQUALS(1, request.cmdObj.getIntField("limit")); } - log() << "checking initial sync flag isn't set."; - // Initial sync flag should not be set after completion. - ASSERT_FALSE(getStorage().getInitialSyncFlag(txn.get())); + getExecutor().shutdown(); - // getMore responses are generated by playResponses(). - ASSERT_EQUALS(OplogEntry(lastOpAfterClone).getOpTime(), _myLastOpTime); + dr->join(); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied); } -TEST_F(InitialSyncTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfterCloning) { - const Responses responses = - { - {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")}, - // get latest oplog ts - {"find", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - // oplog fetcher find - {"find", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - // Clone Start - // listDatabases - {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")}, - // listCollections for "a" - {"listCollections", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" - "{name:'a', options:{}} " - "]}}")}, - // count:a - {"count", BSON("n" << 1 << "ok" << 1)}, - // listIndexes:a - {"listIndexes", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:[" - "{v:" - << OplogEntry::kOplogVersion - << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")}, - // find:a - {"find", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[" - "{_id:1, a:1} " - "]}}")}, - // Clone Done - // get latest oplog ts - {"find", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'b.c', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, c:1}}]}}")}, - {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")}, - }; +TEST_F(DataReplicatorTest, + DataReplicatorIgnoresDatabaseInfoDocumentWithoutNameFieldWhenCloningDatabases) { + auto dr = &getDR(); + auto txn = makeOpCtx(); - // Initial sync flag should not be set before starting. + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // Oplog tailing query. + auto noi = net->getNextReadyRequest(); + auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->blackHole(noi); + + // DatabasesCloner's first remote command - listDatabases + assertRemoteCommandNameEquals( + "listDatabases", + net->scheduleSuccessfulResponse(BSON("databases" << BSON_ARRAY(BSON("name" + << "a") + << BSON("bad" + << "dbinfo") + << BSON("name" + << "b")) + << "ok" + << 1))); + net->runReadyNetworkOperations(); + + // DatabasesCloner should only send listCollections requests for databases 'a' and 'b'. + request = assertRemoteCommandNameEquals( + "listCollections", + net->scheduleSuccessfulResponse( + makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("a"), {}))); + ASSERT_EQUALS("a", request.dbname); + + request = assertRemoteCommandNameEquals( + "listCollections", + net->scheduleSuccessfulResponse( + makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("b"), {}))); + ASSERT_EQUALS("b", request.dbname); + + // After processing all the database names and returning empty lists of collections for each + // database, data cloning should run to completion and we should expect to see a last oplog + // entry fetcher request. + request = assertRemoteCommandNameEquals( + "find", + net->scheduleSuccessfulResponse( + makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("b"), {}))); + ASSERT_EQUALS(1, request.cmdObj.getIntField("limit")); + } + + getExecutor().shutdown(); + + dr->join(); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied); +} + +TEST_F(DataReplicatorTest, DataReplicatorCancelsBothOplogFetcherAndDatabasesClonerOnShutdown) { + auto dr = &getDR(); auto txn = makeOpCtx(); - ASSERT_FALSE(getStorage().getInitialSyncFlag(txn.get())); - startSync(1); + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); - // Play first response to ensure data replicator has entered initial sync state. - setResponses({responses.begin(), responses.begin() + 1}); - playResponses(); + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); - // Initial sync flag should be set. - ASSERT_TRUE(getStorage().getInitialSyncFlag(txn.get())); + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); - // Play rest of the responses after checking initial sync flag. - setResponses({responses.begin() + 1, responses.end()}); - playResponses(); - log() << "done playing last responses"; + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + } - log() << "waiting for initial sync to verify it completed OK"; - verifySync(getNet()); + ASSERT_OK(dr->shutdown()); + executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations(); + + dr->join(); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied); +} + +TEST_F(DataReplicatorTest, + DataReplicatorPassesThroughSecondLastOplogEntryFetcherScheduleErrorAndCancelsOplogFetcher) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + // Make the second last oplog entry fetcher command fail. Allow all other requests to be + // scheduled. + executor::RemoteCommandRequest request; + bool first = true; + _shouldFailRequest = [&first, &request](const executor::RemoteCommandRequest& requestToSend) { + if ("find" == requestToSend.cmdObj.firstElement().fieldNameStringData() && + requestToSend.cmdObj.hasField("sort") && + 1 == requestToSend.cmdObj.getIntField("limit")) { + if (first) { + first = false; + return false; + } + request = requestToSend; + return true; + } + return false; + }; - log() << "doing asserts"; + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); { - LockGuard lock(_storageInterfaceWorkDoneMutex); - ASSERT_TRUE(_storageInterfaceWorkDone.droppedUserDBs); - ASSERT_TRUE(_storageInterfaceWorkDone.createOplogCalled); - ASSERT_EQ(1, _storageInterfaceWorkDone.oplogEntriesInserted); + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move + // on to the DatabasesCloner's request. + auto noi = net->getNextReadyRequest(); + auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->blackHole(noi); + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // DatabasesCloner will shut down the OplogFetcher on failing to schedule the last entry + // oplog fetcher after setting the completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _oplogFetcherCallback(). + net->runReadyNetworkOperations(); } - log() << "checking initial sync flag isn't set."; - // Initial sync flag should not be set after completion. - ASSERT_FALSE(getStorage().getInitialSyncFlag(txn.get())); + dr->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); +} + +TEST_F(DataReplicatorTest, + DataReplicatorPassesThroughSecondLastOplogEntryFetcherCallbackErrorAndCancelsOplogFetcher) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move + // on to the DatabasesCloner's request. + auto noi = net->getNextReadyRequest(); + auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->blackHole(noi); + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Second last oplog entry fetcher. + request = assertRemoteCommandNameEquals( + "find", + net->scheduleErrorResponse( + Status(ErrorCodes::OperationFailed, "second last oplog entry fetcher failed"))); + ASSERT_TRUE(request.cmdObj.hasField("sort")); + ASSERT_EQUALS(1, request.cmdObj.getIntField("limit")); + net->runReadyNetworkOperations(); - ASSERT_EQUALS(OpTime(Timestamp(1, 1), OpTime::kUninitializedTerm), _myLastOpTime); + // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after + // setting the completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _oplogFetcherCallback(). + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); } -TEST_F(InitialSyncTest, Failpoint) { - auto failPoint = getGlobalFailPointRegistry()->getFailPoint("failInitialSyncWithBadHost"); - failPoint->setMode(FailPoint::alwaysOn); - ON_BLOCK_EXIT([failPoint]() { failPoint->setMode(FailPoint::off); }); +TEST_F(DataReplicatorTest, + DataReplicatorCancelsBothSecondLastOplogEntryFetcherAndOplogFetcherOnShutdown) { + auto dr = &getDR(); + auto txn = makeOpCtx(); - Timestamp time1(100, 1); - OpTime opTime1(time1, OpTime::kInitialTerm); - _myLastOpTime = opTime1; - - startSync(1); - - verifySync(getNet(), ErrorCodes::InvalidSyncSource); -} - -TEST_F(InitialSyncTest, FailsOnClone) { - const Responses responses = { - {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")}, - // get latest oplog ts - {"find", - fromjson( - str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - // oplog fetcher find - {"find", - fromjson( - str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - // Clone Start - // listDatabases - {"listDatabases", - fromjson( - str::stream() << "{ok:0, errmsg:'fail on clone -- listDBs injected failure', code: " - << int(ErrorCodes::FailedToParse) - << "}")}, - // rollback checker. - {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")}, + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move + // on to the DatabasesCloner's request. + auto noi = net->getNextReadyRequest(); + auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->blackHole(noi); + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + request = assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Second last oplog entry fetcher. + noi = net->getNextReadyRequest(); + request = assertRemoteCommandNameEquals("find", noi->getRequest()); + ASSERT_TRUE(request.cmdObj.hasField("sort")); + ASSERT_EQUALS(1, request.cmdObj.getIntField("limit")); + net->blackHole(noi); + } + + dr->shutdown(); + executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations(); + + dr->join(); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied); +} + +TEST_F(DataReplicatorTest, + DataReplicatorCancelsSecondLastOplogEntryFetcherOnOplogFetcherCallbackError) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // Save request for OplogFetcher's oplog tailing query. This request will be canceled. + auto noi = net->getNextReadyRequest(); + auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + auto oplogFetcherNetworkOperationIterator = noi; + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Second last oplog entry fetcher. + // Blackhole this request which will be canceled when oplog fetcher fails. + noi = net->getNextReadyRequest(); + request = assertRemoteCommandNameEquals("find", noi->getRequest()); + ASSERT_TRUE(request.cmdObj.hasField("sort")); + ASSERT_EQUALS(1, request.cmdObj.getIntField("limit")); + net->blackHole(noi); + + // Make oplog fetcher fail. + net->scheduleErrorResponse(oplogFetcherNetworkOperationIterator, + Status(ErrorCodes::OperationFailed, "oplog fetcher failed")); + net->runReadyNetworkOperations(); + + // _oplogFetcherCallback() will shut down the '_lastOplogEntryFetcher' after setting the + // completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _lastOplogEntryFetcherCallbackAfterCloningData(). + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); +} + +TEST_F( + DataReplicatorTest, + DataReplicatorReturnsTypeMismatchErrorWhenSecondLastOplogEntryFetcherReturnsMalformedDocument) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto oplogEntry = makeOplogEntry(1); + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({oplogEntry}); + + // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move + // on to the DatabasesCloner's request. + auto noi = net->getNextReadyRequest(); + auto request = noi->getRequest(); + assertRemoteCommandNameEquals("find", request); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->blackHole(noi); + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({BSON("h" + << "not a hash")}); + + // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after + // setting the completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _oplogFetcherCallback(). + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(ErrorCodes::TypeMismatch, _lastApplied); +} + +TEST_F(DataReplicatorTest, + DataReplicatorReturnsOplogOutOfOrderIfStopTimestampPrecedesBeginTimestamp) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)}); + + // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move + // on to the DatabasesCloner's request. + auto noi = net->getNextReadyRequest(); + auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->blackHole(noi); + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after + // setting the completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _oplogFetcherCallback(). + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, _lastApplied); +} + +TEST_F( + DataReplicatorTest, + DataReplicatorPassesThroughInsertOplogSeedDocumentErrorAfterDataCloningFinishesWithNoOperationsToApply) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + NamespaceString insertDocumentNss; + BSONObj insertDocumentDoc; + _storageInterface->insertDocumentFn = [&insertDocumentDoc, &insertDocumentNss]( + OperationContext*, const NamespaceString& nss, const BSONObj& doc) { + insertDocumentNss = nss; + insertDocumentDoc = doc; + return Status(ErrorCodes::OperationFailed, "failed to insert oplog entry"); }; - startSync(1); - setResponses(responses); - playResponses(); - verifySync(getNet(), ErrorCodes::FailedToParse); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto oplogEntry = makeOplogEntry(1); + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({oplogEntry}); + + // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move + // on to the DatabasesCloner's request. + auto noi = net->getNextReadyRequest(); + auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->blackHole(noi); + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({oplogEntry}); + + // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after + // setting the completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _oplogFetcherCallback(). + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); + ASSERT_EQUALS(_options.localOplogNS, insertDocumentNss); + ASSERT_BSONOBJ_EQ(oplogEntry, insertDocumentDoc); } -TEST_F(InitialSyncTest, FailOnRollback) { - const Responses responses = - { - // get rollback id - {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")}, - // get latest oplog ts - {"find", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - // oplog fetcher find - {"find", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - // Clone Start - // listDatabases - {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")}, - // listCollections for "a" - {"listCollections", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" - "{name:'a', options:{}} " - "]}}")}, - // count:a - {"count", BSON("n" << 1 << "ok" << 1)}, - // listIndexes:a - {"listIndexes", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:[" - "{v:" - << OplogEntry::kOplogVersion - << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")}, - // find:a - {"find", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[" - "{_id:1, a:1} " - "]}}")}, - // Clone Done - // get latest oplog ts - {"find", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(2,2), h:NumberLong(1), ns:'b.c', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, c:1}}]}}")}, - // Applier starts ... - // check for rollback - {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:2}")}, - }; +TEST_F( + DataReplicatorTest, + DataReplicatorReturnsCallbackCanceledAndDoesNotScheduleRollbackCheckerIfShutdownAfterInsertingInsertOplogSeedDocument) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + NamespaceString insertDocumentNss; + BSONObj insertDocumentDoc; + _storageInterface->insertDocumentFn = [dr, &insertDocumentDoc, &insertDocumentNss]( + OperationContext*, const NamespaceString& nss, const BSONObj& doc) { + insertDocumentNss = nss; + insertDocumentDoc = doc; + dr->shutdown(); + return Status::OK(); + }; + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto oplogEntry = makeOplogEntry(1); + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({oplogEntry}); + + // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move + // on to the DatabasesCloner's request. + auto noi = net->getNextReadyRequest(); + auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->blackHole(noi); + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({oplogEntry}); + + // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after + // setting the completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _oplogFetcherCallback(). + net->runReadyNetworkOperations(); + } - startSync(1); - numGetMoreOplogEntriesMax = responses.size(); - setResponses(responses); - playResponses(); - verifySync(getNet(), ErrorCodes::UnrecoverableRollbackError); + dr->join(); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied); + ASSERT_EQUALS(_options.localOplogNS, insertDocumentNss); + ASSERT_BSONOBJ_EQ(oplogEntry, insertDocumentDoc); } -TEST_F(InitialSyncTest, DataReplicatorPassesThroughRollbackCheckerScheduleError) { +TEST_F( + DataReplicatorTest, + DataReplicatorPassesThroughRollbackCheckerScheduleErrorAfterCloningFinishesWithNoOperationsToApply) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + // Make the second replSetGetRBID command fail. Allow all other requests to be scheduled. executor::RemoteCommandRequest request; bool first = true; @@ -981,375 +2075,1229 @@ TEST_F(InitialSyncTest, DataReplicatorPassesThroughRollbackCheckerScheduleError) return false; }; - const Responses responses = - { - // get rollback id - {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")}, - // get latest oplog ts - {"find", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - // oplog fetcher find - {"find", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - // Clone Start - // listDatabases - {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")}, - // listCollections for "a" - {"listCollections", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" - "{name:'a', options:{}} " - "]}}")}, - // count:a - {"count", BSON("n" << 1 << "ok" << 1)}, - // listIndexes:a - {"listIndexes", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:[" - "{v:" - << OplogEntry::kOplogVersion - << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")}, - // find:a - {"find", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[" - "{_id:1, a:1} " - "]}}")}, - // Clone Done - // get latest oplog ts - {"find", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(2,2), h:NumberLong(1), ns:'b.c', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, c:1}}]}}")}, - // Response to replSetGetRBID request is left out so that we can cancel the request by - // rejecting the executor::TaskExecutor::scheduleRemoteCommand() request. - }; + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); - startSync(1); - numGetMoreOplogEntriesMax = responses.size(); - setResponses(responses); - playResponses(); - getExecutor().shutdown(); - verifySync(getNet(), ErrorCodes::OperationFailed); -} - -TEST_F(InitialSyncTest, DataReplicatorPassesThroughOplogFetcherFailure) { - const Responses responses = { - {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")}, - // get latest oplog ts - {"find", - fromjson( - str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - // oplog fetcher find - {"find", - fromjson( - str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - }; + auto oplogEntry = makeOplogEntry(1); + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({oplogEntry}); + + // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move + // on to the DatabasesCloner's request. + auto noi = net->getNextReadyRequest(); + auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->blackHole(noi); + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({oplogEntry}); - startSync(1); + // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after + // setting the completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _oplogFetcherCallback(). + net->runReadyNetworkOperations(); + } - setResponses(responses); - playResponses(); - log() << "done playing responses - oplog fetcher is active"; + dr->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); +} +TEST_F( + DataReplicatorTest, + DataReplicatorPassesThroughRollbackCheckerCallbackErrorAfterCloningFinishesWithNoOperationsToApply) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto oplogEntry = makeOplogEntry(1); + auto net = getNet(); { - auto net = getNet(); executor::NetworkInterfaceMock::InNetworkGuard guard(net); - ASSERT_TRUE(net->hasReadyRequests()); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({oplogEntry}); + + // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move + // on to the DatabasesCloner's request. auto noi = net->getNextReadyRequest(); - // Blackhole requests until we see a getMore. - while (!isOplogGetMore(noi)) { - log() << "Blackholing non-getMore request: " << noi->getRequest(); - net->blackHole(noi); - ASSERT_TRUE(net->hasReadyRequests()); - noi = net->getNextReadyRequest(); - } - log() << "Sending error response to getMore"; - net->scheduleErrorResponse(noi, {ErrorCodes::OperationFailed, "dead cursor"}); + auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->blackHole(noi); + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({oplogEntry}); + + // Last rollback checker replSetGetRBID command. + assertRemoteCommandNameEquals( + "replSetGetRBID", + net->scheduleErrorResponse( + Status(ErrorCodes::OperationFailed, "replSetGetRBID command failed"))); + net->runReadyNetworkOperations(); + + // _rollbackCheckerCheckForRollbackCallback() will shut down the OplogFetcher after setting + // the completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _oplogFetcherCallback(). net->runReadyNetworkOperations(); } - verifySync(getNet(), ErrorCodes::OperationFailed); + dr->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); } -TEST_F(InitialSyncTest, OplogOutOfOrderOnOplogFetchFinish) { - const Responses responses = - { - {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")}, - // get latest oplog ts - {"find", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - // oplog fetcher find - {"find", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - // Clone Start - // listDatabases - {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")}, - // listCollections for "a" - {"listCollections", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" - "{name:'a', options:{}} " - "]}}")}, - // count:a - {"count", BSON("n" << 1 << "ok" << 1)}, - // listIndexes:a - {"listIndexes", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:[" - "{v:" - << OplogEntry::kOplogVersion - << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")}, - // find:a - first batch - {"find", - fromjson("{ok:1, cursor:{id:NumberLong(2), ns:'a.a', firstBatch:[" - "{_id:1, a:1} " - "]}}")}, - // getMore:a - second batch - {"getMore", - fromjson("{ok:1, cursor:{id:NumberLong(2), ns:'a.a', nextBatch:[" - "{_id:2, a:2} " - "]}}")}, - // getMore:a - third batch - {"getMore", - fromjson("{ok:1, cursor:{id:NumberLong(2), ns:'a.a', nextBatch:[" - "{_id:3, a:3} " - "]}}")}, - // getMore:a - last batch - {"getMore", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', nextBatch:[" - "{_id:4, a:4} " - "]}}")}, - // Clone Done - // get latest oplog ts - {"find", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(7,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:5, a:2}}]}}")}, - {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")}, - // Applier starts ... - }; +TEST_F(DataReplicatorTest, DataReplicatorCancelsLastRollbackCheckerOnShutdown) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto oplogEntry = makeOplogEntry(1); + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({oplogEntry}); + + // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move + // on to the DatabasesCloner's request. + auto noi = net->getNextReadyRequest(); + auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->blackHole(noi); + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); - startSync(1); + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({oplogEntry}); + + // Last rollback checker replSetGetRBID command. + noi = net->getNextReadyRequest(); + assertRemoteCommandNameEquals("replSetGetRBID", noi->getRequest()); + net->blackHole(noi); + + // _rollbackCheckerCheckForRollbackCallback() will shut down the OplogFetcher after setting + // the completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _oplogFetcherCallback(). + net->runReadyNetworkOperations(); + } - numGetMoreOplogEntriesMax = responses.size(); - setResponses({responses.begin(), responses.end() - 4}); - playResponses(); - log() << "done playing first responses"; + ASSERT_OK(dr->shutdown()); + executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations(); - // This variable is used for the reponse timestamps. Setting it to 0 will make the oplog - // entries come out of order. - numGetMoreOplogEntries = 0; - setResponses({responses.end() - 4, responses.end()}); - playResponses(); - log() << "done playing second responses"; - verifySync(getNet(), ErrorCodes::OplogOutOfOrder); + dr->join(); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied); } -TEST_F(InitialSyncTest, InitialSyncStateIsResetAfterFailure) { - const Responses responses = - { - {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")}, - // get latest oplog ts - {"find", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - // oplog fetcher find - {"find", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - // Clone Start - // listDatabases - {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")}, - // listCollections for "a" - {"listCollections", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" - "{name:'a', options:{}} " - "]}}")}, - // count:a - {"count", BSON("n" << 1 << "ok" << 1)}, - // listIndexes:a - {"listIndexes", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:[" - "{v:" - << OplogEntry::kOplogVersion - << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")}, - // find:a - {"find", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[" - "{_id:1, a:1} " - "]}}")}, - // Clone Done - // get latest oplog ts - {"find", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(7,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:5, a:2}}]}}")}, - {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:2}")}, - // Applier starts ... +TEST_F(DataReplicatorTest, DataReplicatorCancelsLastRollbackCheckerOnOplogFetcherCallbackError) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto oplogEntry = makeOplogEntry(1); + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({oplogEntry}); + + // Save request for OplogFetcher's oplog tailing query. This request will be canceled. + auto noi = net->getNextReadyRequest(); + auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + auto oplogFetcherNetworkOperationIterator = noi; + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({oplogEntry}); + + // Last rollback checker replSetGetRBID command. + noi = net->getNextReadyRequest(); + request = noi->getRequest(); + assertRemoteCommandNameEquals("replSetGetRBID", request); + net->blackHole(noi); + + // Make oplog fetcher fail. + net->scheduleErrorResponse(oplogFetcherNetworkOperationIterator, + Status(ErrorCodes::OperationFailed, "oplog fetcher failed")); + net->runReadyNetworkOperations(); + + // _oplogFetcherCallback() will shut down the last rollback checker after setting the + // completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _rollbackCheckerCheckForRollbackCallback(). + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); +} + +TEST_F(DataReplicatorTest, + DataReplicatorReturnsUnrecoverableRollbackErrorIfSyncSourceRolledBackAfterCloningData) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto oplogEntry = makeOplogEntry(1); + auto net = getNet(); + int baseRollbackId = 1; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({oplogEntry}); + + // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move + // on to the DatabasesCloner's request. + auto noi = net->getNextReadyRequest(); + auto request = noi->getRequest(); + assertRemoteCommandNameEquals("find", request); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->blackHole(noi); + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({oplogEntry}); + + // Last rollback checker replSetGetRBID command. + request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId + 1)); + net->runReadyNetworkOperations(); + assertRemoteCommandNameEquals("replSetGetRBID", request); + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, _lastApplied); +} + +TEST_F(DataReplicatorTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfterCloning) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + ASSERT_TRUE(_storageInterface->getInitialSyncFlag(txn.get())); + + auto oplogEntry = makeOplogEntry(1); + auto net = getNet(); + int baseRollbackId = 1; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({oplogEntry}); + + // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move + // on to the DatabasesCloner's request. + auto noi = net->getNextReadyRequest(); + auto request = noi->getRequest(); + assertRemoteCommandNameEquals("find", request); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->blackHole(noi); + + // Instead of fast forwarding to DatabasesCloner completion by returning an empty list of + // database names, we'll simulate copying a single database with a single collection on the + // sync source. + NamespaceString nss("a.a"); + request = net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()})); + assertRemoteCommandNameEquals("listDatabases", request); + net->runReadyNetworkOperations(); + + // listCollections for "a" + request = net->scheduleSuccessfulResponse( + makeCursorResponse(0LL, nss, {BSON("name" << nss.coll() << "options" << BSONObj())})); + assertRemoteCommandNameEquals("listCollections", request); + + // count:a + request = assertRemoteCommandNameEquals( + "count", net->scheduleSuccessfulResponse(BSON("n" << 1 << "ok" << 1))); + ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String()); + ASSERT_EQUALS(nss.db(), request.dbname); + + // listIndexes:a + request = assertRemoteCommandNameEquals( + "listIndexes", + net->scheduleSuccessfulResponse(makeCursorResponse( + 0LL, + NamespaceString(nss.getCommandNS()), + {BSON("v" << OplogEntry::kOplogVersion << "key" << BSON("_id" << 1) << "name" + << "_id_" + << "ns" + << nss.ns())}))); + ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String()); + ASSERT_EQUALS(nss.db(), request.dbname); + + // find:a + request = assertRemoteCommandNameEquals("find", + net->scheduleSuccessfulResponse(makeCursorResponse( + 0LL, nss, {BSON("_id" << 1 << "a" << 1)}))); + ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String()); + ASSERT_EQUALS(nss.db(), request.dbname); + + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({oplogEntry}); + + // Last rollback checker replSetGetRBID command. + request = assertRemoteCommandNameEquals( + "replSetGetRBID", + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId))); + net->runReadyNetworkOperations(); + + // Deliver cancellation to OplogFetcher. + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(OplogEntry(oplogEntry).getOpTime(), unittest::assertGet(_lastApplied).opTime); + ASSERT_EQUALS(oplogEntry["h"].Long(), unittest::assertGet(_lastApplied).value); + ASSERT_FALSE(_storageInterface->getInitialSyncFlag(txn.get())); +} + +TEST_F(DataReplicatorTest, DataReplicatorPassesThroughGetNextApplierBatchScheduleError) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + ASSERT_TRUE(_storageInterface->getInitialSyncFlag(txn.get())); + + auto net = getNet(); + int baseRollbackId = 1; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move + // on to the DatabasesCloner's request. + auto noi = net->getNextReadyRequest(); + auto request = noi->getRequest(); + assertRemoteCommandNameEquals("find", request); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->blackHole(noi); + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Before processing scheduled last oplog entry fetcher response, set flag in + // TaskExecutorMock so that DataReplicator will fail to schedule + // _getNextApplierBatchCallback(). + _executorProxy->shouldFailScheduleWork = true; + + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)}); + + // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after + // setting the completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _oplogFetcherCallback(). + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); +} + +TEST_F(DataReplicatorTest, DataReplicatorPassesThroughSecondGetNextApplierBatchScheduleError) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + ASSERT_TRUE(_storageInterface->getInitialSyncFlag(txn.get())); + + auto net = getNet(); + int baseRollbackId = 1; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move + // on to the DatabasesCloner's request. + auto noi = net->getNextReadyRequest(); + auto request = noi->getRequest(); + assertRemoteCommandNameEquals("find", request); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->blackHole(noi); + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Before processing scheduled last oplog entry fetcher response, set flag in + // TaskExecutorMock so that DataReplicator will fail to schedule second + // _getNextApplierBatchCallback() at (now + options.getApplierBatchCallbackRetryWait). + _executorProxy->shouldFailScheduleWorkAt = true; + + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)}); + + // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after + // setting the completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _oplogFetcherCallback(). + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); +} + +TEST_F(DataReplicatorTest, DataReplicatorCancelsGetNextApplierBatchOnShutdown) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + ASSERT_TRUE(_storageInterface->getInitialSyncFlag(txn.get())); + + auto net = getNet(); + int baseRollbackId = 1; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move + // on to the DatabasesCloner's request. + auto noi = net->getNextReadyRequest(); + auto request = noi->getRequest(); + assertRemoteCommandNameEquals("find", request); + ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); + net->blackHole(noi); + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)}); + + // Since we black holed OplogFetcher's find request, _getNextApplierBatch_inlock() will + // not return any operations for us to apply, leading to _getNextApplierBatchCallback() + // rescheduling itself at new->now() + _options.getApplierBatchCallbackRetryWait. + } + + ASSERT_OK(dr->shutdown()); + executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations(); + + dr->join(); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied); +} + +TEST_F(DataReplicatorTest, DataReplicatorPassesThroughGetNextApplierBatchInLockError) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + ASSERT_TRUE(_storageInterface->getInitialSyncFlag(txn.get())); + + // _getNextApplierBatch_inlock() returns BadValue when it gets an oplog entry with an unexpected + // version (not OplogEntry::kOplogVersion). + auto oplogEntry = makeOplogEntry(1); + auto oplogEntryWithInconsistentVersion = + makeOplogEntry(2, "i", OplogEntry::kOplogVersion + 100); + + auto net = getNet(); + int baseRollbackId = 1; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // OplogFetcher's oplog tailing query. Return bad oplog entry that will be added to the + // oplog buffer and processed by _getNextApplierBatch_inlock(). + auto request = assertRemoteCommandNameEquals( + "find", + net->scheduleSuccessfulResponse(makeCursorResponse( + 1LL, _options.localOplogNS, {oplogEntry, oplogEntryWithInconsistentVersion}))); + ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); + net->runReadyNetworkOperations(); + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // OplogFetcher's getMore request. Black hole because we already got our bad oplog entry + // into the oplog buffer. + auto noi = net->getNextReadyRequest(); + assertRemoteCommandNameEquals("getMore", noi->getRequest()); + net->blackHole(noi); + + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)}); + + // _getNextApplierBatchCallback() will shut down the OplogFetcher after setting the + // completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _oplogFetcherCallback(). + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(ErrorCodes::BadValue, _lastApplied); +} + +TEST_F( + DataReplicatorTest, + DataReplicatorReturnsEmptyBatchFromGetNextApplierBatchInLockIfRsSyncApplyStopFailPointIsEnabled) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + ASSERT_TRUE(_storageInterface->getInitialSyncFlag(txn.get())); + + // _getNextApplierBatch_inlock() returns BadValue when it gets an oplog entry with an unexpected + // version (not OplogEntry::kOplogVersion). + auto oplogEntry = makeOplogEntry(1); + auto oplogEntryWithInconsistentVersion = + makeOplogEntry(2, "i", OplogEntry::kOplogVersion + 100); + + // Enable 'rsSyncApplyStop' so that _getNextApplierBatch_inlock() returns an empty batch of + // operations instead of a batch containing an oplog entry with a bad version. + auto failPoint = getGlobalFailPointRegistry()->getFailPoint("rsSyncApplyStop"); + failPoint->setMode(FailPoint::alwaysOn); + ON_BLOCK_EXIT([failPoint]() { failPoint->setMode(FailPoint::off); }); + + auto net = getNet(); + int baseRollbackId = 1; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // OplogFetcher's oplog tailing query. Return bad oplog entry that will be added to the + // oplog buffer and processed by _getNextApplierBatch_inlock(). + auto request = net->scheduleSuccessfulResponse(makeCursorResponse( + 1LL, _options.localOplogNS, {oplogEntry, oplogEntryWithInconsistentVersion})); + assertRemoteCommandNameEquals("find", request); + ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); + net->runReadyNetworkOperations(); + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // OplogFetcher's getMore request. Black hole because we already got our bad oplog entry + // into the oplog buffer. + auto noi = net->getNextReadyRequest(); + request = noi->getRequest(); + assertRemoteCommandNameEquals("getMore", request); + net->blackHole(noi); + + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)}); + + // Since the 'rsSyncApplyStop' fail point is enabled, DataReplicator will get an empty + // batch of operations from _getNextApplierBatch_inlock() even though the oplog buffer + // is not empty. + } + + // If the fail point is not working, the initial sync status will be set to BadValue (due to the + // bad oplog entry in the oplog buffer) and shutdown() will not be able to overwrite this status + // with CallbackCanceled. + // Otherwise, shutdown() will cancel both the OplogFetcher and the scheduled + // _getNextApplierBatchCallback() task. The final initial sync status will be CallbackCanceled. + ASSERT_OK(dr->shutdown()); + executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations(); + + dr->join(); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied); +} + +TEST_F(DataReplicatorTest, + DataReplicatorReturnsNoSuchKeyIfApplierBatchContainsAnOplogEntryWithoutHash) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + ASSERT_TRUE(_storageInterface->getInitialSyncFlag(txn.get())); + + // This oplog entry (without a required "h" field) will be read by OplogFetcher and inserted + // into OplogBuffer to be retrieved by _getNextApplierBatch_inlock(). + auto oplogEntryWithoutHash = BSON("ts" << Timestamp(2, 2) << "v" << OplogEntry::kOplogVersion); + + auto net = getNet(); + int baseRollbackId = 1; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // OplogFetcher's oplog tailing query. Save for later. + auto request = net->scheduleSuccessfulResponse(makeCursorResponse( + 1LL, _options.localOplogNS, {makeOplogEntry(1), oplogEntryWithoutHash})); + assertRemoteCommandNameEquals("find", request); + ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); + net->runReadyNetworkOperations(); + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Ignore OplogFetcher's getMore request. + auto noi = net->getNextReadyRequest(); + request = noi->getRequest(); + assertRemoteCommandNameEquals("getMore", request); + net->blackHole(noi); + + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)}); + + // _getNextApplierBatchCallback() will shut down the OplogFetcher after setting the + // completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _oplogFetcherCallback(). + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, _lastApplied); +} + +TEST_F(DataReplicatorTest, DataReplicatorPassesThroughMultiApplierScheduleError) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + ASSERT_TRUE(_storageInterface->getInitialSyncFlag(txn.get())); + + auto net = getNet(); + int baseRollbackId = 1; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // OplogFetcher's oplog tailing query. Save for later. + auto noi = net->getNextReadyRequest(); + auto request = noi->getRequest(); + assertRemoteCommandNameEquals("find", request); + ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); + auto oplogFetcherNoi = noi; + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)}); + + // _getNextApplierBatchCallback() should have rescheduled itself. + // We'll insert some operations in the oplog buffer so that we'll attempt to schedule + // MultiApplier next time _getNextApplierBatchCallback() runs. + net->scheduleSuccessfulResponse( + oplogFetcherNoi, + executor::RemoteCommandResponse( + makeCursorResponse( + 1LL, _options.localOplogNS, {makeOplogEntry(1), makeOplogEntry(2)}), + BSONObj(), + Milliseconds(0))); + net->runReadyNetworkOperations(); + + // Ignore OplogFetcher's getMore request. + noi = net->getNextReadyRequest(); + request = noi->getRequest(); + assertRemoteCommandNameEquals("getMore", request); + + // Make MultiApplier::startup() fail. + _executorProxy->shouldFailScheduleWork = true; + + // Advance clock until _getNextApplierBatchCallback() runs. + auto when = net->now() + _options.getApplierBatchCallbackRetryWait; + ASSERT_EQUALS(when, net->runUntil(when)); + + // _getNextApplierBatchCallback() will shut down the OplogFetcher after setting the + // completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _oplogFetcherCallback(). + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); +} + +TEST_F(DataReplicatorTest, DataReplicatorPassesThroughMultiApplierCallbackError) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + getExternalState()->multiApplyFn = + [](OperationContext*, const MultiApplier::Operations&, MultiApplier::ApplyOperationFn) { + return Status(ErrorCodes::OperationFailed, "multiApply failed"); }; + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); + int baseRollbackId = 1; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // OplogFetcher's oplog tailing query. Provide enough operations to trigger MultiApplier. + auto request = net->scheduleSuccessfulResponse( + makeCursorResponse(1LL, _options.localOplogNS, {makeOplogEntry(1), makeOplogEntry(2)})); + assertRemoteCommandNameEquals("find", request); + ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); + net->runReadyNetworkOperations(); - startSync(2); + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); - numGetMoreOplogEntriesMax = responses.size(); - setResponses(responses); - playResponses(); - log() << "done playing first responses"; + // Ignore OplogFetcher's getMore request. + auto noi = net->getNextReadyRequest(); + request = noi->getRequest(); + assertRemoteCommandNameEquals("getMore", request); - // Play first response again to ensure data replicator has entered initial sync state. - setResponses({responses.begin(), responses.begin() + 1}); - playResponses(); - log() << "done playing first response of second round of responses"; + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)}); + // _multiApplierCallback() will shut down the OplogFetcher after setting the completion + // status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _oplogFetcherCallback(). + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); +} + +TEST_F(DataReplicatorTest, DataReplicatorCancelsGetNextApplierBatchCallbackOnOplogFetcherError) { auto dr = &getDR(); - ASSERT_TRUE(dr->getState() == DataReplicatorState::InitialSync) << ", state: " - << dr->getDiagnosticString(); - ASSERT_EQUALS(dr->getLastFetched(), OpTimeWithHash()); - ASSERT_EQUALS(dr->getLastApplied(), OpTimeWithHash()); - - setResponses({responses.begin() + 1, responses.end()}); - playResponses(); - log() << "done playing second round of responses"; - verifySync(getNet(), ErrorCodes::UnrecoverableRollbackError); -} - -TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) { - const Responses failedResponses = { - {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")}, - // get latest oplog ts - {"find", - fromjson( - str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - // oplog fetcher find - {"find", - fromjson( - str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - // Clone Start - // listDatabases - {"listDatabases", - fromjson("{ok:0, errmsg:'fail on clone -- listDBs injected failure', code:9}")}, + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); + int baseRollbackId = 1; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // OplogFetcher's oplog tailing query. Save for later. + auto noi = net->getNextReadyRequest(); + auto request = noi->getRequest(); + assertRemoteCommandNameEquals("find", request); + ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); + auto oplogFetcherNoi = noi; + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)}); + + // Send error to _oplogFetcherCallback(). + net->scheduleErrorResponse(oplogFetcherNoi, + Status(ErrorCodes::OperationFailed, "oplog fetcher failed")); + + // _oplogFetcherCallback() will cancel the _getNextApplierBatchCallback() task after setting + // the completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _oplogFetcherCallback(). + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); +} + +TEST_F(DataReplicatorTest, + DataReplicatorReturnsLastAppliedOnReachingStopTimestampAfterApplyingOneBatch) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto lastOp = makeOplogEntry(2); + + auto net = getNet(); + int baseRollbackId = 1; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // OplogFetcher's oplog tailing query. Response has enough operations to reach + // end timestamp. + auto request = net->scheduleSuccessfulResponse( + makeCursorResponse(1LL, _options.localOplogNS, {makeOplogEntry(1), lastOp})); + assertRemoteCommandNameEquals("find", request); + ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); + net->runReadyNetworkOperations(); + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Black hole OplogFetcher's getMore request. + auto noi = net->getNextReadyRequest(); + request = noi->getRequest(); + assertRemoteCommandNameEquals("getMore", request); + net->blackHole(noi); + + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({lastOp}); + + // Last rollback ID. + request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + assertRemoteCommandNameEquals("replSetGetRBID", request); + net->runReadyNetworkOperations(); + + // _multiApplierCallback() will cancel the _getNextApplierBatchCallback() task after setting + // the completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _oplogFetcherCallback(). + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(OplogEntry(lastOp).getOpTime(), unittest::assertGet(_lastApplied).opTime); + ASSERT_EQUALS(lastOp["h"].Long(), unittest::assertGet(_lastApplied).value); +} + +TEST_F(DataReplicatorTest, + DataReplicatorReturnsLastAppliedOnReachingStopTimestampAfterApplyingMultipleBatches) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + // To make DataReplicator apply multiple batches, we make the third and last operation a command + // so that it will go into a separate batch from the second operation. First operation is the + // last fetched entry before data cloning and is not applied. + auto lastOp = makeOplogEntry(3, "c"); + + auto net = getNet(); + int baseRollbackId = 1; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // OplogFetcher's oplog tailing query. Response has enough operations to reach + // end timestamp. + auto request = net->scheduleSuccessfulResponse(makeCursorResponse( + 1LL, _options.localOplogNS, {makeOplogEntry(1), makeOplogEntry(2), lastOp})); + assertRemoteCommandNameEquals("find", request); + ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); + net->runReadyNetworkOperations(); + + // Instead of fast forwarding to DatabasesCloner completion by returning an empty list of + // database names, we'll simulate copying a single database with a single collection on the + // sync source. + NamespaceString nss("a.a"); + request = net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()})); + assertRemoteCommandNameEquals("listDatabases", request); + net->runReadyNetworkOperations(); + + // Black hole OplogFetcher's getMore request. + auto noi = net->getNextReadyRequest(); + request = noi->getRequest(); + assertRemoteCommandNameEquals("getMore", request); + net->blackHole(noi); + + // listCollections for "a" + request = net->scheduleSuccessfulResponse( + makeCursorResponse(0LL, nss, {BSON("name" << nss.coll() << "options" << BSONObj())})); + assertRemoteCommandNameEquals("listCollections", request); + + // count:a + request = net->scheduleSuccessfulResponse(BSON("n" << 1 << "ok" << 1)); + assertRemoteCommandNameEquals("count", request); + ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String()); + ASSERT_EQUALS(nss.db(), request.dbname); + + // listIndexes:a + request = net->scheduleSuccessfulResponse(makeCursorResponse( + 0LL, + NamespaceString(nss.getCommandNS()), + {BSON("v" << OplogEntry::kOplogVersion << "key" << BSON("_id" << 1) << "name" + << "_id_" + << "ns" + << nss.ns())})); + assertRemoteCommandNameEquals("listIndexes", request); + ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String()); + ASSERT_EQUALS(nss.db(), request.dbname); + + // find:a + request = net->scheduleSuccessfulResponse( + makeCursorResponse(0LL, nss, {BSON("_id" << 1 << "a" << 1)})); + assertRemoteCommandNameEquals("find", request); + ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String()); + ASSERT_EQUALS(nss.db(), request.dbname); + + // Second last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({lastOp}); + + // Last rollback ID. + request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + assertRemoteCommandNameEquals("replSetGetRBID", request); + net->runReadyNetworkOperations(); + + // _multiApplierCallback() will cancel the _getNextApplierBatchCallback() task after setting + // the completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _oplogFetcherCallback(). + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(OplogEntry(lastOp).getOpTime(), unittest::assertGet(_lastApplied).opTime); + ASSERT_EQUALS(lastOp["h"].Long(), unittest::assertGet(_lastApplied).value); +} + +TEST_F( + DataReplicatorTest, + DataReplicatorSchedulesLastOplogEntryFetcherToGetNewStopTimestampIfMissingDocumentsHaveBeenFetchedDuringMultiInitialSyncApply) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + // Override DataReplicatorExternalState::_multiInitialSyncApply() so that it will also fetch a + // missing document. + // This forces DataReplicator to evaluate its end timestamp for applying operations after each + // batch. + getExternalState()->multiApplyFn = [](OperationContext*, + const MultiApplier::Operations& ops, + MultiApplier::ApplyOperationFn applyOperation) { + // 'OperationPtr*' is ignored by our overridden _multiInitialSyncApply(). + applyOperation(nullptr); + return ops.back().getOpTime(); + }; + bool fetchCountIncremented = false; + getExternalState()->multiInitialSyncApplyFn = [&fetchCountIncremented]( + MultiApplier::OperationPtrs*, const HostAndPort&, AtomicUInt32* fetchCount) { + if (!fetchCountIncremented) { + fetchCount->addAndFetch(1); + fetchCountIncremented = true; + } + return Status::OK(); }; - const Responses successfulResponses = - { - {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")}, - // get latest oplog ts - {"find", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - // oplog fetcher find - {"find", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - // Clone Start - // listDatabases - {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")}, - // listCollections for "a" - {"listCollections", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" - "{name:'a', options:{}} " - "]}}")}, - // count:a - {"count", BSON("n" << 5 << "ok" << 1)}, - // listIndexes:a - {"listIndexes", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:[" - "{v:" - << OplogEntry::kOplogVersion - << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")}, - // find:a - first batch - {"find", - fromjson("{ok:1, cursor:{id:NumberLong(2), ns:'a.a', firstBatch:[" - "{_id:1, a:1} " - "]}}")}, - // getMore:a - second batch - {"getMore", - fromjson("{ok:1, cursor:{id:NumberLong(2), ns:'a.a', nextBatch:[" - "{_id:2, a:2} " - "]}}")}, - // getMore:a - third batch - {"getMore", - fromjson("{ok:1, cursor:{id:NumberLong(2), ns:'a.a', nextBatch:[" - "{_id:3, a:3} " - "]}}")}, - // getMore:a - fourth batch - {"getMore", - fromjson("{ok:1, cursor:{id:NumberLong(2), ns:'a.a', nextBatch:[" - "{_id:3, a:3} " - "]}}")}, - // getMore:a - last batch - {"getMore", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', nextBatch:[" - "{_id:4, a:4} " - "]}}")}, - // Clone Done - // get latest oplog ts - // This is a testing-only side effect of using playResponses. We may end up generating - // getMore responses past this timestamp 7. - {"find", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(7,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:5, a:2}}]}}")}, - {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")}, - // Applier starts ... - }; + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + // Use command for third and last operation to ensure we have two batches to apply. + auto lastOp = makeOplogEntry(3, "c"); + + auto net = getNet(); + int baseRollbackId = 1; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + + // OplogFetcher's oplog tailing query. Response has enough operations to reach + // end timestamp. + auto request = net->scheduleSuccessfulResponse(makeCursorResponse( + 1LL, _options.localOplogNS, {makeOplogEntry(1), makeOplogEntry(2), lastOp})); + assertRemoteCommandNameEquals("find", request); + ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); + net->runReadyNetworkOperations(); + + // Quickest path to a successful DatabasesCloner completion is to respond to the + // listDatabases with an empty list of database names. + assertRemoteCommandNameEquals( + "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); + net->runReadyNetworkOperations(); + + // Black hole OplogFetcher's getMore request. + auto noi = net->getNextReadyRequest(); + request = noi->getRequest(); + assertRemoteCommandNameEquals("getMore", request); + net->blackHole(noi); + + // Second last oplog entry fetcher. + // Send oplog entry with timestamp 2. DataReplicator will update this end timestamp after + // applying the first batch. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)}); + + // Third last oplog entry fetcher. + processSuccessfulLastOplogEntryFetcherResponse({lastOp}); + + // Last rollback ID. + request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + assertRemoteCommandNameEquals("replSetGetRBID", request); + net->runReadyNetworkOperations(); + + // _multiApplierCallback() will cancel the _getNextApplierBatchCallback() task after setting + // the completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _oplogFetcherCallback(). + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(OplogEntry(lastOp).getOpTime(), unittest::assertGet(_lastApplied).opTime); + ASSERT_EQUALS(lastOp["h"].Long(), unittest::assertGet(_lastApplied).value); + + ASSERT_TRUE(fetchCountIncremented); + + auto progress = dr->getInitialSyncProgress(); + log() << "Progress after failed initial sync attempt: " << progress; + ASSERT_EQUALS(1, progress.getIntField("fetchedMissingDocs")) << progress; +} + +TEST_F(DataReplicatorTest, + DataReplicatorReturnsInvalidSyncSourceWhenFailInitialSyncWithBadHostFailpointIsEnabled) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + // This fail point makes chooseSyncSourceCallback fail with an InvalidSyncSource error. + auto failPoint = getGlobalFailPointRegistry()->getFailPoint("failInitialSyncWithBadHost"); + failPoint->setMode(FailPoint::alwaysOn); + ON_BLOCK_EXIT([failPoint]() { failPoint->setMode(FailPoint::off); }); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + dr->join(); + ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, _lastApplied); +} + +TEST_F(DataReplicatorTest, OplogOutOfOrderOnOplogFetchFinish) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + ASSERT_OK(dr->startup(txn.get(), maxAttempts)); + + auto net = getNet(); + int baseRollbackId = 1; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); - startSync(2); + // OplogFetcher's oplog tailing query. + auto request = net->scheduleSuccessfulResponse( + makeCursorResponse(1LL, _options.localOplogNS, {makeOplogEntry(1)})); + assertRemoteCommandNameEquals("find", request); + ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); + net->runReadyNetworkOperations(); + + // Ignore listDatabases request. + auto noi = net->getNextReadyRequest(); + request = noi->getRequest(); + assertRemoteCommandNameEquals("listDatabases", request); + net->blackHole(noi); + + // Ensure that OplogFetcher fails with an OplogOutOfOrder error by responding to the getMore + // request with oplog entries containing the following timestamps (most recently processed + // oplog entry has a timestamp of 1): + // (last=1), 5, 4 + request = net->scheduleSuccessfulResponse(makeCursorResponse( + 1LL, _options.localOplogNS, {makeOplogEntry(5), makeOplogEntry(4)}, false)); + assertRemoteCommandNameEquals("getMore", request); + net->runReadyNetworkOperations(); + + // Deliver cancellation signal to DatabasesCloner. + net->runReadyNetworkOperations(); + } + + dr->join(); + ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, _lastApplied); +} + +TEST_F(DataReplicatorTest, GetInitialSyncProgressReturnsCorrectProgress) { + auto dr = &getDR(); + auto txn = makeOpCtx(); + + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 27017)); + ASSERT_OK(dr->startup(txn.get(), 2U)); + + auto net = getNet(); + int baseRollbackId = 1; // Play first 2 responses to ensure data replicator has started the oplog fetcher. - setResponses({failedResponses.begin(), failedResponses.begin() + 3}); - numGetMoreOplogEntriesMax = failedResponses.size() + successfulResponses.size(); - playResponses(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Base rollback ID. + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + } + log() << "Done playing first failed response"; - auto progress = getInitialSyncProgress(); + auto progress = dr->getInitialSyncProgress(); log() << "Progress after first failed response: " << progress; ASSERT_EQUALS(progress.nFields(), 8) << progress; ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 0) << progress; @@ -1362,18 +3310,47 @@ TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) { ASSERT_BSONOBJ_EQ(progress.getObjectField("databases"), BSON("databasesCloned" << 0)); // Play rest of the failed round of responses. - setResponses({failedResponses.begin() + 3, failedResponses.end()}); - playResponses(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Ignore oplog tailing query. + auto noi = net->getNextReadyRequest(); + auto request = noi->getRequest(); + assertRemoteCommandNameEquals("find", request); + ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); + net->blackHole(noi); + + request = net->scheduleErrorResponse( + Status(ErrorCodes::FailedToParse, "fail on clone -- listDBs injected failure")); + assertRemoteCommandNameEquals("listDatabases", request); + net->runReadyNetworkOperations(); + + // Deliver cancellation to OplogFetcher + net->runReadyNetworkOperations(); + } + log() << "Done playing failed responses"; - // Play the first 3 responses of the successful round of responses to ensure that the + // Play the first 2 responses of the successful round of responses to ensure that the // data replicator starts the oplog fetcher. - setResponses({successfulResponses.begin(), successfulResponses.begin() + 3}); - numGetMoreOplogEntries = 0; - playResponses(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + auto when = net->now() + _options.initialSyncRetryWait; + ASSERT_EQUALS(when, net->runUntil(when)); + + // Base rollback ID. + auto request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + assertRemoteCommandNameEquals("replSetGetRBID", request); + net->runReadyNetworkOperations(); + + // Last oplog entry. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)}); + } + log() << "Done playing first successful response"; - progress = getInitialSyncProgress(); + progress = dr->getInitialSyncProgress(); log() << "Progress after failure: " << progress; ASSERT_EQUALS(progress.nFields(), 8) << progress; ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress; @@ -1388,21 +3365,86 @@ TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) { ASSERT_EQUALS(attempts.nFields(), 1) << attempts; BSONObj attempt0 = attempts["0"].Obj(); ASSERT_EQUALS(attempt0.nFields(), 3) << attempt0; - ASSERT_EQUALS(attempt0.getStringField("status"), - std::string("FailedToParse: fail on clone -- listDBs injected failure")) + ASSERT_EQUALS( + attempt0.getStringField("status"), + std::string( + "FailedToParse: error cloning databases: fail on clone -- listDBs injected failure")) << attempt0; ASSERT_EQUALS(attempt0["durationMillis"].type(), NumberInt) << attempt0; ASSERT_EQUALS(attempt0.getStringField("syncSource"), std::string("localhost:27017")) << attempt0; // Play all but last of the successful round of responses. - setResponses({successfulResponses.begin() + 3, successfulResponses.end() - 1}); - // Reset getMore counter because the data replicator starts a new oplog tailing query. - numGetMoreOplogEntries = 0; - playResponses(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Ignore oplog tailing query. + auto request = net->scheduleSuccessfulResponse(makeCursorResponse(1LL, + _options.localOplogNS, + {makeOplogEntry(1), + makeOplogEntry(2), + makeOplogEntry(3), + makeOplogEntry(4), + makeOplogEntry(5), + makeOplogEntry(6), + makeOplogEntry(7) + + })); + assertRemoteCommandNameEquals("find", request); + ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); + net->runReadyNetworkOperations(); + + // listDatabases + NamespaceString nss("a.a"); + request = net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()})); + assertRemoteCommandNameEquals("listDatabases", request); + net->runReadyNetworkOperations(); + + auto noi = net->getNextReadyRequest(); + request = noi->getRequest(); + assertRemoteCommandNameEquals("getMore", request); + net->blackHole(noi); + + // listCollections for "a" + request = net->scheduleSuccessfulResponse( + makeCursorResponse(0LL, nss, {BSON("name" << nss.coll() << "options" << BSONObj())})); + assertRemoteCommandNameEquals("listCollections", request); + + // count:a + request = net->scheduleSuccessfulResponse(BSON("n" << 5 << "ok" << 1)); + assertRemoteCommandNameEquals("count", request); + ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String()); + ASSERT_EQUALS(nss.db(), request.dbname); + + // listIndexes:a + request = net->scheduleSuccessfulResponse(makeCursorResponse( + 0LL, + NamespaceString(nss.getCommandNS()), + {BSON("v" << OplogEntry::kOplogVersion << "key" << BSON("_id" << 1) << "name" + << "_id_" + << "ns" + << nss.ns())})); + assertRemoteCommandNameEquals("listIndexes", request); + ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String()); + ASSERT_EQUALS(nss.db(), request.dbname); + + // find:a - 5 batches + for (int i = 1; i <= 5; ++i) { + request = net->scheduleSuccessfulResponse( + makeCursorResponse(i < 5 ? 2LL : 0LL, nss, {BSON("_id" << i << "a" << i)}, i == 1)); + ASSERT_EQUALS(i == 1 ? "find" : "getMore", + request.cmdObj.firstElement().fieldNameStringData()); + net->runReadyNetworkOperations(); + } + + // Second last oplog entry fetcher. + // Send oplog entry with timestamp 2. DataReplicator will update this end timestamp after + // applying the first batch. + processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(7)}); + } log() << "Done playing all but last successful response"; - progress = getInitialSyncProgress(); + progress = dr->getInitialSyncProgress(); log() << "Progress after all but last successful response: " << progress; ASSERT_EQUALS(progress.nFields(), 9) << progress; ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress; @@ -1432,23 +3474,40 @@ TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) { ASSERT_EQUALS(attempts.nFields(), 1) << progress; attempt0 = attempts["0"].Obj(); ASSERT_EQUALS(attempt0.nFields(), 3) << attempt0; - ASSERT_EQUALS(attempt0.getStringField("status"), - std::string("FailedToParse: fail on clone -- listDBs injected failure")) + ASSERT_EQUALS( + attempt0.getStringField("status"), + std::string( + "FailedToParse: error cloning databases: fail on clone -- listDBs injected failure")) << attempt0; ASSERT_EQUALS(attempt0["durationMillis"].type(), NumberInt) << attempt0; ASSERT_EQUALS(attempt0.getStringField("syncSource"), std::string("localhost:27017")) << attempt0; // Play last successful response. - setResponses({successfulResponses.end() - 1, successfulResponses.end()}); - playResponses(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + + // Last rollback ID. + assertRemoteCommandNameEquals( + "replSetGetRBID", + net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId))); + net->runReadyNetworkOperations(); + + // _multiApplierCallback() will cancel the _getNextApplierBatchCallback() task after setting + // the completion status. + // We call runReadyNetworkOperations() again to deliver the cancellation status to + // _oplogFetcherCallback(). + net->runReadyNetworkOperations(); + } log() << "waiting for initial sync to verify it completed OK"; - verifySync(getNet()); + dr->join(); + ASSERT_EQUALS(OplogEntry(makeOplogEntry(7)).getOpTime(), + unittest::assertGet(_lastApplied).opTime); - progress = getInitialSyncProgress(); + progress = dr->getInitialSyncProgress(); log() << "Progress at end: " << progress; - ASSERT_EQUALS(progress.nFields(), 10) << progress; + ASSERT_EQUALS(progress.nFields(), 11) << progress; ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress; ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress; ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress; @@ -1465,8 +3524,10 @@ TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) { attempt0 = attempts["0"].Obj(); ASSERT_EQUALS(attempt0.nFields(), 3) << attempt0; - ASSERT_EQUALS(attempt0.getStringField("status"), - std::string("FailedToParse: fail on clone -- listDBs injected failure")) + ASSERT_EQUALS( + attempt0.getStringField("status"), + std::string( + "FailedToParse: error cloning databases: fail on clone -- listDBs injected failure")) << attempt0; ASSERT_EQUALS(attempt0["durationMillis"].type(), NumberInt) << attempt0; ASSERT_EQUALS(attempt0.getStringField("syncSource"), std::string("localhost:27017")) @@ -1480,72 +3541,4 @@ TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) { << attempt1; } -TEST_F(InitialSyncTest, DataReplicatorCreatesNewApplierForNextBatchBeforeDestroyingCurrentApplier) { - auto getRollbackIdResponse = BSON("ok" << 1 << "rbid" << 1); - auto noopOp1 = BSON("ts" << Timestamp(Seconds(1), 1U) << "h" << 1LL << "v" - << OplogEntry::kOplogVersion - << "ns" - << "" - << "op" - << "n" - << "o" - << BSON("msg" - << "noop")); - auto createCollectionOp1 = - BSON("ts" << Timestamp(Seconds(2), 1U) << "h" << 1LL << "v" << OplogEntry::kOplogVersion - << "ns" - << "test.$cmd" - << "op" - << "c" - << "o" - << BSON("create" - << "coll1")); - auto createCollectionOp2 = - BSON("ts" << Timestamp(Seconds(3), 1U) << "h" << 1LL << "v" << OplogEntry::kOplogVersion - << "ns" - << "test.$cmd" - << "op" - << "c" - << "o" - << BSON("create" - << "coll2")); - const Responses responses = { - // pre-initial sync rollback checker request - {"replSetGetRBID", getRollbackIdResponse}, - // get latest oplog ts - this should match the first op returned by the oplog fetcher - {"find", - BSON("ok" << 1 << "cursor" << BSON("id" << 0LL << "ns" - << "local.oplog.rs" - << "firstBatch" - << BSON_ARRAY(noopOp1)))}, - // oplog fetcher find - single set of results containing two commands that have to be - // applied in separate batches per batching logic - {"find", - BSON("ok" << 1 << "cursor" << BSON("id" << 0LL << "ns" - << "local.oplog.rs" - << "firstBatch" - << BSON_ARRAY(noopOp1 << createCollectionOp1 - << createCollectionOp2)))}, - // Clone Start - // listDatabases - return empty list of databases since we're not testing the cloner. - {"listDatabases", BSON("ok" << 1 << "databases" << BSONArray())}, - // get latest oplog ts - this should match the last op returned by the oplog fetcher - {"find", - BSON("ok" << 1 << "cursor" << BSON("id" << 0LL << "ns" - << "local.oplog.rs" - << "firstBatch" - << BSON_ARRAY(createCollectionOp2)))}, - // post-initial sync rollback checker request - {"replSetGetRBID", getRollbackIdResponse}, - }; - - startSync(1); - - setResponses(responses); - playResponses(); - log() << "Done playing responses"; - verifySync(getNet()); - ASSERT_EQUALS(OplogEntry(createCollectionOp2).getOpTime(), _myLastOpTime); -} - } // namespace diff --git a/src/mongo/db/repl/initial_sync_state.h b/src/mongo/db/repl/initial_sync_state.h index cb607824171..d66b762e2da 100644 --- a/src/mongo/db/repl/initial_sync_state.h +++ b/src/mongo/db/repl/initial_sync_state.h @@ -48,16 +48,12 @@ namespace repl { * Holder of state for initial sync (DataReplicator). */ struct InitialSyncState { - InitialSyncState(std::unique_ptr<DatabasesCloner> cloner, Event finishEvent) - : dbsCloner(std::move(cloner)), finishEvent(finishEvent), status(Status::OK()){}; + InitialSyncState(std::unique_ptr<DatabasesCloner> cloner) : dbsCloner(std::move(cloner)){}; std::unique_ptr<DatabasesCloner> dbsCloner; // Cloner for all databases included in initial sync. - BSONObj oplogSeedDoc; // Document to seed the oplog with when initial sync is done. Timestamp beginTimestamp; // Timestamp from the latest entry in oplog when started. Timestamp stopTimestamp; // Referred to as minvalid, or the place we can transition states. - Event finishEvent; // event fired on completion, either successful or not. - Status status; // final status, only valid after the finishEvent fires. Timer timer; // Timer for timing how long each initial sync attempt takes. size_t fetchedMissingDocs = 0; size_t appliedOps = 0; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 699bb2636ac..f51c5ece84b 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -37,6 +37,7 @@ #include "mongo/base/status.h" #include "mongo/client/fetcher.h" +#include "mongo/db/client.h" #include "mongo/db/commands.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/global_timestamp.h" @@ -560,7 +561,7 @@ void ReplicationCoordinatorImpl::_stopDataReplication(OperationContext* txn) { LockGuard lk(_mutex); _dr.swap(drCopy); } - if (drCopy && drCopy->getState() == DataReplicatorState::InitialSync) { + if (drCopy) { LOG(1) << "ReplicationCoordinatorImpl::_stopDataReplication calling DataReplicator::shutdown."; const auto status = drCopy->shutdown(); @@ -591,42 +592,32 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* txn, // Do initial sync. if (_externalState->shouldUseDataReplicatorInitialSync()) { - _externalState->runOnInitialSyncThread([this, startCompleted](OperationContext* txn) { - std::shared_ptr<DataReplicator> drCopy; - UniqueLock lk(_mutex); // Must take the lock to set _dr, but not call it. - drCopy = std::make_shared<DataReplicator>( - createDataReplicatorOptions(this, _externalState.get()), - stdx::make_unique<DataReplicatorExternalStateImpl>(this, _externalState.get()), - _storage); - _dr = drCopy; - lk.unlock(); - - const auto status = drCopy->doInitialSync(txn, numInitialSyncAttempts); - // If it is interrupted by resync, we do not need to cleanup the DataReplicator. - if (status == ErrorCodes::ShutdownInProgress) { - return; - } - - drCopy.reset(); - lk.lock(); + if (!_externalState->getTaskExecutor()) { + log() << "not running initial sync during test."; + return; + } - if (status == ErrorCodes::CallbackCanceled) { - log() << "Initial Sync has been cancelled: " << status.getStatus(); - return; - } else if (!status.isOK()) { - if (_inShutdown) { - log() << "Initial Sync failed during shutdown due to " << status.getStatus(); + auto onCompletion = [this, startCompleted](const StatusWith<OpTimeWithHash>& status) { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + if (status == ErrorCodes::CallbackCanceled) { + log() << "Initial Sync has been cancelled: " << status.getStatus(); return; - } else { - error() << "Initial sync failed, shutting down now. Restart the server to " - "attempt a new initial sync."; - fassertFailedWithStatusNoTrace(40088, status.getStatus()); + } else if (!status.isOK()) { + if (_inShutdown) { + log() << "Initial Sync failed during shutdown due to " + << status.getStatus(); + return; + } else { + error() << "Initial sync failed, shutting down now. Restart the server " + "to attempt a new initial sync."; + fassertFailedWithStatusNoTrace(40088, status.getStatus()); + } } - } - const auto lastApplied = status.getValue(); - _setMyLastAppliedOpTime_inlock(lastApplied.opTime, false); - lk.unlock(); + const auto lastApplied = status.getValue(); + _setMyLastAppliedOpTime_inlock(lastApplied.opTime, false); + } // Clear maint. mode. while (getMaintenanceMode()) { @@ -637,9 +628,35 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* txn, startCompleted(); } // Repair local db (to compact it). - uassertStatusOK(_externalState->runRepairOnLocalDB(txn)); - _externalState->startSteadyStateReplication(txn, this); - }); + auto txn = cc().makeOperationContext(); + uassertStatusOK(_externalState->runRepairOnLocalDB(txn.get())); + _externalState->startSteadyStateReplication(txn.get(), this); + }; + + std::shared_ptr<DataReplicator> drCopy; + try { + { + // Must take the lock to set _dr, but not call it. + stdx::lock_guard<stdx::mutex> lock(_mutex); + drCopy = std::make_shared<DataReplicator>( + createDataReplicatorOptions(this, _externalState.get()), + stdx::make_unique<DataReplicatorExternalStateImpl>(this, _externalState.get()), + _storage, + onCompletion); + _dr = drCopy; + } + // DataReplicator::startup() must be called outside lock because it uses features (eg. + // setting the initial sync flag) which depend on the ReplicationCoordinatorImpl. + uassertStatusOK(drCopy->startup(txn, numInitialSyncAttempts)); + } catch (...) { + auto status = exceptionToStatus(); + log() << "Initial Sync failed to start: " << status; + if (ErrorCodes::CallbackCanceled == status || + ErrorCodes::isShutdownError(status.code())) { + return; + } + fassertFailedWithStatusNoTrace(40354, status); + } } else { _externalState->startInitialSync([this, startCompleted](OperationContext* txn) { stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -736,6 +753,7 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* txn) { if (!status.isOK()) { warning() << "DataReplicator shutdown failed: " << status; } + drCopy->join(); drCopy.reset(); } _externalState->shutdown(txn); @@ -2174,7 +2192,7 @@ Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* txn, auto opTime = _getMyLastAppliedOpTime_inlock(); _topCoord->prepareSyncFromResponse(target, opTime, resultObj, &result); // If we are in the middle of an initial sync, do a resync. - doResync = result.isOK() && _dr && _dr->getState() == DataReplicatorState::InitialSync; + doResync = result.isOK() && _dr && _dr->isActive(); } if (doResync) { diff --git a/src/mongo/executor/task_executor_test_fixture.cpp b/src/mongo/executor/task_executor_test_fixture.cpp index acbb21cbeed..bc99e1538fa 100644 --- a/src/mongo/executor/task_executor_test_fixture.cpp +++ b/src/mongo/executor/task_executor_test_fixture.cpp @@ -43,8 +43,8 @@ Status TaskExecutorTest::getDetectableErrorStatus() { return Status(ErrorCodes::InternalError, "Not mutated"); } -void TaskExecutorTest::assertRemoteCommandNameEquals(StringData cmdName, - const RemoteCommandRequest& request) { +RemoteCommandRequest TaskExecutorTest::assertRemoteCommandNameEquals( + StringData cmdName, const RemoteCommandRequest& request) { auto&& cmdObj = request.cmdObj; ASSERT_FALSE(cmdObj.isEmpty()); if (cmdName != cmdObj.firstElementFieldName()) { @@ -53,6 +53,7 @@ void TaskExecutorTest::assertRemoteCommandNameEquals(StringData cmdName, << cmdObj.firstElementFieldName() << "\" instead: " << request.toString(); FAIL(msg); } + return request; } TaskExecutorTest::~TaskExecutorTest() = default; diff --git a/src/mongo/executor/task_executor_test_fixture.h b/src/mongo/executor/task_executor_test_fixture.h index 0ba3a6d6604..d89b6e0062d 100644 --- a/src/mongo/executor/task_executor_test_fixture.h +++ b/src/mongo/executor/task_executor_test_fixture.h @@ -53,10 +53,11 @@ public: static Status getDetectableErrorStatus(); /** - * Validates command name in remote command request. + * Validates command name in remote command request. Returns the remote command request from + * the network interface for further validation if the command name matches. */ - static void assertRemoteCommandNameEquals(StringData cmdName, - const RemoteCommandRequest& request); + static RemoteCommandRequest assertRemoteCommandNameEquals(StringData cmdName, + const RemoteCommandRequest& request); protected: virtual ~TaskExecutorTest(); |