author     Benety Goh <benety@mongodb.com>  2016-11-17 12:05:26 -0500
committer  Benety Goh <benety@mongodb.com>  2016-12-06 11:41:52 -0500
commit     e30e39ce1a4a55c46db13ad85f6c1000297ea6ff (patch)
tree       4125e5eee3f765fbd860338e6898f2345dd5fa32
parent     726cafd713c7333640f8458ec9808ed4f678e3a7 (diff)
download   mongo-e30e39ce1a4a55c46db13ad85f6c1000297ea6ff.tar.gz
SERVER-27052 added asynchronous operation support to DataReplicator
-rw-r--r--  src/mongo/db/repl/SConscript | 1
-rw-r--r--  src/mongo/db/repl/data_replicator.cpp | 1823
-rw-r--r--  src/mongo/db/repl/data_replicator.h | 541
-rw-r--r--  src/mongo/db/repl/data_replicator_external_state_mock.cpp | 5
-rw-r--r--  src/mongo/db/repl/data_replicator_external_state_mock.h | 8
-rw-r--r--  src/mongo/db/repl/data_replicator_test.cpp | 3813
-rw-r--r--  src/mongo/db/repl/initial_sync_state.h | 6
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp | 92
-rw-r--r--  src/mongo/executor/task_executor_test_fixture.cpp | 5
-rw-r--r--  src/mongo/executor/task_executor_test_fixture.h | 7
10 files changed, 4422 insertions, 1879 deletions
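
The patch below removes the blocking doInitialSync() loop and drives initial sync as a chain of TaskExecutor callbacks: startup() schedules the first attempt, each stage schedules the next (choose a sync source, check the rollback ID, fetch the last oplog entry, clone databases, apply oplog batches), shutdown() flips the state and cancels outstanding handles, join() waits until a caller-supplied '_onCompletion' callback has run, and a PreStart/Running/ShuttingDown/Complete state machine ties it together. The stand-alone sketch below is not MongoDB code: MiniReplicator and everything in it are invented names, and a plain std::thread stands in for the task executor. It only illustrates the general shape of that lifecycle, including the detail (visible in _finishCallback() in the diff) that the completion callback is invoked outside the mutex before the state is set to Complete and join() is woken.

// Minimal sketch of the asynchronous lifecycle introduced by this patch.
// Assumption: MiniReplicator and its members are invented for illustration.
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <utility>

class MiniReplicator {
public:
    enum class State { kPreStart, kRunning, kShuttingDown, kComplete };
    using OnCompletionFn = std::function<void(bool ok)>;

    explicit MiniReplicator(OnCompletionFn onCompletion)
        : _onCompletion(std::move(onCompletion)) {}

    ~MiniReplicator() {
        shutdown();
        join();
        if (_worker.joinable()) {
            _worker.join();
        }
    }

    // Schedules the first (and, in this sketch, only) asynchronous step.
    // Returns false if already started, shutting down, or complete.
    bool startup() {
        std::lock_guard<std::mutex> lk(_mutex);
        if (_state != State::kPreStart) {
            return false;
        }
        _state = State::kRunning;
        _worker = std::thread([this] { _workThenFinish(); });
        return true;
    }

    // Requests shutdown; legal in any state. The real DataReplicator also cancels
    // its saved callback handles and shuts down its components here.
    void shutdown() {
        std::lock_guard<std::mutex> lk(_mutex);
        if (_state == State::kPreStart) {
            _state = State::kComplete;
        } else if (_state == State::kRunning) {
            _state = State::kShuttingDown;
        }
    }

    // Blocks until the replicator is no longer active (Running or ShuttingDown).
    void join() {
        std::unique_lock<std::mutex> lk(_mutex);
        _condition.wait(lk, [this] {
            return _state == State::kPreStart || _state == State::kComplete;
        });
    }

private:
    void _workThenFinish() {
        bool canceled;
        {
            std::lock_guard<std::mutex> lk(_mutex);
            canceled = (_state == State::kShuttingDown);
        }
        // Move the completion callback out of the member and invoke it outside the
        // mutex, then transition to Complete and wake up join().
        OnCompletionFn onCompletion;
        {
            std::lock_guard<std::mutex> lk(_mutex);
            std::swap(onCompletion, _onCompletion);
        }
        onCompletion(!canceled);
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _state = State::kComplete;
        }
        _condition.notify_all();
    }

    OnCompletionFn _onCompletion;
    std::mutex _mutex;
    std::condition_variable _condition;
    State _state = State::kPreStart;
    std::thread _worker;
};

int main() {
    MiniReplicator replicator(
        [](bool ok) { std::cout << "initial sync attempt finished, ok=" << ok << "\n"; });
    replicator.startup();
    replicator.join();  // Returns only after the completion callback has run.
    return 0;
}
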
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index d197a03e068..b11f2900c45 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1047,6 +1047,7 @@ env.Library(
'databases_cloner',
'multiapplier',
'oplog_buffer_blocking_queue',
+ 'oplog_entry',
'oplog_fetcher',
'optime',
'rollback_checker',
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 79cd4f1d4ac..6e22361a270 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -33,10 +33,12 @@
#include "data_replicator.h"
#include <algorithm>
+#include <utility>
#include "mongo/base/counter.h"
#include "mongo/base/status.h"
#include "mongo/bson/simple_bsonobj_comparator.h"
+#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/fetcher.h"
#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/db/commands/server_status_metric.h"
@@ -49,7 +51,6 @@
#include "mongo/db/repl/oplog_buffer.h"
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/db/repl/optime.h"
-#include "mongo/db/repl/rollback_checker.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/db/server_parameters.h"
@@ -103,9 +104,21 @@ MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncConnectAttempts, int, 10);
// The number of attempts to call find on the remote oplog.
MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncOplogFindAttempts, int, 3);
+// The number of initial sync attempts that have failed since server startup. Each instance of
+// DataReplicator may run multiple attempts to fulfill an initial sync request that is triggered
+// when DataReplicator::startup() is called.
Counter64 initialSyncFailedAttempts;
+
+// The number of initial sync requests that have been requested and failed. Each instance of
+// DataReplicator (upon successful startup()) corresponds to a single initial sync request.
+// This value does not include the number of times where a DataReplicator is created successfully
+// but failed in startup().
Counter64 initialSyncFailures;
+
+// The number of initial sync requests that have been requested and completed successfully. Each
+// instance of DataReplicator corresponds to a single initial sync request.
Counter64 initialSyncCompletes;
+
ServerStatusMetricField<Counter64> displaySSInitialSyncFailedAttempts(
"repl.initialSync.failedAttempts", &initialSyncFailedAttempts);
ServerStatusMetricField<Counter64> displaySSInitialSyncFailures("repl.initialSync.failures",
@@ -117,20 +130,6 @@ ServiceContext::UniqueOperationContext makeOpCtx() {
return cc().makeOperationContext();
}
-StatusWith<TaskExecutor::CallbackHandle> scheduleWork(
- TaskExecutor* exec,
- stdx::function<void(OperationContext* txn, const CallbackArgs& cbData)> func) {
-
- // Wrap 'func' with a lambda that checks for cancallation and creates an OperationContext*.
- return exec->scheduleWork([func](const CallbackArgs& cbData) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
- auto txn = makeOpCtx();
- func(txn.get(), cbData);
- });
-}
-
StatusWith<Timestamp> parseTimestampStatus(const QueryResponseStatus& fetchResult) {
if (!fetchResult.isOK()) {
return fetchResult.getStatus();
@@ -189,7 +188,12 @@ StatusWith<BSONObj> getLatestOplogEntry(executor::TaskExecutor* exec,
}
StatusWith<OpTimeWithHash> parseOpTimeWithHash(const BSONObj& oplogEntry) {
- auto oplogEntryHash = oplogEntry["h"].Long();
+ long long oplogEntryHash = 0LL;
+ auto status = bsonExtractIntegerField(oplogEntry, "h", &oplogEntryHash);
+ if (!status.isOK()) {
+ return status;
+ }
+
const auto lastOpTime = OpTime::parseFromOplogEntry(oplogEntry);
if (!lastOpTime.isOK()) {
return lastOpTime.getStatus();
@@ -206,96 +210,140 @@ StatusWith<OpTimeWithHash> parseOpTimeWithHash(const QueryResponseStatus& fetchR
const auto hasDoc = docs.begin() != docs.end();
return hasDoc
? parseOpTimeWithHash(docs.front())
- : StatusWith<OpTimeWithHash>{ErrorCodes::NoMatchingDocument, "No document in batch."};
-}
-
-Timestamp findCommonPoint(HostAndPort host, Timestamp start) {
- // TODO: walk back in the oplog looking for a known/shared optime.
- return Timestamp();
-}
-
-template <typename T>
-void swapAndJoin_inlock(UniqueLock* lock, T& uniquePtrToReset, const char* msg) {
- if (!uniquePtrToReset) {
- return;
- }
- T tempPtr = std::move(uniquePtrToReset);
- lock->unlock();
- LOG(1) << msg << tempPtr->toString();
- tempPtr->join();
- lock->lock();
+ : StatusWith<OpTimeWithHash>{ErrorCodes::NoMatchingDocument, "no oplog entry found"};
}
} // namespace
-std::string toString(DataReplicatorState s) {
- switch (s) {
- case DataReplicatorState::InitialSync:
- return "InitialSync";
- case DataReplicatorState::Uninitialized:
- return "Uninitialized";
- }
- MONGO_UNREACHABLE;
-}
-
// Data Replicator
DataReplicator::DataReplicator(
DataReplicatorOptions opts,
std::unique_ptr<DataReplicatorExternalState> dataReplicatorExternalState,
- StorageInterface* storage)
+ StorageInterface* storage,
+ const OnCompletionFn& onCompletion)
: _fetchCount(0),
_opts(opts),
_dataReplicatorExternalState(std::move(dataReplicatorExternalState)),
_exec(_dataReplicatorExternalState->getTaskExecutor()),
- _dataReplicatorState(DataReplicatorState::Uninitialized),
- _storage(storage) {
+ _storage(storage),
+ _onCompletion(onCompletion) {
+ uassert(ErrorCodes::BadValue, "task executor cannot be null", _exec);
uassert(ErrorCodes::BadValue, "invalid storage interface", _storage);
uassert(ErrorCodes::BadValue, "invalid getMyLastOptime function", _opts.getMyLastOptime);
uassert(ErrorCodes::BadValue, "invalid setMyLastOptime function", _opts.setMyLastOptime);
uassert(ErrorCodes::BadValue, "invalid getSlaveDelay function", _opts.getSlaveDelay);
uassert(ErrorCodes::BadValue, "invalid sync source selector", _opts.syncSourceSelector);
+ uassert(ErrorCodes::BadValue, "callback function cannot be null", _onCompletion);
}
DataReplicator::~DataReplicator() {
DESTRUCTOR_GUARD({
- UniqueLock lk(_mutex);
- _cancelAllHandles_inlock();
- _waitOnAndResetAll_inlock(&lk);
+ shutdown();
+ join();
});
}
+bool DataReplicator::isActive() const {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ return _isActive_inlock();
+}
+
+bool DataReplicator::_isActive_inlock() const {
+ return State::kRunning == _state || State::kShuttingDown == _state;
+}
+
+Status DataReplicator::startup(OperationContext* txn,
+ std::uint32_t initialSyncMaxAttempts) noexcept {
+ invariant(txn);
+ invariant(initialSyncMaxAttempts >= 1U);
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ switch (_state) {
+ case State::kPreStart:
+ _state = State::kRunning;
+ break;
+ case State::kRunning:
+ return Status(ErrorCodes::IllegalOperation, "data replicator already started");
+ case State::kShuttingDown:
+ return Status(ErrorCodes::ShutdownInProgress, "data replicator shutting down");
+ case State::kComplete:
+ return Status(ErrorCodes::ShutdownInProgress, "data replicator completed");
+ }
+
+ _setUp_inlock(txn, initialSyncMaxAttempts);
+
+ // Start first initial sync attempt.
+ std::uint32_t initialSyncAttempt = 0;
+ auto status = _scheduleWorkAndSaveHandle_inlock(
+ stdx::bind(&DataReplicator::_startInitialSyncAttemptCallback,
+ this,
+ stdx::placeholders::_1,
+ initialSyncAttempt,
+ initialSyncMaxAttempts),
+ &_startInitialSyncAttemptHandle,
+ str::stream() << "_startInitialSyncAttemptCallback-" << initialSyncAttempt);
+
+ if (!status.isOK()) {
+ _state = State::kComplete;
+ return status;
+ }
+
+ return Status::OK();
+}
+
Status DataReplicator::shutdown() {
- auto status = scheduleShutdown();
- if (status.isOK()) {
- log() << "Waiting for shutdown of DataReplicator.";
- waitForShutdown();
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ switch (_state) {
+ case State::kPreStart:
+ // Transition directly from PreStart to Complete if not started yet.
+ _state = State::kComplete;
+ return Status::OK();
+ case State::kRunning:
+ _state = State::kShuttingDown;
+ break;
+ case State::kShuttingDown:
+ case State::kComplete:
+ // Nothing to do if we are already in ShuttingDown or Complete state.
+ return Status::OK();
}
- return status;
+
+ _cancelRemainingWork_inlock();
+
+ return Status::OK();
}
-DataReplicatorState DataReplicator::getState() const {
- LockGuard lk(_mutex);
- return _dataReplicatorState;
+void DataReplicator::_cancelRemainingWork_inlock() {
+ _cancelHandle_inlock(_startInitialSyncAttemptHandle);
+ _cancelHandle_inlock(_chooseSyncSourceHandle);
+ _cancelHandle_inlock(_getBaseRollbackIdHandle);
+ _cancelHandle_inlock(_getLastRollbackIdHandle);
+ _cancelHandle_inlock(_getNextApplierBatchHandle);
+
+ _shutdownComponent_inlock(_oplogFetcher);
+ if (_initialSyncState) {
+ _shutdownComponent_inlock(_initialSyncState->dbsCloner);
+ }
+ _shutdownComponent_inlock(_applier);
+ _shutdownComponent_inlock(_lastOplogEntryFetcher);
}
-HostAndPort DataReplicator::getSyncSource() const {
- LockGuard lk(_mutex);
- return _syncSource;
+void DataReplicator::join() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _stateCondition.wait(lk, [this]() { return !_isActive_inlock(); });
}
-OpTimeWithHash DataReplicator::getLastFetched() const {
- LockGuard lk(_mutex);
- return _lastFetched;
+DataReplicator::State DataReplicator::getState_forTest() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _state;
}
-OpTimeWithHash DataReplicator::getLastApplied() const {
- LockGuard lk(_mutex);
- return _lastApplied;
+bool DataReplicator::_isShuttingDown() const {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ return _isShuttingDown_inlock();
}
-size_t DataReplicator::getOplogBufferCount() const {
- // Oplog buffer is internally synchronized.
- return _oplogBuffer->getCount();
+bool DataReplicator::_isShuttingDown_inlock() const {
+ return State::kShuttingDown == _state;
}
std::string DataReplicator::getDiagnosticString() const {
@@ -303,11 +351,10 @@ std::string DataReplicator::getDiagnosticString() const {
str::stream out;
out << "DataReplicator -"
<< " opts: " << _opts.toString() << " oplogFetcher: " << _oplogFetcher->toString()
- << " opsBuffered: " << _oplogBuffer->getSize()
- << " state: " << toString(_dataReplicatorState);
+ << " opsBuffered: " << _oplogBuffer->getSize() << " active: " << _isActive_inlock()
+ << " shutting down: " << _isShuttingDown_inlock();
if (_initialSyncState) {
- out << " opsAppied: " << _initialSyncState->appliedOps
- << " status: " << _initialSyncState->status.toString();
+ out << " opsAppied: " << _initialSyncState->appliedOps;
}
return out;
@@ -345,445 +392,789 @@ BSONObj DataReplicator::_getInitialSyncProgress_inlock() const {
return bob.obj();
}
-void DataReplicator::_resetState_inlock(OperationContext* txn, OpTimeWithHash lastAppliedOpTime) {
- invariant(!_anyActiveHandles_inlock());
- _lastApplied = _lastFetched = lastAppliedOpTime;
- if (_oplogBuffer) {
- _oplogBuffer->clear(txn);
- }
-}
-
void DataReplicator::setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& work) {
LockGuard lk(_mutex);
_scheduleDbWorkFn = work;
}
-Status DataReplicator::_runInitialSyncAttempt_inlock(OperationContext* txn,
- UniqueLock& lk,
- HostAndPort syncSource) {
- RollbackChecker rollbackChecker(_exec, syncSource);
- invariant(lk.owns_lock());
- Status statusFromWrites(ErrorCodes::NotYetInitialized, "About to run Initial Sync Attempt.");
+void DataReplicator::_setUp_inlock(OperationContext* txn, std::uint32_t initialSyncMaxAttempts) {
+ // This will call through to the storageInterfaceImpl to ReplicationCoordinatorImpl.
+ // 'txn' is passed through from startup().
+ _storage->setInitialSyncFlag(txn);
+
+ LOG(1) << "Creating oplogBuffer.";
+ _oplogBuffer = _dataReplicatorExternalState->makeInitialSyncOplogBuffer(txn);
+ _oplogBuffer->startup(txn);
+
+ _stats.initialSyncStart = _exec->now();
+ _stats.maxFailedInitialSyncAttempts = initialSyncMaxAttempts;
+ _stats.failedInitialSyncAttempts = 0;
+}
+
+void DataReplicator::_tearDown_inlock(OperationContext* txn,
+ const StatusWith<OpTimeWithHash>& lastApplied) {
+ _stats.initialSyncEnd = _exec->now();
+
+ // This might not be necessary if we failed initial sync.
+ invariant(_oplogBuffer);
+ _oplogBuffer->shutdown(txn);
+
+ if (!lastApplied.isOK()) {
+ return;
+ }
+ _storage->clearInitialSyncFlag(txn);
+ _opts.setMyLastOptime(lastApplied.getValue().opTime);
+ log() << "initial sync done; took "
+ << duration_cast<Seconds>(_stats.initialSyncEnd - _stats.initialSyncStart) << ".";
+ initialSyncCompletes.increment();
+}
+
+void DataReplicator::_startInitialSyncAttemptCallback(
+ const executor::TaskExecutor::CallbackArgs& callbackArgs,
+ std::uint32_t initialSyncAttempt,
+ std::uint32_t initialSyncMaxAttempts) {
+ auto status = _checkForShutdownAndConvertStatus_inlock(
+ callbackArgs,
+ str::stream() << "error while starting initial sync attempt " << (initialSyncAttempt + 1)
+ << " of "
+ << initialSyncMaxAttempts);
+ if (!status.isOK()) {
+ _finishInitialSyncAttempt(status);
+ return;
+ }
+
+ log() << "Starting initial sync (attempt " << (initialSyncAttempt + 1) << " of "
+ << initialSyncMaxAttempts << ")";
+
+ // This completion guard invokes _finishInitialSyncAttempt on destruction.
+ auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); };
+ auto finishInitialSyncAttemptFn = [this](const StatusWith<OpTimeWithHash>& lastApplied) {
+ _finishInitialSyncAttempt(lastApplied);
+ };
+ auto onCompletionGuard =
+ std::make_shared<OnCompletionGuard>(cancelRemainingWorkInLock, finishInitialSyncAttemptFn);
+
+ // Lock guard must be declared after completion guard because completion guard destructor
+ // has to run outside lock.
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ LOG(2) << "Resetting sync source so a new one can be chosen for this initial sync attempt.";
+ _syncSource = HostAndPort();
+
+ _lastApplied = {};
+ _lastFetched = {};
+ _oplogBuffer->clear(makeOpCtx().get());
+
+ // Get sync source.
+ std::uint32_t chooseSyncSourceAttempt = 0;
+ std::uint32_t chooseSyncSourceMaxAttempts =
+ static_cast<std::uint32_t>(numInitialSyncConnectAttempts.load());
+
+ // _scheduleWorkAndSaveHandle_inlock() is shutdown-aware.
+ status = _scheduleWorkAndSaveHandle_inlock(
+ stdx::bind(&DataReplicator::_chooseSyncSourceCallback,
+ this,
+ stdx::placeholders::_1,
+ chooseSyncSourceAttempt,
+ chooseSyncSourceMaxAttempts,
+ onCompletionGuard),
+ &_chooseSyncSourceHandle,
+ str::stream() << "_chooseSyncSourceCallback-" << chooseSyncSourceAttempt);
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
+ }
+}
+
+void DataReplicator::_chooseSyncSourceCallback(
+ const executor::TaskExecutor::CallbackArgs& callbackArgs,
+ std::uint32_t chooseSyncSourceAttempt,
+ std::uint32_t chooseSyncSourceMaxAttempts,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ // Cancellation should be treated the same as other errors. In this case, the most likely cause
+ // of a failed _chooseSyncSourceCallback() task is a cancellation triggered by
+ // DataReplicator::shutdown() or the task executor shutting down.
+ auto status =
+ _checkForShutdownAndConvertStatus_inlock(callbackArgs, "error while choosing sync source");
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
+ }
+ if (MONGO_FAIL_POINT(failInitialSyncWithBadHost)) {
+ status = Status(ErrorCodes::InvalidSyncSource,
+ "no sync source avail(failInitialSyncWithBadHost failpoint is set).");
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
+ }
+
+ auto syncSource = _chooseSyncSource_inlock();
+ if (!syncSource.isOK()) {
+ if (chooseSyncSourceAttempt + 1 >= chooseSyncSourceMaxAttempts) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(
+ lock,
+ Status(ErrorCodes::InitialSyncOplogSourceMissing,
+ "No valid sync source found in current replica set to do an initial sync."));
+ return;
+ }
+
+ auto when = _exec->now() + _opts.syncSourceRetryWait;
+ LOG(1) << "Error getting sync source: '" << syncSource.getStatus() << "', trying again in "
+ << _opts.syncSourceRetryWait << " at " << when.toString() << ". Attempt "
+ << (chooseSyncSourceAttempt + 1) << " of " << numInitialSyncConnectAttempts.load();
+ auto status = _scheduleWorkAtAndSaveHandle_inlock(
+ when,
+ stdx::bind(&DataReplicator::_chooseSyncSourceCallback,
+ this,
+ stdx::placeholders::_1,
+ chooseSyncSourceAttempt + 1,
+ chooseSyncSourceMaxAttempts,
+ onCompletionGuard),
+ &_chooseSyncSourceHandle,
+ str::stream() << "_chooseSyncSourceCallback-" << (chooseSyncSourceAttempt + 1));
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
+ }
+ return;
+ }
+
+ // There is no need to schedule separate task to create oplog collection since we are already in
+ // a callback and we are certain there's no existing operation context (required for creating
+ // collections and dropping user databases) attached to the current thread.
+ status = _recreateOplogAndDropReplicatedDatabases();
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
+ }
+
+ // Schedule rollback ID checker.
+ _syncSource = syncSource.getValue();
+ _rollbackChecker = stdx::make_unique<RollbackChecker>(_exec, _syncSource);
+ auto scheduleResult =
+ _rollbackChecker->reset(stdx::bind(&DataReplicator::_rollbackCheckerResetCallback,
+ this,
+ stdx::placeholders::_1,
+ onCompletionGuard));
+ status = scheduleResult.getStatus();
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
+ }
+ _getBaseRollbackIdHandle = scheduleResult.getValue();
+}
+
+Status DataReplicator::_recreateOplogAndDropReplicatedDatabases() {
// drop/create oplog; drop user databases.
LOG(1) << "About to drop+create the oplog, if it exists, ns:" << _opts.localOplogNS
<< ", and drop all user databases (so that we can clone them).";
- const auto schedStatus = scheduleWork(
- _exec, [&statusFromWrites, this](OperationContext* txn, const CallbackArgs& cd) {
- /**
- * This functions does the following:
- * 1.) Drop oplog
- * 2.) Drop user databases (replicated dbs)
- * 3.) Create oplog
- */
- if (!cd.status.isOK()) {
- error() << "Error while being called to drop/create oplog and drop users "
- << "databases, oplogNS: " << _opts.localOplogNS
- // REDACT cd??
- << " with status:" << cd.status.toString();
- statusFromWrites = cd.status;
- return;
- }
- invariant(txn);
- // We are not replicating nor validating these writes.
- txn->setReplicatedWrites(false);
+ auto txn = makeOpCtx();
- // 1.) Drop the oplog.
- LOG(2) << "Dropping the existing oplog: " << _opts.localOplogNS;
- statusFromWrites = _storage->dropCollection(txn, _opts.localOplogNS);
+ // We are not replicating nor validating these writes.
+ UnreplicatedWritesBlock unreplicatedWritesBlock(txn.get());
+ // 1.) Drop the oplog.
+ LOG(2) << "Dropping the existing oplog: " << _opts.localOplogNS;
+ auto status = _storage->dropCollection(txn.get(), _opts.localOplogNS);
+ if (!status.isOK()) {
+ return status;
+ }
- // 2.) Drop user databases.
- if (statusFromWrites.isOK()) {
- LOG(2) << "Dropping user databases";
- statusFromWrites = _storage->dropReplicatedDatabases(txn);
- }
+ // 2.) Drop user databases.
+ LOG(2) << "Dropping user databases";
+ status = _storage->dropReplicatedDatabases(txn.get());
+ if (!status.isOK()) {
+ return status;
+ }
- // 3.) Create the oplog.
- if (statusFromWrites.isOK()) {
- LOG(2) << "Creating the oplog: " << _opts.localOplogNS;
- statusFromWrites = _storage->createOplog(txn, _opts.localOplogNS);
- }
+ // 3.) Create the oplog.
+ LOG(2) << "Creating the oplog: " << _opts.localOplogNS;
+ return _storage->createOplog(txn.get(), _opts.localOplogNS);
+}
- });
+void DataReplicator::_rollbackCheckerResetCallback(
+ const RollbackChecker::Result& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ auto status = _checkForShutdownAndConvertStatus_inlock(result.getStatus(),
+ "error while getting base rollback ID");
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
+ }
- if (!schedStatus.isOK())
- return schedStatus.getStatus();
-
- lk.unlock();
- _exec->wait(schedStatus.getValue());
- if (!statusFromWrites.isOK()) {
- lk.lock();
- return statusFromWrites;
- }
-
- auto rollbackStatus = rollbackChecker.reset_sync();
- lk.lock();
- if (!rollbackStatus.isOK())
- return rollbackStatus;
-
- Event initialSyncFinishEvent;
- StatusWith<Event> eventStatus = _exec->makeEvent();
- if (!eventStatus.isOK()) {
- return eventStatus.getStatus();
- }
- initialSyncFinishEvent = eventStatus.getValue();
-
- if (_inShutdown) {
- // Signal shutdown event.
- _doNextActions_inlock();
- return Status(ErrorCodes::ShutdownInProgress,
- "initial sync terminated before creating cloner");
- }
-
- invariant(initialSyncFinishEvent.isValid());
- _initialSyncState.reset(new InitialSyncState(
- stdx::make_unique<DatabasesCloner>(
- _storage,
- _exec,
- _dataReplicatorExternalState->getDbWorkThreadPool(),
- syncSource,
- [](BSONObj dbInfo) {
- const std::string name = dbInfo["name"].str();
- return (name != "local");
- },
- stdx::bind(
- &DataReplicator::_onDataClonerFinish, this, stdx::placeholders::_1, syncSource)),
- initialSyncFinishEvent));
-
- const NamespaceString ns(_opts.remoteOplogNS);
- lk.unlock();
- // get the latest oplog entry, and parse out the optime + hash.
- const auto lastOplogEntry = getLatestOplogEntry(_exec, syncSource, ns);
- const auto lastOplogEntryOpTimeWithHashStatus = lastOplogEntry.isOK()
- ? parseOpTimeWithHash(lastOplogEntry.getValue())
- : StatusWith<OpTimeWithHash>{lastOplogEntry.getStatus()};
-
- lk.lock();
-
- if (!lastOplogEntryOpTimeWithHashStatus.isOK()) {
- _initialSyncState->status = lastOplogEntryOpTimeWithHashStatus.getStatus();
- return _initialSyncState->status;
- }
-
- _initialSyncState->oplogSeedDoc = lastOplogEntry.getValue().getOwned();
- const auto lastOpTimeWithHash = lastOplogEntryOpTimeWithHashStatus.getValue();
- _initialSyncState->beginTimestamp = lastOpTimeWithHash.opTime.getTimestamp();
+ status = _scheduleLastOplogEntryFetcher_inlock(
+ stdx::bind(&DataReplicator::_lastOplogEntryFetcherCallbackForBeginTimestamp,
+ this,
+ stdx::placeholders::_1,
+ onCompletionGuard));
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
+ }
+}
+
+void DataReplicator::_lastOplogEntryFetcherCallbackForBeginTimestamp(
+ const StatusWith<Fetcher::QueryResponse>& result,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ auto status = _checkForShutdownAndConvertStatus_inlock(
+ result.getStatus(), "error while getting last oplog entry for begin timestamp");
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
+ }
- if (_oplogFetcher) {
- if (_oplogFetcher->isActive()) {
- LOG(3) << "Fetcher is active, stopping it.";
- _oplogFetcher->shutdown();
+ const auto opTimeWithHashResult = parseOpTimeWithHash(result);
+ status = opTimeWithHashResult.getStatus();
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
+ }
+
+ // This is where the flow of control starts to split into two parallel tracks:
+ // - oplog fetcher
+ // - data cloning and applier
+ auto listDatabasesFilter = [](BSONObj dbInfo) {
+ std::string name;
+ auto status = mongo::bsonExtractStringField(dbInfo, "name", &name);
+ if (!status.isOK()) {
+ error() << "listDatabases filter failed to parse database name from " << redact(dbInfo)
+ << ": " << redact(status);
+ return false;
}
+ return (name != "local");
+ };
+ _initialSyncState = stdx::make_unique<InitialSyncState>(
+ stdx::make_unique<DatabasesCloner>(_storage,
+ _exec,
+ _dataReplicatorExternalState->getDbWorkThreadPool(),
+ _syncSource,
+ listDatabasesFilter,
+ stdx::bind(&DataReplicator::_databasesClonerCallback,
+ this,
+ stdx::placeholders::_1,
+ onCompletionGuard)));
+
+ const auto& lastOpTimeWithHash = opTimeWithHashResult.getValue();
+ _initialSyncState->beginTimestamp = lastOpTimeWithHash.opTime.getTimestamp();
+
+ invariant(!result.getValue().documents.empty());
+ LOG(2) << "Setting begin timestamp to " << _initialSyncState->beginTimestamp
+ << " using last oplog entry: " << redact(result.getValue().documents.front())
+ << ", ns: " << _opts.localOplogNS;
+
+
+ const auto configResult = _dataReplicatorExternalState->getCurrentConfig();
+ status = configResult.getStatus();
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ _initialSyncState.reset();
+ return;
}
- _oplogFetcher.reset();
-
- const auto config = uassertStatusOK(_dataReplicatorExternalState->getCurrentConfig());
- _oplogFetcher = stdx::make_unique<OplogFetcher>(_exec,
- lastOpTimeWithHash,
- syncSource,
- _opts.remoteOplogNS,
- config,
- _opts.oplogFetcherMaxFetcherRestarts,
- _dataReplicatorExternalState.get(),
- stdx::bind(&DataReplicator::_enqueueDocuments,
- this,
- stdx::placeholders::_1,
- stdx::placeholders::_2,
- stdx::placeholders::_3),
- stdx::bind(&DataReplicator::_onOplogFetchFinish,
- this,
- stdx::placeholders::_1,
- stdx::placeholders::_2));
+
+ const auto& config = configResult.getValue();
+ _oplogFetcher =
+ stdx::make_unique<OplogFetcher>(_exec,
+ lastOpTimeWithHash,
+ _syncSource,
+ _opts.remoteOplogNS,
+ config,
+ _opts.oplogFetcherMaxFetcherRestarts,
+ _dataReplicatorExternalState.get(),
+ stdx::bind(&DataReplicator::_enqueueDocuments,
+ this,
+ stdx::placeholders::_1,
+ stdx::placeholders::_2,
+ stdx::placeholders::_3),
+ stdx::bind(&DataReplicator::_oplogFetcherCallback,
+ this,
+ stdx::placeholders::_1,
+ stdx::placeholders::_2,
+ onCompletionGuard));
+
LOG(2) << "Starting OplogFetcher: " << _oplogFetcher->toString();
- auto oplogFetcherStartupStatus = _oplogFetcher->startup();
- if (!oplogFetcherStartupStatus.isOK()) {
- return oplogFetcherStartupStatus;
- }
- DatabasesCloner* cloner = _initialSyncState->dbsCloner.get();
- if (_scheduleDbWorkFn) {
- cloner->setScheduleDbWorkFn_forTest(_scheduleDbWorkFn);
+ // _startupComponent_inlock is shutdown-aware.
+ status = _startupComponent_inlock(_oplogFetcher);
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ _initialSyncState->dbsCloner.reset();
+ return;
}
- lk.unlock();
if (MONGO_FAIL_POINT(initialSyncHangBeforeCopyingDatabases)) {
+ lock.unlock();
+ // This could have been done with a scheduleWorkAt but this is used only by JS tests where
+ // we run with multiple threads so it's fine to spin on this thread.
// This log output is used in js tests so please leave it.
log() << "initial sync - initialSyncHangBeforeCopyingDatabases fail point "
"enabled. Blocking until fail point is disabled.";
- while (MONGO_FAIL_POINT(initialSyncHangBeforeCopyingDatabases)) {
- lk.lock();
- if (!_initialSyncState->status.isOK()) {
- lk.unlock();
- break;
- }
- lk.unlock();
+ while (MONGO_FAIL_POINT(initialSyncHangBeforeCopyingDatabases) && !_isShuttingDown()) {
mongo::sleepsecs(1);
}
+ lock.lock();
}
- auto clonerStartupStatus = cloner->startup(); // When the cloner is done applier starts.
- if (!clonerStartupStatus.isOK()) {
- return clonerStartupStatus;
+ if (_scheduleDbWorkFn) {
+ // '_scheduleDbWorkFn' is passed through (DatabasesCloner->DatabaseCloner->CollectionCloner)
+ // to the CollectionCloner so that CollectionCloner's default TaskRunner can be disabled to
+ // facilitate testing.
+ _initialSyncState->dbsCloner->setScheduleDbWorkFn_forTest(_scheduleDbWorkFn);
}
- _exec->waitForEvent(initialSyncFinishEvent);
+ LOG(2) << "Starting DatabasesCloner: " << _initialSyncState->dbsCloner->toString();
- log() << "Initial sync attempt finishing up.";
- lk.lock();
- if (!_initialSyncState->status.isOK()) {
- return _initialSyncState->status;
+ // _startupComponent_inlock() is shutdown-aware. Additionally, if the component fails to
+ // startup, _startupComponent_inlock() resets the unique_ptr to the component (in this case,
+ // DatabasesCloner).
+ status = _startupComponent_inlock(_initialSyncState->dbsCloner);
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
}
- lk.unlock();
+}
+
+void DataReplicator::_oplogFetcherCallback(const Status& oplogFetcherFinishStatus,
+ const OpTimeWithHash& lastFetched,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ log() << "Finished fetching oplog during initial sync: " << redact(oplogFetcherFinishStatus)
+ << ". Last fetched optime and hash: " << lastFetched;
- // Check for roll back, and fail if so.
- auto hasHadRollbackResponse = rollbackChecker.hasHadRollback();
- lk.lock();
- if (!hasHadRollbackResponse.isOK()) {
- _initialSyncState->status = hasHadRollbackResponse.getStatus();
- } else if (hasHadRollbackResponse.getValue()) {
- _initialSyncState->status = {ErrorCodes::UnrecoverableRollbackError,
- "Rollback occurred during initial sync"};
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ auto status = _checkForShutdownAndConvertStatus_inlock(
+ oplogFetcherFinishStatus, "error fetching oplog during initial sync");
+
+ // When the OplogFetcher completes early (instead of being canceled at shutdown), we log and let
+ // our reference to 'onCompletionGuard' go out of scope. Since we know the
+ // DatabasesCloner/MultiApplier will still have a reference to it, the actual function within
+ // the guard won't be fired yet.
+ // It is up to the DatabasesCloner and MultiApplier to determine if they can proceed without any
+ // additional data going into the oplog buffer.
+ // It is not common for the OplogFetcher to return with an OK status. The only time it returns
+ // an OK status is when the 'stopOplogFetcher' fail point is enabled, which causes the
+ // OplogFetcher to ignore the current sync source response and return early.
+ if (status.isOK()) {
+ log() << "Finished fetching oplog fetching early. Last fetched optime and hash: "
+ << lastFetched.toString();
+ _lastFetched = lastFetched;
+ return;
}
- if (!_initialSyncState->status.isOK()) {
- return _initialSyncState->status;
+ // During normal operation, this call to onCompletion->setResultAndCancelRemainingWork_inlock
+ // is a no-op because the other thread running the DatabasesCloner or MultiApplier will already
+ // have called it with the success/failed status.
+ // The OplogFetcher does not finish on its own because of the oplog tailing query it runs on the
+ // sync source. The most common OplogFetcher completion status is CallbackCanceled due to either
+ // a shutdown request or completion of the data cloning and oplog application phases.
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+}
+
+void DataReplicator::_databasesClonerCallback(
+ const Status& databaseClonerFinishStatus,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ auto status = _checkForShutdownAndConvertStatus_inlock(databaseClonerFinishStatus,
+ "error cloning databases");
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
}
- // If no oplog entries were applied, then we need to store the document that we fetched before
- // we began cloning.
- if (_initialSyncState->appliedOps == 0) {
- auto oplogSeedDoc = _initialSyncState->oplogSeedDoc;
- lk.unlock();
+ status = _scheduleLastOplogEntryFetcher_inlock(
+ stdx::bind(&DataReplicator::_lastOplogEntryFetcherCallbackForStopTimestamp,
+ this,
+ stdx::placeholders::_1,
+ onCompletionGuard));
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
+ }
+}
+
+void DataReplicator::_lastOplogEntryFetcherCallbackForStopTimestamp(
+ const StatusWith<Fetcher::QueryResponse>& result,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ auto status = _checkForShutdownAndConvertStatus_inlock(
+ result.getStatus(), "error fetching last oplog entry for stop timestamp");
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
+ }
+
+ auto&& optimeWithHashStatus = parseOpTimeWithHash(result);
+ if (!optimeWithHashStatus.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(
+ lock, optimeWithHashStatus.getStatus());
+ return;
+ }
+ auto&& optimeWithHash = optimeWithHashStatus.getValue();
+ _initialSyncState->stopTimestamp = optimeWithHash.opTime.getTimestamp();
- LOG(1) << "inserting oplog seed document: " << _initialSyncState->oplogSeedDoc;
+ if (_initialSyncState->beginTimestamp == _initialSyncState->stopTimestamp) {
+ _lastApplied = optimeWithHash;
+ log() << "No need to apply operations. (currently at "
+ << _initialSyncState->stopTimestamp.toBSON() << ")";
+ } else {
+ invariant(_lastApplied.opTime.isNull());
+ _checkApplierProgressAndScheduleGetNextApplierBatch_inlock(lock, onCompletionGuard);
+ return;
+ }
+ }
- // Store the first oplog entry, after initial sync completes.
- const auto insertStatus =
- _storage->insertDocuments(txn, _opts.localOplogNS, {oplogSeedDoc});
- lk.lock();
+ // Oplog at sync source has not advanced since we started cloning databases, so we use the last
+ // oplog entry to seed the oplog before checking the rollback ID.
+ {
+ const auto& documents = result.getValue().documents;
+ invariant(!documents.empty());
+ const auto& oplogSeedDoc = documents.front();
+ LOG(1) << "inserting oplog seed document: " << oplogSeedDoc;
- if (!insertStatus.isOK()) {
- _initialSyncState->status = insertStatus;
- return _initialSyncState->status;
+ auto txn = makeOpCtx();
+ // StorageInterface::insertDocument() has to be called outside the lock because we may
+ // override its behavior in tests. See DataReplicatorReturnsCallbackCanceledAndDoesNot-
+ // ScheduleRollbackCheckerIfShutdownAfterInsertingInsertOplogSeedDocument in
+ // data_replicator_test.cpp
+ auto status = _storage->insertDocument(txn.get(), _opts.localOplogNS, oplogSeedDoc);
+ if (!status.isOK()) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
}
}
- return Status::OK(); // success
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ // This sets the error in 'onCompletionGuard' and shuts down the OplogFetcher on error.
+ _scheduleRollbackCheckerCheckForRollback_inlock(lock, onCompletionGuard);
}
-StatusWith<OpTimeWithHash> DataReplicator::doInitialSync(OperationContext* txn,
- std::size_t maxAttempts) {
- const Status shutdownStatus{ErrorCodes::ShutdownInProgress,
- "Shutting down while in doInitialSync."};
- if (!txn) {
- std::string msg = "Initial Sync attempted but no OperationContext*, so aborting.";
- error() << msg;
- return Status{ErrorCodes::InitialSyncFailure, msg};
- }
- UniqueLock lk(_mutex);
- if (_inShutdown || (_initialSyncState && !_initialSyncState->status.isOK())) {
- const auto retStatus = (_initialSyncState && !_initialSyncState->status.isOK())
- ? _initialSyncState->status
- : shutdownStatus;
- return retStatus;
+void DataReplicator::_getNextApplierBatchCallback(
+ const executor::TaskExecutor::CallbackArgs& callbackArgs,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ auto status =
+ _checkForShutdownAndConvertStatus_inlock(callbackArgs, "error getting next applier batch");
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
}
- _stats.initialSyncStart = _exec->now();
- if (_dataReplicatorState == DataReplicatorState::InitialSync) {
- return {ErrorCodes::InitialSyncActive,
- (str::stream() << "Initial sync in progress; try resync to start anew.")};
+
+ auto batchResult = _getNextApplierBatch_inlock();
+ if (!batchResult.isOK()) {
+ warning() << "Failure creating next apply batch: " << redact(batchResult.getStatus());
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, batchResult.getStatus());
+ return;
}
- LOG(1) << "Creating oplogBuffer.";
- _oplogBuffer = _dataReplicatorExternalState->makeInitialSyncOplogBuffer(txn);
- _oplogBuffer->startup(txn);
- ON_BLOCK_EXIT([this, txn, &lk]() {
- if (!lk.owns_lock()) {
- lk.lock();
+ // Schedule MultiApplier if we have operations to apply.
+ const auto& ops = batchResult.getValue();
+ if (!ops.empty()) {
+ _fetchCount.store(0);
+ // "_syncSource" has to be copied to stdx::bind result.
+ HostAndPort source = _syncSource;
+ auto applyOperationsForEachReplicationWorkerThreadFn =
+ stdx::bind(&DataReplicatorExternalState::_multiInitialSyncApply,
+ _dataReplicatorExternalState.get(),
+ stdx::placeholders::_1,
+ source,
+ &_fetchCount);
+ auto applyBatchOfOperationsFn = stdx::bind(&DataReplicatorExternalState::_multiApply,
+ _dataReplicatorExternalState.get(),
+ stdx::placeholders::_1,
+ stdx::placeholders::_2,
+ stdx::placeholders::_3);
+
+ const auto lastEntry = ops.back().raw;
+ const auto opTimeWithHashStatus = parseOpTimeWithHash(lastEntry);
+ status = opTimeWithHashStatus.getStatus();
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
}
- invariant(_oplogBuffer);
- _oplogBuffer->shutdown(txn);
- });
-
- lk.unlock();
- // This will call through to the storageInterfaceImpl to ReplicationCoordinatorImpl.
- _storage->setInitialSyncFlag(txn);
- lk.lock();
- _stats.maxFailedInitialSyncAttempts = maxAttempts;
- _stats.failedInitialSyncAttempts = 0;
- while (_stats.failedInitialSyncAttempts < _stats.maxFailedInitialSyncAttempts) {
- if (_inShutdown) {
- return shutdownStatus;
+ auto lastApplied = opTimeWithHashStatus.getValue();
+ auto numApplied = ops.size();
+ _applier =
+ stdx::make_unique<MultiApplier>(_exec,
+ ops,
+ applyOperationsForEachReplicationWorkerThreadFn,
+ applyBatchOfOperationsFn,
+ stdx::bind(&DataReplicator::_multiApplierCallback,
+ this,
+ stdx::placeholders::_1,
+ lastApplied,
+ numApplied,
+ onCompletionGuard));
+ status = _startupComponent_inlock(_applier);
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
}
+ return;
+ }
- Status attemptErrorStatus(Status::OK());
-
- ON_BLOCK_EXIT([this, txn, &lk, &attemptErrorStatus]() {
- if (!lk.owns_lock()) {
- lk.lock();
- }
- if (_anyActiveHandles_inlock()) {
- _cancelAllHandles_inlock();
- _waitOnAndResetAll_inlock(&lk);
- if (!attemptErrorStatus.isOK()) {
- _initialSyncState.reset();
- }
- }
- });
+ // If the oplog fetcher is no longer running (completed successfully) and the oplog buffer is
+ // empty, we are not going to make any more progress with this initial sync. Report progress so
+ // far and return a RemoteResultsUnavailable error.
+ if (!_oplogFetcher->isActive()) {
+ std::string msg = str::stream()
+ << "The oplog fetcher is no longer running and we have applied all the oplog entries "
+ "in the oplog buffer. Aborting this initial sync attempt. Last applied: "
+ << _lastApplied.toString() << ". Last fetched: " << _lastFetched.toString()
+ << ". Number of operations applied: " << _initialSyncState->appliedOps;
+ log() << msg;
+ status = Status(ErrorCodes::RemoteResultsUnavailable, msg);
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
+ }
- _setState_inlock(DataReplicatorState::InitialSync);
- _applierPaused = true;
+ // If there are no operations at the moment to apply and the oplog fetcher is still waiting on
+ // the sync source, we'll check the oplog buffer again in
+ // '_opts.getApplierBatchCallbackRetryWait' ms.
+ auto when = _exec->now() + _opts.getApplierBatchCallbackRetryWait;
+ status = _scheduleWorkAtAndSaveHandle_inlock(
+ when,
+ stdx::bind(&DataReplicator::_getNextApplierBatchCallback,
+ this,
+ stdx::placeholders::_1,
+ onCompletionGuard),
+ &_getNextApplierBatchHandle,
+ "_getNextApplierBatchCallback");
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
+ }
+}
- LOG(2) << "Resetting sync source so a new one can be chosen for this initial sync attempt.";
- _syncSource = HostAndPort();
+void DataReplicator::_multiApplierCallback(const Status& multiApplierStatus,
+ OpTimeWithHash lastApplied,
+ std::uint32_t numApplied,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ auto status =
+ _checkForShutdownAndConvertStatus_inlock(multiApplierStatus, "error applying batch");
+ if (!status.isOK()) {
+ error() << "Failed to apply batch due to '" << redact(status) << "'";
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
+ }
- _resetState_inlock(txn, OpTimeWithHash());
+ _initialSyncState->appliedOps += numApplied;
+ _lastApplied = lastApplied;
+ _opts.setMyLastOptime(_lastApplied.opTime);
- // For testing, we may want to fail if we receive a getmore.
- if (MONGO_FAIL_POINT(failInitialSyncWithBadHost)) {
- attemptErrorStatus =
- Status(ErrorCodes::InvalidSyncSource,
- "no sync source avail(failInitialSyncWithBadHost failpoint is set).");
+ auto fetchCount = _fetchCount.load();
+ if (fetchCount > 0) {
+ _initialSyncState->fetchedMissingDocs += fetchCount;
+ _fetchCount.store(0);
+ status = _scheduleLastOplogEntryFetcher_inlock(
+ stdx::bind(&DataReplicator::_lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments,
+ this,
+ stdx::placeholders::_1,
+ onCompletionGuard));
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
}
+ return;
+ }
- if (attemptErrorStatus.isOK()) {
- invariant(_syncSource.empty());
- for (int i = 0; i < numInitialSyncConnectAttempts; ++i) {
- auto syncSource = _chooseSyncSource_inlock();
- if (syncSource.isOK()) {
- _syncSource = syncSource.getValue();
- break;
- }
- attemptErrorStatus = syncSource.getStatus();
- LOG(1) << "Error getting sync source: '" << attemptErrorStatus.toString()
- << "', trying again in " << _opts.syncSourceRetryWait << ". Attempt "
- << i + 1 << " of " << numInitialSyncConnectAttempts.load();
- sleepmillis(durationCount<Milliseconds>(_opts.syncSourceRetryWait));
- }
+ _checkApplierProgressAndScheduleGetNextApplierBatch_inlock(lock, onCompletionGuard);
+}
- if (_syncSource.empty()) {
- attemptErrorStatus = Status(
- ErrorCodes::InitialSyncOplogSourceMissing,
- "No valid sync source found in current replica set to do an initial sync.");
- } else {
- attemptErrorStatus = _runInitialSyncAttempt_inlock(txn, lk, _syncSource);
- LOG(1) << "initial sync attempt returned with status: " << attemptErrorStatus;
- }
- }
+void DataReplicator::_lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments(
+ const StatusWith<Fetcher::QueryResponse>& result,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ auto status = _checkForShutdownAndConvertStatus_inlock(
+ result.getStatus(), "error getting last oplog entry after fetching missing documents");
+ if (!status.isOK()) {
+ error() << "Failed to get new minValid from source " << _syncSource << " due to '"
+ << redact(status) << "'";
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
+ }
- auto runTime = _initialSyncState ? _initialSyncState->timer.millis() : 0;
- _stats.initialSyncAttemptInfos.emplace_back(
- DataReplicator::InitialSyncAttemptInfo{runTime, attemptErrorStatus, _syncSource});
-
- // If the status is ok now then initial sync is over. We must do this before we reset
- // _initialSyncState and lose the DatabasesCloner's stats.
- if (attemptErrorStatus.isOK()) {
- _stats.initialSyncEnd = _exec->now();
- log() << "Initial Sync Statistics: " << _getInitialSyncProgress_inlock();
- if (MONGO_FAIL_POINT(initialSyncHangBeforeFinish)) {
- lk.unlock();
- // This log output is used in js tests so please leave it.
- log() << "initial sync - initialSyncHangBeforeFinish fail point "
- "enabled. Blocking until fail point is disabled.";
- while (MONGO_FAIL_POINT(initialSyncHangBeforeFinish)) {
- lk.lock();
- if (!_initialSyncState->status.isOK()) {
- lk.unlock();
- break;
- }
- lk.unlock();
- mongo::sleepsecs(1);
- }
- lk.lock();
- }
- }
- if (_inShutdown) {
- const auto retStatus = (_initialSyncState && !_initialSyncState->status.isOK())
- ? _initialSyncState->status
- : shutdownStatus;
- error() << "Initial sync attempt terminated due to shutdown: " << shutdownStatus;
- return retStatus;
- }
+ auto&& optimeWithHashStatus = parseOpTimeWithHash(result);
+ if (!optimeWithHashStatus.isOK()) {
+ error() << "Failed to parse new minValid from source " << _syncSource << " due to '"
+ << redact(optimeWithHashStatus.getStatus()) << "'";
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock,
+ optimeWithHashStatus.getStatus());
+ return;
+ }
+ auto&& optimeWithHash = optimeWithHashStatus.getValue();
- // Cleanup
- _cancelAllHandles_inlock();
- _waitOnAndResetAll_inlock(&lk);
- invariant(!_anyActiveHandles_inlock());
+ const auto newOplogEnd = optimeWithHash.opTime.getTimestamp();
+ LOG(1) << "Pushing back minValid from " << _initialSyncState->stopTimestamp << " to "
+ << newOplogEnd;
+ _initialSyncState->stopTimestamp = newOplogEnd;
- if (attemptErrorStatus.isOK()) {
- break;
- }
+ // Get another batch to apply.
+ _checkApplierProgressAndScheduleGetNextApplierBatch_inlock(lock, onCompletionGuard);
+}
- ++_stats.failedInitialSyncAttempts;
- initialSyncFailedAttempts.increment();
-
- error() << "Initial sync attempt failed -- attempts left: "
- << (_stats.maxFailedInitialSyncAttempts - _stats.failedInitialSyncAttempts)
- << " cause: " << attemptErrorStatus;
-
- // Check if need to do more retries.
- if (_stats.failedInitialSyncAttempts >= _stats.maxFailedInitialSyncAttempts) {
- const std::string err =
- "The maximum number of retries"
- " have been exhausted for initial sync.";
- severe() << err;
-
- initialSyncFailures.increment();
- _setState_inlock(DataReplicatorState::Uninitialized);
- _stats.initialSyncEnd = _exec->now();
- log() << "Initial Sync Statistics: " << _getInitialSyncProgress_inlock();
- return attemptErrorStatus;
- }
+void DataReplicator::_rollbackCheckerCheckForRollbackCallback(
+ const RollbackChecker::Result& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ auto status = _checkForShutdownAndConvertStatus_inlock(result.getStatus(),
+ "error while getting last rollback ID");
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
+ }
- // Sleep for retry time
- lk.unlock();
- sleepmillis(durationCount<Milliseconds>(_opts.initialSyncRetryWait));
- lk.lock();
+ auto hasHadRollback = result.getValue();
+ if (hasHadRollback) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(
+ lock,
+ Status(ErrorCodes::UnrecoverableRollbackError,
+ str::stream() << "Rollback occurred on our sync source " << _syncSource
+ << " during initial sync"));
+ return;
}
- _applierPaused = false;
+ // Success!
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, _lastApplied);
+}
- _lastFetched = _lastApplied;
+void DataReplicator::_finishInitialSyncAttempt(const StatusWith<OpTimeWithHash>& lastApplied) {
+ // Since _finishInitialSyncAttempt can be called from any component's callback function or
+ // scheduled task, it is possible that we may not be in a TaskExecutor-managed thread when this
+ // function is invoked.
+ // For example, if CollectionCloner fails while inserting documents into the
+ // CollectionBulkLoader, we will get here via one of CollectionCloner's TaskRunner callbacks
+ // which has an active OperationContext bound to the current Client. This would lead to an
+ // invariant when we attempt to create a new OperationContext for _tearDown(txn).
+ // To avoid this, we schedule _finishCallback against the TaskExecutor rather than calling it
+ // here synchronously.
+
+ // Unless dismissed, a scope guard will schedule _finishCallback() upon exiting this function.
+ // Since it is a requirement that _finishCallback be called outside the lock (which is possible
+ // if the task scheduling fails and we have to invoke _finishCallback() synchronously), we
+ // declare the scope guard before the lock guard.
+ auto result = lastApplied;
+ auto finishCallbackGuard = MakeGuard([this, &result] {
+ auto scheduleResult =
+ _exec->scheduleWork(stdx::bind(&DataReplicator::_finishCallback, this, result));
+ if (!scheduleResult.isOK()) {
+ warning() << "Unable to schedule data replicator completion task due to "
+ << redact(scheduleResult.getStatus())
+ << ". Running callback on current thread.";
+ _finishCallback(result);
+ }
+ });
- _storage->clearInitialSyncFlag(txn);
- _opts.setMyLastOptime(_lastApplied.opTime);
- log() << "initial sync done; took "
- << duration_cast<Seconds>(_stats.initialSyncEnd - _stats.initialSyncStart) << ".";
- initialSyncCompletes.increment();
- return _lastApplied;
-}
+ log() << "Initial sync attempt finishing up.";
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ log() << "Initial Sync Attempt Statistics: " << redact(_getInitialSyncProgress_inlock());
-void DataReplicator::_onDataClonerFinish(const Status& status, HostAndPort syncSource) {
- log() << "data clone finished, status: " << redact(status);
+ auto runTime = _initialSyncState ? _initialSyncState->timer.millis() : 0;
+ _stats.initialSyncAttemptInfos.emplace_back(
+ DataReplicator::InitialSyncAttemptInfo{runTime, result.getStatus(), _syncSource});
- if (status.code() == ErrorCodes::CallbackCanceled) {
+ if (result.isOK()) {
+ // Scope guard will invoke _finishCallback().
return;
}
- LockGuard lk(_mutex);
- if (_inShutdown) {
- // Signal shutdown event.
- _doNextActions_inlock();
+ // This increments the number of failed attempts for the current initial sync request.
+ ++_stats.failedInitialSyncAttempts;
+
+ // This increments the number of failed attempts across all initial sync attempts since process
+ // startup.
+ initialSyncFailedAttempts.increment();
+
+ error() << "Initial sync attempt failed -- attempts left: "
+ << (_stats.maxFailedInitialSyncAttempts - _stats.failedInitialSyncAttempts)
+ << " cause: " << redact(result.getStatus());
+
+ // Check if need to do more retries.
+ if (_stats.failedInitialSyncAttempts >= _stats.maxFailedInitialSyncAttempts) {
+ const std::string err =
+ "The maximum number of retries have been exhausted for initial sync.";
+ severe() << err;
+
+ initialSyncFailures.increment();
+
+ // Scope guard will invoke _finishCallback().
return;
}
+ auto when = _exec->now() + _opts.initialSyncRetryWait;
+ auto status = _scheduleWorkAtAndSaveHandle_inlock(
+ when,
+ stdx::bind(&DataReplicator::_startInitialSyncAttemptCallback,
+ this,
+ stdx::placeholders::_1,
+ _stats.failedInitialSyncAttempts,
+ _stats.maxFailedInitialSyncAttempts),
+ &_startInitialSyncAttemptHandle,
+ str::stream() << "_startInitialSyncAttemptCallback-" << _stats.failedInitialSyncAttempts);
+
if (!status.isOK()) {
- // Initial sync failed during cloning of databases
- error() << "Failed to clone data due to '" << redact(status) << "'";
- invariant(_initialSyncState);
- _initialSyncState->status = status;
- _exec->signalEvent(_initialSyncState->finishEvent);
+ result = status;
+
+ // Scope guard will invoke _finishCallback().
return;
}
- _scheduleLastOplogEntryFetcher_inlock(
- stdx::bind(&DataReplicator::_onApplierReadyStart, this, stdx::placeholders::_1));
+ // Next initial sync attempt scheduled successfully and we do not need to call _finishCallback()
+ // until the next initial sync attempt finishes.
+ finishCallbackGuard.Dismiss();
+}
+
+void DataReplicator::_finishCallback(StatusWith<OpTimeWithHash> lastApplied) {
+ // After running callback function, clear '_onCompletion' to release any resources that might be
+ // held by this function object.
+ // '_onCompletion' must be moved to a temporary copy and destroyed outside the lock in case
+ // there is any logic that's invoked at the function object's destruction that might call into
+ // this DataReplicator. 'onCompletion' must be destroyed outside the lock and this should happen
+ // before we transition the state to Complete.
+ decltype(_onCompletion) onCompletion;
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ auto txn = makeOpCtx();
+ _tearDown_inlock(txn.get(), lastApplied);
+
+ invariant(_onCompletion);
+ std::swap(_onCompletion, onCompletion);
+ }
+
+ if (MONGO_FAIL_POINT(initialSyncHangBeforeFinish)) {
+ // This log output is used in js tests so please leave it.
+ log() << "initial sync - initialSyncHangBeforeFinish fail point "
+ "enabled. Blocking until fail point is disabled.";
+ while (MONGO_FAIL_POINT(initialSyncHangBeforeFinish) && !_isShuttingDown()) {
+ mongo::sleepsecs(1);
+ }
+ }
+
+ // Completion callback must be invoked outside mutex.
+ try {
+ onCompletion(lastApplied);
+ } catch (...) {
+ warning() << "data replicator finish callback threw exception: "
+ << redact(exceptionToStatus());
+ }
+
+ // Destroy the remaining reference to the completion callback before we transition the state to
+ // Complete so that callers can expect any resources bound to '_onCompletion' to be released
+ // before DataReplicator::join() returns.
+ onCompletion = {};
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(_state != State::kComplete);
+ _state = State::kComplete;
+ _stateCondition.notify_all();
}
-void DataReplicator::_scheduleLastOplogEntryFetcher_inlock(Fetcher::CallbackFn callback) {
+Status DataReplicator::_scheduleLastOplogEntryFetcher_inlock(Fetcher::CallbackFn callback) {
BSONObj query = BSON(
"find" << _opts.remoteOplogNS.coll() << "sort" << BSON("$natural" << -1) << "limit" << 1);
@@ -801,199 +1192,197 @@ void DataReplicator::_scheduleLastOplogEntryFetcher_inlock(Fetcher::CallbackFn c
RemoteCommandRetryScheduler::kAllRetriableErrors));
Status scheduleStatus = _lastOplogEntryFetcher->schedule();
if (!scheduleStatus.isOK()) {
- _initialSyncState->status = scheduleStatus;
- _exec->signalEvent(_initialSyncState->finishEvent);
+ _lastOplogEntryFetcher.reset();
}
+
+ return scheduleStatus;
}
-void DataReplicator::_onApplierReadyStart(const QueryResponseStatus& fetchResult) {
- if (ErrorCodes::CallbackCanceled == fetchResult.getStatus()) {
+void DataReplicator::_checkApplierProgressAndScheduleGetNextApplierBatch_inlock(
+ const std::lock_guard<std::mutex>& lock, std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ // We should check our current state because shutdown() could have been called before
+ // we re-acquired the lock.
+ if (_isShuttingDown_inlock()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(
+ lock,
+ Status(ErrorCodes::CallbackCanceled,
+ "failed to schedule applier to check for "
+ "rollback: data replicator is shutting down"));
return;
}
- // Data clone done, move onto apply.
- LockGuard lk(_mutex);
- if (_inShutdown) {
- // Signal shutdown event.
- _doNextActions_inlock();
+ // Basic sanity check on begin/stop timestamps.
+ if (_initialSyncState->beginTimestamp > _initialSyncState->stopTimestamp) {
+ std::string msg = str::stream()
+ << "Possible rollback on sync source " << _syncSource.toString() << ". Currently at "
+ << _initialSyncState->stopTimestamp.toBSON() << ". Started at "
+ << _initialSyncState->beginTimestamp.toBSON();
+ error() << msg;
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(
+ lock, Status(ErrorCodes::OplogOutOfOrder, msg));
return;
}
- auto&& optimeWithHashStatus = parseOpTimeWithHash(fetchResult);
- if (optimeWithHashStatus.isOK()) {
- auto&& optimeWithHash = optimeWithHashStatus.getValue();
- _initialSyncState->stopTimestamp = optimeWithHash.opTime.getTimestamp();
-
- // Check if applied to/past our stopTimestamp.
- if (_initialSyncState->beginTimestamp < _initialSyncState->stopTimestamp) {
- invariant(_applierPaused);
- log() << "Applying operations until " << _initialSyncState->stopTimestamp.toBSON()
- << " before initial sync can complete. (starting at "
- << _initialSyncState->beginTimestamp.toBSON() << ")";
- _applierPaused = false;
- } else {
- log() << "No need to apply operations. (currently at "
- << _initialSyncState->stopTimestamp.toBSON() << ")";
- if (_lastApplied.opTime.getTimestamp() < _initialSyncState->stopTimestamp) {
- _lastApplied = optimeWithHash;
- }
- }
- } else {
- _initialSyncState->status = optimeWithHashStatus.getStatus();
+ if (_lastApplied.opTime.isNull()) {
+ // Check if any ops occurred while cloning.
+ invariant(_initialSyncState->beginTimestamp < _initialSyncState->stopTimestamp);
+ log() << "Applying operations until " << _initialSyncState->stopTimestamp.toBSON()
+ << " before initial sync can complete. (starting at "
+ << _initialSyncState->beginTimestamp.toBSON() << ")";
+ // Fall through to scheduling _getNextApplierBatchCallback().
+ } else if (_lastApplied.opTime.getTimestamp() >= _initialSyncState->stopTimestamp) {
+ // Check for rollback if we have applied far enough to be consistent.
+ invariant(!_lastApplied.opTime.getTimestamp().isNull());
+ _scheduleRollbackCheckerCheckForRollback_inlock(lock, onCompletionGuard);
+ return;
}
- // Ensure that the DatabasesCloner has reached an inactive state because this callback is
- // scheduled by the DatabasesCloner callback. This will avoid a race in _doNextActions() where
- // we mistakenly think the cloner is still active.
- if (_initialSyncState->dbsCloner) {
- _initialSyncState->dbsCloner->join();
+ // Get another batch to apply.
+ // _scheduleWorkAndSaveHandle_inlock() is shutdown-aware.
+ auto status =
+ _scheduleWorkAndSaveHandle_inlock(stdx::bind(&DataReplicator::_getNextApplierBatchCallback,
+ this,
+ stdx::placeholders::_1,
+ onCompletionGuard),
+ &_getNextApplierBatchHandle,
+ "_getNextApplierBatchCallback");
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
}
-
- _doNextActions_inlock();
}
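
The branch structure above boils down to a three-way decision on the cloning begin/stop timestamps and the last applied optime. A simplified standalone version of that decision (plain integers stand in for Timestamp, and NextAction is an invented enum, not part of the MongoDB code):

#include <cstdint>
#include <iostream>

// Decides what the applier should do next given the cloning begin/stop timestamps
// and the timestamp of the last applied operation (0 meaning "nothing applied yet").
enum class NextAction { kError, kApplyBatch, kCheckForRollback };

NextAction checkApplierProgress(std::uint64_t begin, std::uint64_t stop, std::uint64_t lastApplied) {
    if (begin > stop) {
        // The sync source's oplog moved backwards while we were cloning: possible rollback.
        return NextAction::kError;
    }
    if (lastApplied == 0) {
        // Nothing applied yet; operations may have occurred while cloning, so apply a batch.
        return NextAction::kApplyBatch;
    }
    if (lastApplied >= stop) {
        // Applied far enough to be consistent; verify the rollback id has not changed.
        return NextAction::kCheckForRollback;
    }
    return NextAction::kApplyBatch;
}

int main() {
    std::cout << (checkApplierProgress(5, 10, 0) == NextAction::kApplyBatch) << "\n";        // 1
    std::cout << (checkApplierProgress(5, 10, 12) == NextAction::kCheckForRollback) << "\n"; // 1
    std::cout << (checkApplierProgress(12, 10, 0) == NextAction::kError) << "\n";            // 1
}
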
-bool DataReplicator::_anyActiveHandles_inlock() const {
- // If any component is active then retVal will be set to true.
- bool retVal = false;
+void DataReplicator::_scheduleRollbackCheckerCheckForRollback_inlock(
+ const std::lock_guard<std::mutex>& lock, std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ // We should check our current state because shutdown() could have been called before
+ // we re-acquired the lock.
+ if (_isShuttingDown_inlock()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(
+ lock,
+ Status(ErrorCodes::CallbackCanceled,
+ "failed to schedule rollback checker to check "
+ "for rollback: data replicator is shutting "
+ "down"));
+ return;
+ }
- // For diagnostic reasons, do not return early once an active component is found, but instead
- // log each active component.
+ auto scheduleResult = _rollbackChecker->checkForRollback(
+ stdx::bind(&DataReplicator::_rollbackCheckerCheckForRollbackCallback,
+ this,
+ stdx::placeholders::_1,
+ onCompletionGuard));
- if (_oplogFetcher && _oplogFetcher->isActive()) {
- LOG(0 /*1*/) << "_oplogFetcher is active (_anyActiveHandles_inlock): "
- << _oplogFetcher->toString();
- retVal = true;
+ auto status = scheduleResult.getStatus();
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
}
- if (_initialSyncState && _initialSyncState->dbsCloner &&
- _initialSyncState->dbsCloner->isActive()) {
- LOG(0 /*1*/) << "_initialSyncState::dbsCloner is active (_anyActiveHandles_inlock): "
- << _initialSyncState->dbsCloner->toString();
- retVal = true;
- }
+ _getLastRollbackIdHandle = scheduleResult.getValue();
+ return;
+}
- if (_applier && _applier->isActive()) {
- LOG(0 /*1*/) << "_applier is active (_anyActiveHandles_inlock): " << _applier->toString();
- retVal = true;
- }
+Status DataReplicator::_checkForShutdownAndConvertStatus_inlock(
+ const executor::TaskExecutor::CallbackArgs& callbackArgs, const std::string& message) {
+ return _checkForShutdownAndConvertStatus_inlock(callbackArgs.status, message);
+}
- if (_shuttingDownApplier && _shuttingDownApplier->isActive()) {
- LOG(0 /*1*/) << "_shuttingDownApplier is active (_anyActiveHandles_inlock): "
- << _shuttingDownApplier->toString();
- retVal = true;
- }
+Status DataReplicator::_checkForShutdownAndConvertStatus_inlock(const Status& status,
+ const std::string& message) {
- if (_lastOplogEntryFetcher && _lastOplogEntryFetcher->isActive()) {
- LOG(0 /*1*/) << "_lastOplogEntryFetcher is active (_anyActiveHandles_inlock): "
- << _lastOplogEntryFetcher->toString();
- retVal = true;
+ if (_isShuttingDown_inlock()) {
+ return Status(ErrorCodes::CallbackCanceled, message + ": data replicator is shutting down");
}
- if (!retVal) {
- LOG(0 /*2*/)
- << "DataReplicator::_anyActiveHandles_inlock returned false as nothing is active.";
+ if (!status.isOK()) {
+ return Status(status.code(), message + ": " + status.reason());
}
- return retVal;
-}
-void DataReplicator::_cancelAllHandles_inlock() {
- if (_oplogFetcher)
- _oplogFetcher->shutdown();
- if (_lastOplogEntryFetcher) {
- _lastOplogEntryFetcher->shutdown();
- }
- if (_applier)
- _applier->shutdown();
- // No need to call shutdown() on _shuttingdownApplier. This applier is assigned when the most
- // recent applier's finish callback has been invoked. Note that isActive() will still return
- // true if the callback is still in progress.
- if (_initialSyncState && _initialSyncState->dbsCloner &&
- _initialSyncState->dbsCloner->isActive()) {
- _initialSyncState->dbsCloner->shutdown();
- }
+ return Status::OK();
}
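
The two-argument helper above folds two questions into one status: is the data replicator shutting down, and did the previous stage fail; either way the caller's message is prefixed onto the reason. A standalone sketch of the same conversion (the Status struct here is a simplified stand-in for mongo::Status):

#include <iostream>
#include <string>

struct Status {
    bool ok;
    std::string reason;
};

Status checkForShutdownAndConvert(bool shuttingDown, const Status& prev, const std::string& message) {
    if (shuttingDown) {
        // A shutdown request trumps whatever the previous stage reported.
        return {false, message + ": data replicator is shutting down"};
    }
    if (!prev.ok) {
        // Preserve the original failure but add context about which stage saw it.
        return {false, message + ": " + prev.reason};
    }
    return {true, ""};
}

int main() {
    Status prev{false, "network timeout"};
    auto s = checkForShutdownAndConvert(false, prev, "error fetching last oplog entry");
    std::cout << s.reason << "\n";  // error fetching last oplog entry: network timeout
}
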
-void DataReplicator::_waitOnAndResetAll_inlock(UniqueLock* lk) {
- swapAndJoin_inlock(lk, _oplogFetcher, "Waiting on oplog fetcher: ");
- swapAndJoin_inlock(lk, _applier, "Waiting on applier: ");
- swapAndJoin_inlock(lk, _shuttingDownApplier, "Waiting on most recently completed applier: ");
- if (_initialSyncState) {
- swapAndJoin_inlock(lk, _initialSyncState->dbsCloner, "Waiting on databases cloner: ");
- }
- // A new _lastOplogEntryFetcher may be scheduled on completion of the DatabasesCloner and
- // MultiApplier so we wait on the fetcher after the DatabasesCloner and MultiApplier are
- // destroyed.
- swapAndJoin_inlock(lk, _lastOplogEntryFetcher, "Waiting on fetcher (last oplog entry): ");
+Status DataReplicator::_scheduleWorkAndSaveHandle_inlock(
+ const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackHandle* handle,
+ const std::string& name) {
+ invariant(handle);
+ if (_isShuttingDown_inlock()) {
+ return Status(ErrorCodes::CallbackCanceled,
+ str::stream() << "failed to schedule work " << name
+ << ": data replicator is shutting down");
+ }
+ auto result = _exec->scheduleWork(work);
+ if (!result.isOK()) {
+ return Status(result.getStatus().code(),
+ str::stream() << "failed to schedule work " << name << ": "
+ << result.getStatus().reason());
+ }
+ *handle = result.getValue();
+ return Status::OK();
}
-void DataReplicator::_doNextActions() {
- LockGuard lk(_mutex);
- _doNextActions_inlock();
+Status DataReplicator::_scheduleWorkAtAndSaveHandle_inlock(
+ Date_t when,
+ const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackHandle* handle,
+ const std::string& name) {
+ invariant(handle);
+ if (_isShuttingDown_inlock()) {
+ return Status(ErrorCodes::CallbackCanceled,
+ str::stream() << "failed to schedule work " << name << " at "
+ << when.toString()
+ << ": data replicator is shutting down");
+ }
+ auto result = _exec->scheduleWorkAt(when, work);
+ if (!result.isOK()) {
+ return Status(
+ result.getStatus().code(),
+ str::stream() << "failed to schedule work " << name << " at " << when.toString() << ": "
+ << result.getStatus().reason());
+ }
+ *handle = result.getValue();
+ return Status::OK();
}
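
Both scheduling helpers above refuse to enqueue new work once shutdown has started, annotate any executor error with the task's name, and save the returned handle so _cancelRemainingWork_inlock() can cancel it later. A reduced sketch of that pattern against a toy executor (ToyExecutor and its integer handles are invented for illustration; the real code goes through executor::TaskExecutor):

#include <functional>
#include <iostream>
#include <optional>
#include <queue>
#include <string>

// Toy executor: scheduling just enqueues work and hands back an integer handle.
class ToyExecutor {
public:
    std::optional<int> scheduleWork(std::function<void()> work) {
        if (_shutdown)
            return std::nullopt;
        _queue.push(std::move(work));
        return _nextHandle++;
    }
    void shutdown() { _shutdown = true; }

private:
    std::queue<std::function<void()>> _queue;
    int _nextHandle = 0;
    bool _shutdown = false;
};

// Mirrors the shape of _scheduleWorkAndSaveHandle_inlock(): bail out if shutting down,
// keep the handle on success so it can be cancelled later.
bool scheduleWorkAndSaveHandle(ToyExecutor& exec,
                               std::function<void()> work,
                               int* handle,
                               const std::string& name,
                               bool shuttingDown) {
    if (shuttingDown) {
        std::cerr << "failed to schedule work " << name << ": shutting down\n";
        return false;
    }
    auto result = exec.scheduleWork(std::move(work));
    if (!result) {
        std::cerr << "failed to schedule work " << name << ": executor rejected task\n";
        return false;
    }
    *handle = *result;
    return true;
}

int main() {
    ToyExecutor exec;
    int handle = -1;
    if (scheduleWorkAndSaveHandle(exec, [] {}, &handle, "_exampleCallback", false)) {
        std::cout << "saved handle " << handle << "\n";
    }
}
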
-void DataReplicator::_doNextActions_inlock() {
- // Can be in one of 2 main states/modes (DataReplicatorState):
- // 1.) Initial Sync
- // 2.) Uninitialized
-
- // Check for shutdown flag, signal event
- if (_onShutdown.isValid()) {
- if (!_onShutdownSignaled) {
- _exec->signalEvent(_onShutdown);
- _setState_inlock(DataReplicatorState::Uninitialized);
- _onShutdownSignaled = true;
- }
+void DataReplicator::_cancelHandle_inlock(executor::TaskExecutor::CallbackHandle handle) {
+ if (!handle) {
return;
}
+ _exec->cancel(handle);
+}
- if (DataReplicatorState::Uninitialized == _dataReplicatorState) {
- return;
+template <typename Component>
+Status DataReplicator::_startupComponent_inlock(Component& component) {
+ if (_isShuttingDown_inlock()) {
+ return Status(ErrorCodes::CallbackCanceled,
+ "data replicator shutdown while trying to call startup() on component");
}
-
- invariant(_initialSyncState);
-
- if (!_initialSyncState->status.isOK()) {
- return;
- }
-
- if (_initialSyncState->dbsCloner) {
- if (_initialSyncState->dbsCloner->isActive() ||
- !_initialSyncState->dbsCloner->getStatus().isOK()) {
- return;
- }
+ auto status = component->startup();
+ if (!status.isOK()) {
+ component.reset();
}
+ return status;
+}
- // The DatabasesCloner has completed so make sure we apply far enough to be consistent.
- const auto lastAppliedTS = _lastApplied.opTime.getTimestamp();
- if (!lastAppliedTS.isNull() && lastAppliedTS >= _initialSyncState->stopTimestamp) {
- invariant(_initialSyncState->finishEvent.isValid());
- invariant(_initialSyncState->status.isOK());
- _setState_inlock(DataReplicatorState::Uninitialized);
- _exec->signalEvent(_initialSyncState->finishEvent);
+template <typename Component>
+void DataReplicator::_shutdownComponent_inlock(Component& component) {
+ if (!component) {
return;
}
-
- // Check if no active apply and ops to apply
- if (!_applier || !_applier->isActive()) {
- if (_oplogBuffer && _oplogBuffer->getSize() > 0) {
- const auto scheduleStatus = _scheduleApplyBatch_inlock();
- if (!scheduleStatus.isOK()) {
- if (scheduleStatus != ErrorCodes::ShutdownInProgress) {
- error() << "Error scheduling apply batch '" << scheduleStatus << "'.";
- _applier.reset();
- _scheduleDoNextActions();
- }
- }
- } else {
- LOG(3) << "Cannot apply a batch since we have nothing buffered.";
- }
- }
+ component->shutdown();
}
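
The two templates above treat any owned component uniformly: startup() failure resets the unique_ptr so later code sees the component as inactive, and shutdown is a no-op for components that were never created. A standalone sketch of the same duck-typed helpers (ToyFetcher and ToyCloner are invented types, not the MongoDB components):

#include <iostream>
#include <memory>

struct ToyFetcher {
    bool startup() { return true; }           // Pretend startup always succeeds.
    void shutdown() { std::cout << "fetcher shut down\n"; }
};

struct ToyCloner {
    bool startup() { return false; }          // Pretend startup always fails.
    void shutdown() {}
};

// Works for any std::unique_ptr<T> where T exposes startup()/shutdown().
template <typename Component>
bool startupComponent(Component& component) {
    if (!component->startup()) {
        component.reset();                    // Drop the component so callers see it as inactive.
        return false;
    }
    return true;
}

template <typename Component>
void shutdownComponent(Component& component) {
    if (!component)
        return;                               // Nothing to do for components that never started.
    component->shutdown();
}

int main() {
    auto fetcher = std::make_unique<ToyFetcher>();
    auto cloner = std::make_unique<ToyCloner>();
    std::cout << startupComponent(fetcher) << " " << startupComponent(cloner) << "\n";  // 1 0
    std::cout << (cloner == nullptr) << "\n";                                           // 1
    shutdownComponent(fetcher);
    shutdownComponent(cloner);                                                          // no-op
}
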
StatusWith<Operations> DataReplicator::_getNextApplierBatch_inlock() {
+ // If the fail-point is active, delay the apply batch by returning an empty batch so that
+ // _getNextApplierBatchCallback() will reschedule itself at a later time.
+ // See DataReplicatorOptions::getApplierBatchCallbackRetryWait.
+ if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
+ return Operations();
+ }
+
const int slaveDelaySecs = durationCount<Seconds>(_opts.getSlaveDelay());
- size_t totalBytes = 0;
+ std::uint32_t totalBytes = 0;
Operations ops;
BSONObj op;
@@ -1008,6 +1397,15 @@ StatusWith<Operations> DataReplicator::_getNextApplierBatch_inlock() {
while (_oplogBuffer->peek(txn.get(), &op)) {
auto entry = OplogEntry(std::move(op));
+        // Check the oplog version. If the version field is absent, it defaults to 1.
+ if (entry.getVersion() != OplogEntry::kOplogVersion) {
+ std::string message = str::stream()
+ << "expected oplog version " << OplogEntry::kOplogVersion << " but found version "
+ << entry.getVersion() << " in oplog entry: " << redact(entry.raw);
+ severe() << message;
+ return {ErrorCodes::BadValue, message};
+ }
+
// Check for ops that must be processed one at a time.
if (entry.isCommand() ||
// Index builds are achieved through the use of an insert op, not a command op.
@@ -1025,15 +1423,6 @@ StatusWith<Operations> DataReplicator::_getNextApplierBatch_inlock() {
return std::move(ops);
}
- // Check for oplog version change. If it is absent, its value is one.
- if (entry.getVersion() != OplogEntry::kOplogVersion) {
- std::string message = str::stream()
- << "expected oplog version " << OplogEntry::kOplogVersion << " but found version "
- << entry.getVersion() << " in oplog entry: " << redact(entry.raw);
- severe() << message;
- return {ErrorCodes::BadValue, message};
- }
-
// Apply replication batch limits.
if (ops.size() >= _opts.replBatchLimitOperations) {
return std::move(ops);
@@ -1066,166 +1455,6 @@ StatusWith<Operations> DataReplicator::_getNextApplierBatch_inlock() {
return std::move(ops);
}
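
The batching loop above peeks at buffered oplog entries, rejects unexpected oplog versions, lets command entries through only as single-entry batches, and cuts the batch at the configured operation and byte limits. A simplified standalone sketch of those rules (the Entry struct and the limits are placeholders; the real loop also special-cases index builds and slave delay):

#include <cstddef>
#include <deque>
#include <iostream>
#include <vector>

struct Entry {
    int version = 1;
    bool isCommand = false;
    std::size_t sizeBytes = 100;
};

// Mirrors the shape of _getNextApplierBatch_inlock(): returns ops to hand to the applier.
std::vector<Entry> getNextApplierBatch(std::deque<Entry>& buffer,
                                       std::size_t limitOps,
                                       std::size_t limitBytes) {
    const int kExpectedOplogVersion = 1;
    std::vector<Entry> ops;
    std::size_t totalBytes = 0;
    while (!buffer.empty()) {
        const Entry entry = buffer.front();
        if (entry.version != kExpectedOplogVersion) {
            ops.clear();                      // The real code fails the batch with BadValue here.
            return ops;
        }
        if (entry.isCommand) {
            if (ops.empty()) {
                ops.push_back(entry);         // Commands are applied in their own batch.
                buffer.pop_front();
            }
            return ops;                       // Never mix a command with other ops.
        }
        if (ops.size() >= limitOps || totalBytes + entry.sizeBytes > limitBytes) {
            return ops;                       // Batch limits reached.
        }
        totalBytes += entry.sizeBytes;
        ops.push_back(entry);
        buffer.pop_front();
    }
    return ops;
}

int main() {
    std::deque<Entry> buffer = {{1, false, 100}, {1, true, 50}, {1, false, 100}};
    auto batch = getNextApplierBatch(buffer, 5000, 512 * 1024);
    std::cout << batch.size() << "\n";  // 1: the batch stops before the command entry
}
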
-void DataReplicator::_onApplyBatchFinish(const Status& status,
- OpTimeWithHash lastApplied,
- std::size_t numApplied) {
- if (ErrorCodes::CallbackCanceled == status) {
- return;
- }
-
- UniqueLock lk(_mutex);
-
- if (_inShutdown) {
- // Signal shutdown event.
- _doNextActions_inlock();
- return;
- }
-
- // This might block in _shuttingDownApplier's destructor if it is still active here.
- _shuttingDownApplier = std::move(_applier);
-
- if (!status.isOK()) {
- invariant(DataReplicatorState::InitialSync == _dataReplicatorState);
- error() << "Failed to apply batch due to '" << redact(status) << "'";
- _initialSyncState->status = status;
- _exec->signalEvent(_initialSyncState->finishEvent);
- return;
- }
-
- auto fetchCount = _fetchCount.load();
- if (fetchCount > 0) {
- _initialSyncState->fetchedMissingDocs += fetchCount;
- _fetchCount.store(0);
- _onFetchMissingDocument_inlock(lastApplied, numApplied);
- // TODO (SERVER-25662): Remove this line.
- _applierPaused = true;
- return;
- }
- // TODO (SERVER-25662): Remove this line.
- _applierPaused = false;
-
-
- if (_initialSyncState) {
- _initialSyncState->appliedOps += numApplied;
- }
-
- _lastApplied = lastApplied;
- lk.unlock();
-
- _opts.setMyLastOptime(_lastApplied.opTime);
-
- _doNextActions();
-}
-
-void DataReplicator::_onFetchMissingDocument_inlock(OpTimeWithHash lastApplied,
- std::size_t numApplied) {
- _scheduleLastOplogEntryFetcher_inlock([this, lastApplied, numApplied](
- const QueryResponseStatus& fetchResult, Fetcher::NextAction*, BSONObjBuilder*) {
- auto&& lastOplogEntryOpTimeWithHashStatus = parseOpTimeWithHash(fetchResult);
-
- if (!lastOplogEntryOpTimeWithHashStatus.isOK()) {
- {
- LockGuard lk(_mutex);
- error() << "Failed to get new minValid from source " << _syncSource << " due to '"
- << redact(lastOplogEntryOpTimeWithHashStatus.getStatus()) << "'";
- _initialSyncState->status = lastOplogEntryOpTimeWithHashStatus.getStatus();
- }
- _exec->signalEvent(_initialSyncState->finishEvent);
- return;
- }
-
- const auto newOplogEnd =
- lastOplogEntryOpTimeWithHashStatus.getValue().opTime.getTimestamp();
- {
- LockGuard lk(_mutex);
- LOG(1) << "Pushing back minValid from " << _initialSyncState->stopTimestamp << " to "
- << newOplogEnd;
- _initialSyncState->stopTimestamp = newOplogEnd;
- }
- _onApplyBatchFinish(Status::OK(), lastApplied, numApplied);
- });
-}
-
-Status DataReplicator::_scheduleDoNextActions() {
- auto status = _exec->scheduleWork([this](const CallbackArgs& cbData) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
- _doNextActions();
- });
- return status.getStatus();
-}
-
-Status DataReplicator::_scheduleApplyBatch_inlock() {
- if (_applierPaused) {
- return Status::OK();
- }
-
- if (_applier && _applier->isActive()) {
- return Status::OK();
- }
-
- // If the fail-point is active, delay the apply batch.
- if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
- auto status = _exec->scheduleWorkAt(_exec->now() + Milliseconds(10),
- [this](const CallbackArgs& cbData) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
- _doNextActions();
- });
- return status.getStatus();
- }
-
- auto batchStatus = _getNextApplierBatch_inlock();
- if (!batchStatus.isOK()) {
- warning() << "Failure creating next apply batch: " << redact(batchStatus.getStatus());
- return batchStatus.getStatus();
- }
- const Operations& ops = batchStatus.getValue();
- if (ops.empty()) {
- return _scheduleDoNextActions();
- }
-
- invariant(_dataReplicatorState == DataReplicatorState::InitialSync);
- _fetchCount.store(0);
- // "_syncSource" has to be copied to stdx::bind result.
- HostAndPort source = _syncSource;
- auto applierFn = stdx::bind(&DataReplicatorExternalState::_multiInitialSyncApply,
- _dataReplicatorExternalState.get(),
- stdx::placeholders::_1,
- source,
- &_fetchCount);
- auto multiApplyFn = stdx::bind(&DataReplicatorExternalState::_multiApply,
- _dataReplicatorExternalState.get(),
- stdx::placeholders::_1,
- stdx::placeholders::_2,
- stdx::placeholders::_3);
-
- const auto lastEntry = ops.back().raw;
- const auto opTimeWithHashStatus = parseOpTimeWithHash(lastEntry);
- auto lastApplied = uassertStatusOK(opTimeWithHashStatus);
- auto numApplied = ops.size();
- auto lambda = stdx::bind(&DataReplicator::_onApplyBatchFinish,
- this,
- stdx::placeholders::_1,
- lastApplied,
- numApplied);
-
- invariant(!(_applier && _applier->isActive()));
- _applier = stdx::make_unique<MultiApplier>(_exec, ops, applierFn, multiApplyFn, lambda);
- return _applier->startup();
-}
-
-void DataReplicator::_setState(const DataReplicatorState& newState) {
- LockGuard lk(_mutex);
- _setState_inlock(newState);
-}
-
-void DataReplicator::_setState_inlock(const DataReplicatorState& newState) {
- _dataReplicatorState = newState;
-}
-
StatusWith<HostAndPort> DataReplicator::_chooseSyncSource_inlock() {
auto syncSource = _opts.syncSourceSelector->chooseNewSyncSource(_lastFetched.opTime);
if (syncSource.empty()) {
@@ -1236,40 +1465,6 @@ StatusWith<HostAndPort> DataReplicator::_chooseSyncSource_inlock() {
return syncSource;
}
-Status DataReplicator::scheduleShutdown() {
- auto eventStatus = _exec->makeEvent();
- if (!eventStatus.isOK()) {
- return eventStatus.getStatus();
- }
-
- {
- LockGuard lk(_mutex);
- invariant(!_onShutdown.isValid());
- _inShutdown = true;
- _onShutdown = eventStatus.getValue();
- if (DataReplicatorState::InitialSync == _dataReplicatorState && _initialSyncState &&
- _initialSyncState->status.isOK()) {
- _initialSyncState->status = {ErrorCodes::ShutdownInProgress,
- "Shutdown issued for the operation."};
- _exec->signalEvent(_initialSyncState->finishEvent);
- }
- _cancelAllHandles_inlock();
- }
-
- // Schedule _doNextActions in case nothing is active to trigger the _onShutdown event.
- return _scheduleDoNextActions();
-}
-
-void DataReplicator::waitForShutdown() {
- Event onShutdown;
- {
- LockGuard lk(_mutex);
- invariant(_onShutdown.isValid());
- onShutdown = _onShutdown;
- }
- _exec->waitForEvent(onShutdown);
-}
-
void DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
Fetcher::Documents::const_iterator end,
const OplogFetcher::DocumentsInfo& info) {
@@ -1277,12 +1472,10 @@ void DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
return;
}
- {
- LockGuard lk{_mutex};
- if (_inShutdown) {
- return;
- }
+ if (_isShuttingDown()) {
+ return;
}
+
invariant(_oplogBuffer);
// Wait for enough space.
@@ -1299,38 +1492,46 @@ void DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
_lastFetched = info.lastDocument;
// TODO: updates metrics with "info".
-
- _doNextActions();
}
-void DataReplicator::_onOplogFetchFinish(const Status& status, const OpTimeWithHash& lastFetched) {
- log() << "Finished fetching oplog during initial sync: " << redact(status)
- << ". Last fetched optime and hash: " << lastFetched;
+DataReplicator::OnCompletionGuard::OnCompletionGuard(
+ const CancelRemainingWorkInLockFn& cancelRemainingWorkInLock,
+ const OnCompletionFn& onCompletion)
+ : _cancelRemainingWorkInLock(cancelRemainingWorkInLock), _onCompletion(onCompletion) {}
- if (status.code() == ErrorCodes::CallbackCanceled) {
- return;
- }
+DataReplicator::OnCompletionGuard::~OnCompletionGuard() {
+ MONGO_DESTRUCTOR_GUARD({
+ if (!_lastAppliedSet) {
+ severe() << "It is a programming error to destroy this initial sync attempt completion "
+ "guard without the caller providing a result for '_lastApplied'";
+ }
+ invariant(_lastAppliedSet);
+ // _onCompletion() must be called outside the DataReplicator's lock to avoid a deadlock.
+ _onCompletion(_lastApplied);
+ });
+}
- LockGuard lk(_mutex);
- if (_inShutdown) {
- // Signal shutdown event.
- _doNextActions_inlock();
- return;
- }
+void DataReplicator::OnCompletionGuard::setResultAndCancelRemainingWork_inlock(
+ const std::lock_guard<std::mutex>&, const StatusWith<OpTimeWithHash>& lastApplied) {
+ _setResultAndCancelRemainingWork_inlock(lastApplied);
+}
- if (!status.isOK()) {
- invariant(_dataReplicatorState == DataReplicatorState::InitialSync);
- // Do not change sync source, just log.
- error() << "Error fetching oplog during initial sync: " << redact(status);
- invariant(_initialSyncState);
- _initialSyncState->status = status;
- _exec->signalEvent(_initialSyncState->finishEvent);
+void DataReplicator::OnCompletionGuard::setResultAndCancelRemainingWork_inlock(
+ const std::unique_lock<std::mutex>& lock, const StatusWith<OpTimeWithHash>& lastApplied) {
+ invariant(lock.owns_lock());
+ _setResultAndCancelRemainingWork_inlock(lastApplied);
+}
+
+void DataReplicator::OnCompletionGuard::_setResultAndCancelRemainingWork_inlock(
+ const StatusWith<OpTimeWithHash>& lastApplied) {
+ if (_lastAppliedSet) {
return;
}
+ _lastApplied = lastApplied;
+ _lastAppliedSet = true;
- _lastFetched = lastFetched;
-
- _doNextActions_inlock();
+ // It is fine to call this multiple times.
+ _cancelRemainingWorkInLock();
}
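
OnCompletionGuard ties "report a result exactly once" to object lifetime: only the first result is kept, setting a result cancels the remaining work, and destruction of the last reference delivers the stored result to the caller. A self-contained sketch of that idea using only the standard library (the CompletionGuard name and string result are illustrative, not the actual class):

#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>

class CompletionGuard {
public:
    CompletionGuard(std::function<void()> cancelRemainingWork,
                    std::function<void(const std::string&)> onCompletion)
        : _cancelRemainingWork(std::move(cancelRemainingWork)),
          _onCompletion(std::move(onCompletion)) {}

    ~CompletionGuard() {
        // Deliver the result when the last holder releases its reference.
        _onCompletion(_resultSet ? _result : "<no result provided>");
    }

    // The caller is expected to hold the owning object's mutex, as in the original.
    void setResultAndCancelRemainingWork(const std::lock_guard<std::mutex>&,
                                         const std::string& result) {
        if (_resultSet)
            return;                  // Only the first result is kept.
        _result = result;
        _resultSet = true;
        _cancelRemainingWork();      // Safe to call more than once.
    }

private:
    std::function<void()> _cancelRemainingWork;
    std::function<void(const std::string&)> _onCompletion;
    bool _resultSet = false;
    std::string _result;
};

int main() {
    std::mutex mutex;
    auto guard = std::make_shared<CompletionGuard>(
        [] { std::cout << "cancelling remaining work\n"; },
        [](const std::string& r) { std::cout << "completed: " << r << "\n"; });

    // Two "callbacks" share the guard; only the first result is reported.
    {
        std::lock_guard<std::mutex> lk(mutex);
        guard->setResultAndCancelRemainingWork(lk, "oplog fetcher failed");
        guard->setResultAndCancelRemainingWork(lk, "applier finished");  // Ignored.
    }
    guard.reset();  // Last reference released outside the lock: completion fires here.
}
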
std::string DataReplicator::Stats::toString() const {
@@ -1344,8 +1545,10 @@ BSONObj DataReplicator::Stats::toBSON() const {
}
void DataReplicator::Stats::append(BSONObjBuilder* builder) const {
- builder->appendNumber("failedInitialSyncAttempts", failedInitialSyncAttempts);
- builder->appendNumber("maxFailedInitialSyncAttempts", maxFailedInitialSyncAttempts);
+ builder->appendNumber("failedInitialSyncAttempts",
+ static_cast<long long>(failedInitialSyncAttempts));
+ builder->appendNumber("maxFailedInitialSyncAttempts",
+ static_cast<long long>(maxFailedInitialSyncAttempts));
if (initialSyncStart != Date_t()) {
builder->appendDate("initialSyncStart", initialSyncStart);
if (initialSyncEnd != Date_t()) {
diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h
index 6316db8e3c1..ae2a541030a 100644
--- a/src/mongo/db/repl/data_replicator.h
+++ b/src/mongo/db/repl/data_replicator.h
@@ -29,6 +29,8 @@
#pragma once
+#include <cstdint>
+#include <iosfwd>
#include <memory>
#include "mongo/base/status.h"
@@ -41,21 +43,19 @@
#include "mongo/db/repl/oplog_buffer.h"
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/rollback_checker.h"
#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
-#include "mongo/stdx/thread.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
-
-class QueryFetcher;
-
namespace repl {
namespace {
+
using CallbackArgs = executor::TaskExecutor::CallbackArgs;
using Event = executor::TaskExecutor::EventHandle;
using Handle = executor::TaskExecutor::CallbackHandle;
@@ -81,23 +81,9 @@ MONGO_FP_FORWARD_DECLARE(rsSyncApplyStop);
struct InitialSyncState;
struct MemberState;
-class RollbackChecker;
class StorageInterface;
-/** State for decision tree */
-enum class DataReplicatorState {
- InitialSync,
- Uninitialized,
-};
-
-
-// Helper to convert enum to a string.
-std::string toString(DataReplicatorState s);
-
-// TBD -- ignore for now
-enum class DataReplicatorScope { ReplicateAll, ReplicateDB, ReplicateCollection };
-
struct DataReplicatorOptions {
/** Function to return optime of last operation applied on this node */
using GetMyLastOptimeFn = stdx::function<OpTime()>;
@@ -117,19 +103,20 @@ struct DataReplicatorOptions {
Seconds blacklistSyncSourcePenaltyForNetworkConnectionError{10};
Minutes blacklistSyncSourcePenaltyForOplogStartMissing{10};
+    // DataReplicator waits this long before retrying _getNextApplierBatchCallback() if there are
+ // currently no operations available to apply or if the 'rsSyncApplyStop' failpoint is active.
+ // This default value is based on the duration in BackgroundSync::waitForMore() and
+ // SyncTail::tryPopAndWaitForMore().
+ Milliseconds getApplierBatchCallbackRetryWait{1000};
+
// Batching settings.
- size_t replBatchLimitBytes = 512 * 1024 * 1024;
- size_t replBatchLimitOperations = 5000;
+ std::uint32_t replBatchLimitBytes = 512 * 1024 * 1024;
+ std::uint32_t replBatchLimitOperations = 5000;
// Replication settings
NamespaceString localOplogNS = NamespaceString("local.oplog.rs");
NamespaceString remoteOplogNS = NamespaceString("local.oplog.rs");
- // TBD -- ignore below for now
- DataReplicatorScope scope = DataReplicatorScope::ReplicateAll;
- std::string scopeNS;
- BSONObj filterCriteria;
-
GetMyLastOptimeFn getMyLastOptime;
SetMyLastOptimeFn setMyLastOptime;
GetSlaveDelayFn getSlaveDelay;
@@ -138,7 +125,7 @@ struct DataReplicatorOptions {
// The oplog fetcher will restart the oplog tailing query this many times on non-cancellation
// failures.
- std::size_t oplogFetcherMaxFetcherRestarts = 0;
+ std::uint32_t oplogFetcherMaxFetcherRestarts = 0;
std::string toString() const {
return str::stream() << "DataReplicatorOptions -- "
@@ -156,11 +143,65 @@ struct DataReplicatorOptions {
*
*
* Entry Points:
- * -- doInitialSync: Will drop all data and copy to a consistent state of data (via the oplog).
- * -- startup: Start data replication from existing data.
+ * -- startup: Start initial sync.
*/
class DataReplicator {
+ MONGO_DISALLOW_COPYING(DataReplicator);
+
public:
+ /**
+ * Callback function to report last applied optime (with hash) of initial sync.
+ */
+ typedef stdx::function<void(const StatusWith<OpTimeWithHash>& lastApplied)> OnCompletionFn;
+
+ /**
+ * RAII type that stores the result of a single initial sync attempt.
+ * Only the first result passed to setResultAndCancelRemainingWork_inlock() is saved.
+ * Calls '_onCompletion' on destruction with result.
+ * We use an invariant to ensure that a result has been provided by the caller at destruction.
+ */
+ class OnCompletionGuard {
+ MONGO_DISALLOW_COPYING(OnCompletionGuard);
+
+ public:
+ // Function to invoke DataReplicator::_cancelRemainingWork_inlock().
+ using CancelRemainingWorkInLockFn = stdx::function<void()>;
+
+ OnCompletionGuard(const CancelRemainingWorkInLockFn& cancelRemainingWorkInLock,
+ const OnCompletionFn& onCompletion);
+ ~OnCompletionGuard();
+
+ /**
+ * Sets result if called for the first time.
+ * Cancels remaining work in DataReplicator.
+ * Requires either a unique_lock or lock_guard to be passed in to ensure that we call
+         * DataReplicator::_cancelRemainingWork_inlock() while we have a lock on the data
+ * replicator's mutex.
+ */
+ void setResultAndCancelRemainingWork_inlock(const std::lock_guard<std::mutex>& lock,
+ const StatusWith<OpTimeWithHash>& lastApplied);
+ void setResultAndCancelRemainingWork_inlock(const std::unique_lock<std::mutex>& lock,
+ const StatusWith<OpTimeWithHash>& lastApplied);
+
+ private:
+ /**
+ * Once we verified that we have the data replicator lock, this function is called by both
+ * versions of setResultAndCancelRemainingWork_inlock() to set the result and cancel any
+ * remaining work in the data replicator.
+ */
+ void _setResultAndCancelRemainingWork_inlock(const StatusWith<OpTimeWithHash>& lastApplied);
+
+ const CancelRemainingWorkInLockFn _cancelRemainingWorkInLock;
+ const OnCompletionFn _onCompletion;
+
+ // _lastAppliedSet and _lastApplied are guarded by the mutex of the DataReplicator instance
+ // that owns this guard object.
+ bool _lastAppliedSet = false;
+ StatusWith<OpTimeWithHash> _lastApplied =
+ Status(ErrorCodes::InternalError,
+ "This initial sync attempt finished without an explicit result.");
+ };
+
struct InitialSyncAttemptInfo {
int durationMillis;
Status status;
@@ -172,8 +213,8 @@ public:
};
struct Stats {
- size_t failedInitialSyncAttempts{0};
- size_t maxFailedInitialSyncAttempts{0};
+ std::uint32_t failedInitialSyncAttempts{0};
+ std::uint32_t maxFailedInitialSyncAttempts{0};
Date_t initialSyncStart;
Date_t initialSyncEnd;
std::vector<DataReplicator::InitialSyncAttemptInfo> initialSyncAttemptInfos;
@@ -185,42 +226,34 @@ public:
DataReplicator(DataReplicatorOptions opts,
std::unique_ptr<DataReplicatorExternalState> dataReplicatorExternalState,
- StorageInterface* storage);
+ StorageInterface* storage,
+ const OnCompletionFn& onCompletion);
virtual ~DataReplicator();
- // Shuts down replication if "start" has been called, and blocks until shutdown has completed.
- Status shutdown();
-
/**
- * Cancels outstanding work and begins shutting down.
+ * Returns true if an initial sync is currently running or in the process of shutting down.
*/
- Status scheduleShutdown();
+ bool isActive() const;
/**
- * Waits for data replicator to finish shutting down.
- * Data replicator will go into uninitialized state.
+     * Starts the initial sync process with the provided maximum number of attempts.
*/
- void waitForShutdown();
+ Status startup(OperationContext* txn, std::uint32_t maxAttempts) noexcept;
/**
- * Does an initial sync, with the provided number of attempts.
- *
- * This should be the first method called after construction (see class comment).
+ * Shuts down replication if "start" has been called, and blocks until shutdown has completed.
*/
- StatusWith<OpTimeWithHash> doInitialSync(OperationContext* txn, std::size_t maxAttempts);
-
- DataReplicatorState getState() const;
-
- HostAndPort getSyncSource() const;
- OpTimeWithHash getLastFetched() const;
- OpTimeWithHash getLastApplied() const;
+ Status shutdown();
/**
- * Number of operations in the oplog buffer.
+ * Block until inactive.
*/
- size_t getOplogBufferCount() const;
+ void join();
+ /**
+ * Returns internal state in a loggable format.
+ */
std::string getDiagnosticString() const;
/**
@@ -229,10 +262,6 @@ public:
*/
BSONObj getInitialSyncProgress() const;
- // For testing only
-
- void _resetState_inlock(OperationContext* txn, OpTimeWithHash lastAppliedOpTime);
-
/**
* Overrides how executor schedules database work.
*
@@ -240,14 +269,239 @@ public:
*/
void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn);
+ // State transitions:
+ // PreStart --> Running --> ShuttingDown --> Complete
+ // It is possible to skip intermediate states. For example, calling shutdown() when the data
+ // replicator has not started will transition from PreStart directly to Complete.
+ enum class State { kPreStart, kRunning, kShuttingDown, kComplete };
+
+ /**
+ * Returns current data replicator state.
+ * For testing only.
+ */
+ State getState_forTest() const;
+
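
The four-state lifecycle declared above is what makes startup(), shutdown() and join() safe to call in any order: shutdown() before startup() jumps straight to Complete, and join() simply waits for the Complete transition. A reduced sketch of that lifecycle with plain std:: primitives in place of the stdx aliases:

#include <condition_variable>
#include <iostream>
#include <mutex>

class Lifecycle {
public:
    enum class State { kPreStart, kRunning, kShuttingDown, kComplete };

    bool startup() {
        std::lock_guard<std::mutex> lk(_mutex);
        if (_state != State::kPreStart)
            return false;               // IllegalOperation / ShutdownInProgress in the real code.
        _state = State::kRunning;
        return true;
    }

    void shutdown() {
        std::lock_guard<std::mutex> lk(_mutex);
        if (_state == State::kPreStart) {
            _state = State::kComplete;  // Never started: skip straight to Complete.
            _condition.notify_all();
        } else if (_state == State::kRunning) {
            _state = State::kShuttingDown;
        }
    }

    void finish() {
        std::lock_guard<std::mutex> lk(_mutex);
        _state = State::kComplete;
        _condition.notify_all();
    }

    void join() {
        std::unique_lock<std::mutex> lk(_mutex);
        _condition.wait(lk, [this] { return _state == State::kComplete; });
    }

private:
    std::mutex _mutex;
    std::condition_variable _condition;
    State _state = State::kPreStart;
};

int main() {
    Lifecycle lifecycle;
    lifecycle.shutdown();   // Called before startup(): transitions directly to Complete.
    lifecycle.join();       // Returns immediately.
    std::cout << "done\n";
}
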
private:
- // Runs a single initial sync attempt.
- Status _runInitialSyncAttempt_inlock(OperationContext* txn,
- UniqueLock& lk,
- HostAndPort syncSource);
+ /**
+ * Returns true if we are still processing initial sync tasks (_state is either Running or
+     * ShuttingDown).
+ */
+ bool _isActive_inlock() const;
+
+ /**
+ * Cancels all outstanding work.
+     * Used by shutdown() and OnCompletionGuard::setResultAndCancelRemainingWork_inlock().
+ */
+ void _cancelRemainingWork_inlock();
+
+ /**
+ * Returns true if the data replicator has received a shutdown request (_state is ShuttingDown).
+ */
+ bool _isShuttingDown() const;
+ bool _isShuttingDown_inlock() const;
+
+ /**
+ * Initial sync flowchart:
+ *
+ * start()
+ * |
+ * |
+ * V
+ * _setUp_inlock()
+ * |
+ * |
+ * V
+ * _startInitialSyncAttemptCallback()
+ * |
+ * |
+ * |<-------+
+ * | |
+ * | | (bad sync source)
+ * | |
+ * V |
+ * _chooseSyncSourceCallback()
+ * |
+ * |
+ * | (good sync source found)
+ * |
+ * |
+ * V
+ * _recreateOplogAndDropReplicatedDatabases()
+ * |
+ * |
+ * V
+ * _rollbackCheckerResetCallback()
+ * |
+ * |
+ * V
+ * _lastOplogEntryFetcherCallbackForBeginTimestamp()
+ * |
+ * |
+ * +------------------------------+
+ * | |
+ * | |
+ * V V
+ * _oplogFetcherCallback() _databasesClonerCallback
+ * | |
+ * | |
+ * | V
+ * | _lastOplogEntryFetcherCallbackForStopTimestamp()
+ * | | |
+ * | | |
+ * | (no ops to apply) | | (have ops to apply)
+ * | | |
+ * | | V
+ * | | _getNextApplierBatchCallback()<-----+
+ * | | | ^ |
+ * | | | | |
+ * | | | (no docs fetched | |
+ * | | | and end ts not | |
+ * | | | reached) | |
+ * | | | | |
+ * | | V | |
+ * | | _multiApplierCallback()-----+ |
+ * | | | | |
+ * | | | | |
+ * | | | | (docs fetched) | (end ts not
+ * | | | | | reached)
+ * | | | V |
+ * | | | _lastOplogEntryFetcherCallbackAfter-
+ * | | | FetchingMissingDocuments()
+ * | | | |
+ * | | | |
+ * | (reached end timestamp)
+ * | | | |
+ * | V V V
+ * | _rollbackCheckerCheckForRollbackCallback()
+ * | |
+ * | |
+ * +------------------------------+
+ * |
+ * |
+ * V
+ * _finishInitialSyncAttempt()
+ * |
+ * |
+ * V
+ * _finishCallback()
+ */
+
+ /**
+ * Sets up internal state to begin initial sync.
+ */
+ void _setUp_inlock(OperationContext* txn, std::uint32_t initialSyncMaxAttempts);
- void _setState(const DataReplicatorState& newState);
- void _setState_inlock(const DataReplicatorState& newState);
+ /**
+ * Tears down internal state before reporting final status to caller.
+ */
+ void _tearDown_inlock(OperationContext* txn, const StatusWith<OpTimeWithHash>& lastApplied);
+
+ /**
+ * Callback to start a single initial sync attempt.
+ */
+ void _startInitialSyncAttemptCallback(const executor::TaskExecutor::CallbackArgs& callbackArgs,
+ std::uint32_t initialSyncAttempt,
+ std::uint32_t initialSyncMaxAttempts);
+
+ /**
+ * Callback to obtain sync source from sync source selector.
+ * For every initial sync attempt, we will try up to 'numInitialSyncConnectAttempts' times (at
+ * an interval of '_opts.syncSourceRetryWait' ms) to obtain a valid sync source before giving up
+ * and returning ErrorCodes::InitialSyncOplogSourceMissing.
+ */
+ void _chooseSyncSourceCallback(const executor::TaskExecutor::CallbackArgs& callbackArgs,
+ std::uint32_t chooseSyncSourceAttempt,
+ std::uint32_t chooseSyncSourceMaxAttempts,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard);
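
The retry policy described above is a bounded loop: each attempt asks the selector for a sync source, an empty answer means wait 'syncSourceRetryWait' and try again, and exhausting the attempts fails the whole initial sync attempt. A synchronous sketch of that policy (the selector callback is a placeholder, and the real implementation reschedules itself on the task executor rather than sleeping):

#include <chrono>
#include <functional>
#include <iostream>
#include <string>
#include <thread>

// Returns the chosen sync source, or an empty string if all attempts are exhausted.
std::string chooseSyncSourceWithRetries(const std::function<std::string()>& selector,
                                        unsigned maxAttempts,
                                        std::chrono::milliseconds retryWait) {
    for (unsigned attempt = 1; attempt <= maxAttempts; ++attempt) {
        std::string candidate = selector();
        if (!candidate.empty()) {
            return candidate;                 // Good sync source found.
        }
        std::cout << "sync source selection attempt " << attempt << " of " << maxAttempts
                  << " failed; retrying\n";
        if (attempt < maxAttempts) {
            std::this_thread::sleep_for(retryWait);
        }
    }
    return "";                                // InitialSyncOplogSourceMissing in the real code.
}

int main() {
    int calls = 0;
    auto selector = [&calls]() -> std::string {
        return ++calls < 3 ? "" : "localhost:12345";  // Succeed on the third attempt.
    };
    std::cout << chooseSyncSourceWithRetries(selector, 10, std::chrono::milliseconds(1)) << "\n";
}
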
+
+ /**
+ * This function does the following:
+ * 1.) Drop oplog.
+ * 2.) Drop user databases (replicated dbs).
+ * 3.) Create oplog.
+ */
+ Status _recreateOplogAndDropReplicatedDatabases();
+
+ /**
+ * Callback for rollback checker's first replSetGetRBID command before starting data cloning.
+ */
+ void _rollbackCheckerResetCallback(const RollbackChecker::Result& result,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard);
+
+ /**
+ * Callback for first '_lastOplogEntryFetcher' callback. A successful response lets us
+ * determine the starting point for tailing the oplog using the OplogFetcher as well as
+ * setting a reference point for the state of the sync source's oplog when data cloning
+ * completes.
+ */
+ void _lastOplogEntryFetcherCallbackForBeginTimestamp(
+ const StatusWith<Fetcher::QueryResponse>& result,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard);
+
+ /**
+ * Callback for oplog fetcher.
+ */
+ void _oplogFetcherCallback(const Status& status,
+ const OpTimeWithHash& lastFetched,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard);
+
+ /**
+ * Callback for DatabasesCloner.
+ */
+ void _databasesClonerCallback(const Status& status,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard);
+
+ /**
+ * Callback for second '_lastOplogEntryFetcher' callback. This is scheduled to obtain the stop
+ * timestamp after DatabasesCloner has completed and enables us to determine if the oplog on
+ * the sync source has advanced since we started cloning the databases.
+ */
+ void _lastOplogEntryFetcherCallbackForStopTimestamp(
+ const StatusWith<Fetcher::QueryResponse>& result,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard);
+
+ /**
+ * Callback to obtain next batch of operations to apply.
+ */
+ void _getNextApplierBatchCallback(const executor::TaskExecutor::CallbackArgs& callbackArgs,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard);
+
+ /**
+ * Callback for MultiApplier completion.
+ */
+ void _multiApplierCallback(const Status& status,
+ OpTimeWithHash lastApplied,
+ std::uint32_t numApplied,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard);
+
+ /**
+ * Callback for third '_lastOplogEntryFetcher' callback. This is scheduled after MultiApplier
+ * completed successfully and missing documents were fetched from the sync source while
+ * DataReplicatorExternalState::_multiInitialSyncApply() was processing operations.
+ * This callback will update InitialSyncState::stopTimestamp on success.
+ */
+ void _lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments(
+ const StatusWith<Fetcher::QueryResponse>& result,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard);
+
+ /**
+ * Callback for rollback checker's last replSetGetRBID command after cloning data and applying
+ * operations.
+ */
+ void _rollbackCheckerCheckForRollbackCallback(
+ const RollbackChecker::Result& result,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard);
+
+ /**
+ * Reports result of current initial sync attempt. May schedule another initial sync attempt
+ * depending on shutdown state and whether we've exhausted all initial sync retries.
+ */
+ void _finishInitialSyncAttempt(const StatusWith<OpTimeWithHash>& lastApplied);
+
+ /**
+ * Invokes completion callback and transitions state to State::kComplete.
+ */
+ void _finishCallback(StatusWith<OpTimeWithHash> lastApplied);
// Obtains a valid sync source from the sync source selector.
// Returns error if a sync source cannot be found.
@@ -260,32 +514,81 @@ private:
void _enqueueDocuments(Fetcher::Documents::const_iterator begin,
Fetcher::Documents::const_iterator end,
const OplogFetcher::DocumentsInfo& info);
- void _onOplogFetchFinish(const Status& status, const OpTimeWithHash& lastFetched);
- void _doNextActions();
- void _doNextActions_inlock();
BSONObj _getInitialSyncProgress_inlock() const;
StatusWith<Operations> _getNextApplierBatch_inlock();
- void _onApplyBatchFinish(const Status& status,
- OpTimeWithHash lastApplied,
- std::size_t numApplied);
-
- // Called when the DatabasesCloner finishes.
- void _onDataClonerFinish(const Status& status, HostAndPort syncSource);
- // Called after _onDataClonerFinish when the new Timestamp is avail, to use for minvalid.
- void _onApplierReadyStart(const QueryResponseStatus& fetchResult);
- // Called during _onApplyBatchFinish when we fetched a missing document and must reset minValid.
- void _onFetchMissingDocument_inlock(OpTimeWithHash lastApplied, std::size_t numApplied);
- // Schedules a fetcher to get the last oplog entry from the sync source.
- void _scheduleLastOplogEntryFetcher_inlock(Fetcher::CallbackFn callback);
-
- Status _scheduleDoNextActions();
- Status _scheduleApplyBatch_inlock();
-
- void _cancelAllHandles_inlock();
- void _waitOnAndResetAll_inlock(UniqueLock* lk);
- bool _anyActiveHandles_inlock() const;
+
+ /**
+ * Schedules a fetcher to get the last oplog entry from the sync source.
+ */
+ Status _scheduleLastOplogEntryFetcher_inlock(Fetcher::CallbackFn callback);
+
+ /**
+     * Checks the current oplog application progress (begin and stop timestamps).
+     * If necessary, schedules a _getNextApplierBatchCallback() task.
+     * If the begin and stop timestamps are inconsistent or if there is an issue scheduling the task,
+ * we set the error status in 'onCompletionGuard' and shut down the OplogFetcher.
+ * Passes 'lock' through to completion guard.
+ */
+ void _checkApplierProgressAndScheduleGetNextApplierBatch_inlock(
+ const std::lock_guard<std::mutex>& lock,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard);
+
+ /**
+ * Schedules a rollback checker to get the rollback ID after data cloning or applying. This
+ * helps us check if a rollback occurred on the sync source.
+ * If we fail to schedule the rollback checker, we set the error status in 'onCompletionGuard'
+ * and shut down the OplogFetcher.
+ * Passes 'lock' through to completion guard.
+ */
+ void _scheduleRollbackCheckerCheckForRollback_inlock(
+ const std::lock_guard<std::mutex>& lock,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard);
+
+ /**
+ * Checks the given status (or embedded status inside the callback args) and current data
+ * replicator shutdown state. If the given status is not OK or if we are shutting down, returns
+ * a new error status that should be passed to _finishCallback. The reason in the new error
+ * status will include 'message'.
+ * Otherwise, returns Status::OK().
+ */
+ Status _checkForShutdownAndConvertStatus_inlock(
+ const executor::TaskExecutor::CallbackArgs& callbackArgs, const std::string& message);
+ Status _checkForShutdownAndConvertStatus_inlock(const Status& status,
+ const std::string& message);
+
+ /**
+ * Schedules work to be run by the task executor.
+ * Saves handle if work was successfully scheduled.
+ * Returns scheduleWork status (without the handle).
+ */
+ Status _scheduleWorkAndSaveHandle_inlock(const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackHandle* handle,
+ const std::string& name);
+ Status _scheduleWorkAtAndSaveHandle_inlock(Date_t when,
+ const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackHandle* handle,
+ const std::string& name);
+
+ /**
+ * Cancels task executor callback handle if not null.
+ */
+ void _cancelHandle_inlock(executor::TaskExecutor::CallbackHandle handle);
+
+ /**
+     * Starts up the component and checks the data replicator's shutdown state at the same time.
+     * If the component's startup() fails, resets 'component' (which is assumed to be a unique_ptr
+ * to the component type).
+ */
+ template <typename Component>
+ Status _startupComponent_inlock(Component& component);
+
+ /**
+ * Shuts down component if not null.
+ */
+ template <typename Component>
+ void _shutdownComponent_inlock(Component& component);
// Counts how many documents have been refetched from the source in the current batch.
AtomicUInt32 _fetchCount;
@@ -305,34 +608,54 @@ private:
const DataReplicatorOptions _opts; // (R)
std::unique_ptr<DataReplicatorExternalState> _dataReplicatorExternalState; // (R)
executor::TaskExecutor* _exec; // (R)
- DataReplicatorState _dataReplicatorState; // (MX)
- std::unique_ptr<InitialSyncState> _initialSyncState; // (M)
- StorageInterface* _storage; // (M)
- std::unique_ptr<OplogFetcher> _oplogFetcher; // (S)
- std::unique_ptr<Fetcher> _lastOplogEntryFetcher; // (S)
- bool _applierPaused = false; // (X)
- std::unique_ptr<MultiApplier> _applier; // (M)
- std::unique_ptr<MultiApplier> _shuttingDownApplier; // (M)
- HostAndPort _syncSource; // (M)
- OpTimeWithHash _lastFetched; // (MX)
- OpTimeWithHash _lastApplied; // (MX)
- std::unique_ptr<OplogBuffer> _oplogBuffer; // (M)
-
- // Set to true when shutdown is requested. This flag should be checked by
- // the data replicator during initial sync so that it can interrupt the
- // the current operation and gracefully transition to completion with
- // a shutdown status.
- bool _inShutdown = false; // (M)
- // Set to true when the _onShutdown event is signaled for the first time.
- // Ensures that we do not signal the shutdown event more than once (which
- // is disallowed by the task executor.
- bool _onShutdownSignaled = false; // (M)
- // Created when shutdown is requested. Signaled at most once when the data
- // replicator is determining its next steps between task executor callbacks.
- Event _onShutdown; // (M)
+ StorageInterface* _storage; // (R)
+
+ // This is invoked with the final status of the initial sync. If startup() fails, this callback
+    // is never invoked. The callback receives the last applied optime (with hash) if the initial
+    // sync completes successfully, or an error status otherwise.
+ // '_onCompletion' is cleared on completion (in _finishCallback()) in order to release any
+ // resources that might be held by the callback function object.
+ OnCompletionFn _onCompletion; // (M)
+
+ // Handle to currently scheduled _startInitialSyncAttemptCallback() task.
+ executor::TaskExecutor::CallbackHandle _startInitialSyncAttemptHandle; // (M)
+
+ // Handle to currently scheduled _chooseSyncSourceCallback() task.
+ executor::TaskExecutor::CallbackHandle _chooseSyncSourceHandle; // (M)
+
+ // RollbackChecker to get rollback ID before and after each initial sync attempt.
+ std::unique_ptr<RollbackChecker> _rollbackChecker; // (M)
+ // Handle returned from RollbackChecker::reset().
+ RollbackChecker::CallbackHandle _getBaseRollbackIdHandle; // (M)
+
+ // Handle returned from RollbackChecker::checkForRollback().
+ RollbackChecker::CallbackHandle _getLastRollbackIdHandle; // (M)
+
+ // Handle to currently scheduled _getNextApplierBatchCallback() task.
+ executor::TaskExecutor::CallbackHandle _getNextApplierBatchHandle; // (M)
+
+ std::unique_ptr<InitialSyncState> _initialSyncState; // (M)
+ std::unique_ptr<OplogFetcher> _oplogFetcher; // (S)
+ std::unique_ptr<Fetcher> _lastOplogEntryFetcher; // (S)
+ std::unique_ptr<MultiApplier> _applier; // (M)
+ HostAndPort _syncSource; // (M)
+ OpTimeWithHash _lastFetched; // (MX)
+ OpTimeWithHash _lastApplied; // (MX)
+ std::unique_ptr<OplogBuffer> _oplogBuffer; // (M)
+
+ // Used to signal changes in _state.
+ mutable stdx::condition_variable _stateCondition;
+
+ // Current data replicator state. See comments for State enum class for details.
+ State _state = State::kPreStart; // (M)
+
+ // Passed to CollectionCloner via DatabasesCloner.
CollectionCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M)
- Stats _stats; // (M)
+
+ // Contains stats on the current initial sync request (includes all attempts).
+ // To access these stats in a user-readable format, use getInitialSyncProgress().
+ Stats _stats; // (M)
};
} // namespace repl
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
index aad8fd0843e..8e6c521c71d 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
@@ -76,7 +76,7 @@ std::unique_ptr<OplogBuffer> DataReplicatorExternalStateMock::makeSteadyStateOpl
}
StatusWith<ReplicaSetConfig> DataReplicatorExternalStateMock::getCurrentConfig() const {
- return replSetConfig;
+ return replSetConfigResult;
}
StatusWith<OpTime> DataReplicatorExternalStateMock::_multiApply(
@@ -93,7 +93,8 @@ Status DataReplicatorExternalStateMock::_multiSyncApply(MultiApplier::OperationP
Status DataReplicatorExternalStateMock::_multiInitialSyncApply(MultiApplier::OperationPtrs* ops,
const HostAndPort& source,
AtomicUInt32* fetchCount) {
- return Status::OK();
+
+ return multiInitialSyncApplyFn(ops, source, fetchCount);
}
} // namespace repl
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h
index b85332bb750..45b755d525a 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.h
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.h
@@ -84,7 +84,13 @@ public:
// Override to change multiApply behavior.
MultiApplier::MultiApplyFn multiApplyFn;
- ReplicaSetConfig replSetConfig;
+ // Override to change _multiInitialSyncApply behavior.
+ using MultiInitialSyncApplyFn = stdx::function<Status(
+ MultiApplier::OperationPtrs* ops, const HostAndPort& source, AtomicUInt32* fetchCount)>;
+ MultiInitialSyncApplyFn multiInitialSyncApplyFn = [](
+ MultiApplier::OperationPtrs*, const HostAndPort&, AtomicUInt32*) { return Status::OK(); };
+
+ StatusWith<ReplicaSetConfig> replSetConfigResult = ReplicaSetConfig();
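
The mock grows new behavior the same way each time: a public function-object member with a benign default that individual tests reassign to inject failures. A generic sketch of that seam with plain std::function members (the names below are illustrative, not the MultiApplier signatures):

#include <functional>
#include <iostream>
#include <string>

// A mock whose behavior is a public std::function member that tests can swap out.
struct ExternalStateMock {
    std::function<std::string()> multiInitialSyncApplyFn = [] { return std::string("OK"); };

    std::string multiInitialSyncApply() { return multiInitialSyncApplyFn(); }
};

int main() {
    ExternalStateMock mock;
    std::cout << mock.multiInitialSyncApply() << "\n";   // OK (default behavior)

    // A test overrides the behavior to simulate an apply failure.
    mock.multiInitialSyncApplyFn = [] { return std::string("OperationFailed: injected"); };
    std::cout << mock.multiInitialSyncApply() << "\n";   // OperationFailed: injected
}
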
private:
StatusWith<OpTime> _multiApply(OperationContext* txn,
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 821a3a12a42..524c95125b7 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -28,7 +28,9 @@
#include "mongo/platform/basic.h"
+#include <iosfwd>
#include <memory>
+#include <ostream>
#include "mongo/client/fetcher.h"
#include "mongo/db/client.h"
@@ -38,6 +40,7 @@
#include "mongo/db/repl/data_replicator.h"
#include "mongo/db/repl/data_replicator_external_state_mock.h"
#include "mongo/db/repl/member_state.h"
+#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/reporter.h"
@@ -60,6 +63,30 @@
#include "mongo/unittest/barrier.h"
#include "mongo/unittest/unittest.h"
+namespace mongo {
+namespace repl {
+
+/**
+ * Insertion operator for DataReplicator::State. Formats data replicator state for output stream.
+ */
+std::ostream& operator<<(std::ostream& os, const DataReplicator::State& state) {
+ switch (state) {
+ case DataReplicator::State::kPreStart:
+ return os << "PreStart";
+ case DataReplicator::State::kRunning:
+ return os << "Running";
+ case DataReplicator::State::kShuttingDown:
+ return os << "ShuttingDown";
+ case DataReplicator::State::kComplete:
+ return os << "Complete";
+ }
+ MONGO_UNREACHABLE;
+}
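
Writing the formatter as a switch with no default case lets the compiler warn if a State value is ever added without updating it; MONGO_UNREACHABLE records that falling off the end of the switch is a bug. The same shape in plain C++, with assert standing in for the MongoDB macro:

#include <cassert>
#include <iostream>
#include <ostream>

enum class State { kPreStart, kRunning, kShuttingDown, kComplete };

std::ostream& operator<<(std::ostream& os, State state) {
    switch (state) {
        case State::kPreStart:
            return os << "PreStart";
        case State::kRunning:
            return os << "Running";
        case State::kShuttingDown:
            return os << "ShuttingDown";
        case State::kComplete:
            return os << "Complete";
    }
    assert(false && "unhandled State value");   // MONGO_UNREACHABLE in the original.
    return os;
}

int main() {
    std::cout << State::kShuttingDown << "\n";  // ShuttingDown
}
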
+
+} // namespace repl
+} // namespace mongo
+
+
namespace {
using namespace mongo;
@@ -88,6 +115,13 @@ public:
TaskExecutorMock(executor::TaskExecutor* executor, ShouldFailRequestFn shouldFailRequest)
: unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}
+ StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) {
+ if (shouldFailScheduleWork) {
+ return Status(ErrorCodes::OperationFailed, "failed to schedule work");
+ }
+ return getExecutor()->scheduleWork(work);
+ }
+
StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) {
if (shouldFailScheduleWorkAt) {
return Status(ErrorCodes::OperationFailed,
@@ -104,6 +138,7 @@ public:
return getExecutor()->scheduleRemoteCommand(request, cb);
}
+ bool shouldFailScheduleWork = false;
bool shouldFailScheduleWorkAt = false;
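
The proxy executor above forwards everything to the real executor until a test flips a per-call failure flag, which is how the error paths of the scheduling helpers are exercised without touching the network. A reduced sketch of that decorator (the Executor interface is invented here; the real class derives from unittest::TaskExecutorProxy):

#include <functional>
#include <iostream>
#include <vector>

// Minimal executor interface used by the sketch.
class Executor {
public:
    virtual ~Executor() = default;
    virtual bool scheduleWork(std::function<void()> work) = 0;
};

class RealExecutor : public Executor {
public:
    bool scheduleWork(std::function<void()> work) override {
        _queue.push_back(std::move(work));
        return true;
    }

private:
    std::vector<std::function<void()>> _queue;
};

// Decorator: behaves like the wrapped executor until a test flips a failure flag.
class FailureInjectingExecutor : public Executor {
public:
    explicit FailureInjectingExecutor(Executor* wrapped) : _wrapped(wrapped) {}

    bool scheduleWork(std::function<void()> work) override {
        if (shouldFailScheduleWork)
            return false;                     // Simulate a scheduling failure.
        return _wrapped->scheduleWork(std::move(work));
    }

    bool shouldFailScheduleWork = false;

private:
    Executor* _wrapped;
};

int main() {
    RealExecutor real;
    FailureInjectingExecutor proxy(&real);
    std::cout << proxy.scheduleWork([] {}) << "\n";  // 1
    proxy.shouldFailScheduleWork = true;
    std::cout << proxy.scheduleWork([] {}) << "\n";  // 0
}
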
private:
@@ -181,6 +216,12 @@ public:
finishProcessingNetworkResponse();
}
+ /**
+ * Schedules and processes a successful response to the network request sent by DataReplicator's
+ * last oplog entry fetcher. Also validates the find command arguments in the request.
+ */
+ void processSuccessfulLastOplogEntryFetcherResponse(std::vector<BSONObj> docs);
+
void finishProcessingNetworkResponse() {
getNet()->runReadyNetworkOperations();
if (getNet()->hasReadyRequests()) {
@@ -285,12 +326,14 @@ protected:
_myLastOpTime = OpTime({3, 0}, 1);
DataReplicatorOptions options;
- options.initialSyncRetryWait = Milliseconds(0);
+ options.initialSyncRetryWait = Milliseconds(1);
options.getMyLastOptime = [this]() { return _myLastOpTime; };
options.setMyLastOptime = [this](const OpTime& opTime) { _setMyLastOptime(opTime); };
options.getSlaveDelay = [this]() { return Seconds(0); };
options.syncSourceSelector = this;
+ _options = options;
+
ThreadPool::Options threadPoolOptions;
threadPoolOptions.poolName = "replication";
threadPoolOptions.minThreads = 1U;
@@ -317,14 +360,26 @@ protected:
<< "localhost:12345"))
<< "settings"
<< BSON("electionTimeoutMillis" << 10000))));
- dataReplicatorExternalState->replSetConfig = config;
+ dataReplicatorExternalState->replSetConfigResult = config;
}
_externalState = dataReplicatorExternalState.get();
+ _lastApplied = getDetectableErrorStatus();
+ _onCompletion = [this](const StatusWith<OpTimeWithHash>& lastApplied) {
+ _lastApplied = lastApplied;
+ };
try {
- _dr.reset(new DataReplicator(
- options, std::move(dataReplicatorExternalState), _storageInterface.get()));
+ // When creating DataReplicator, we wrap _onCompletion so that we can override the
+ // DataReplicator's callback behavior post-construction.
+ // See DataReplicatorTransitionsToCompleteWhenFinishCallbackThrowsException.
+ _dr = stdx::make_unique<DataReplicator>(
+ options,
+ std::move(dataReplicatorExternalState),
+ _storageInterface.get(),
+ [this](const StatusWith<OpTimeWithHash>& lastApplied) {
+ _onCompletion(lastApplied);
+ });
_dr->setScheduleDbWorkFn_forTest(
[this](const executor::TaskExecutor::CallbackFn& work) {
return getExecutor().scheduleWork(work);
@@ -375,14 +430,18 @@ protected:
TaskExecutorMock::ShouldFailRequestFn _shouldFailRequest;
std::unique_ptr<TaskExecutorMock> _executorProxy;
+ DataReplicatorOptions _options;
DataReplicatorOptions::SetMyLastOptimeFn _setMyLastOptime;
OpTime _myLastOpTime;
- std::unique_ptr<SyncSourceSelector> _syncSourceSelector;
+ std::unique_ptr<SyncSourceSelectorMock> _syncSourceSelector;
std::unique_ptr<StorageInterfaceMock> _storageInterface;
std::unique_ptr<OldThreadPool> _dbWorkThreadPool;
std::map<NamespaceString, CollectionMockStats> _collectionStats;
std::map<NamespaceString, CollectionCloneInfo> _collections;
+ StatusWith<OpTimeWithHash> _lastApplied = Status(ErrorCodes::NotYetInitialized, "");
+ DataReplicator::OnCompletionFn _onCompletion;
+
private:
DataReplicatorExternalStateMock* _externalState;
std::unique_ptr<DataReplicator> _dr;
@@ -395,577 +454,1612 @@ executor::ThreadPoolMock::Options DataReplicatorTest::makeThreadPoolMockOptions(
return options;
}
+void advanceClock(NetworkInterfaceMock* net, Milliseconds duration) {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ auto when = net->now() + duration;
+ ASSERT_EQUALS(when, net->runUntil(when));
+}
+
ServiceContext::UniqueOperationContext makeOpCtx() {
return cc().makeOperationContext();
}
-TEST_F(DataReplicatorTest, CreateDestroy) {}
-
-// Used to run a Initial Sync in a separate thread, to avoid blocking test execution.
-class InitialSyncBackgroundRunner {
-public:
- InitialSyncBackgroundRunner(DataReplicator* dr, std::size_t maxAttempts)
- : _dr(dr), _maxAttempts(maxAttempts) {}
+/**
+ * Generates a replSetGetRBID response.
+ */
+BSONObj makeRollbackCheckerResponse(int rollbackId) {
+ return BSON("ok" << 1 << "rbid" << rollbackId);
+}
- ~InitialSyncBackgroundRunner() {
- if (_thread) {
- _thread->join();
+/**
+ * Generates a cursor response for a Fetcher to consume.
+ */
+BSONObj makeCursorResponse(CursorId cursorId,
+ const NamespaceString& nss,
+ std::vector<BSONObj> docs,
+ bool isFirstBatch = true) {
+ BSONObjBuilder bob;
+ {
+ BSONObjBuilder cursorBob(bob.subobjStart("cursor"));
+ cursorBob.append("id", cursorId);
+ cursorBob.append("ns", nss.toString());
+ {
+ BSONArrayBuilder batchBob(
+ cursorBob.subarrayStart(isFirstBatch ? "firstBatch" : "nextBatch"));
+ for (const auto& doc : docs) {
+ batchBob.append(doc);
+ }
}
}
+ bob.append("ok", 1);
+ return bob.obj();
+}
- // Could block if initial sync has not finished.
- StatusWith<OpTimeWithHash> getResult(NetworkInterfaceMock* net) {
- while (!isDone()) {
- NetworkGuard guard(net);
- // if (net->hasReadyRequests()) {
- net->runReadyNetworkOperations();
- // }
+/**
+ * Generates a listDatabases response for a DatabasesCloner to consume.
+ */
+BSONObj makeListDatabasesResponse(std::vector<std::string> databaseNames) {
+ BSONObjBuilder bob;
+ {
+ BSONArrayBuilder databasesBob(bob.subarrayStart("databases"));
+ for (const auto& name : databaseNames) {
+ BSONObjBuilder nameBob(databasesBob.subobjStart());
+ nameBob.append("name", name);
}
- _thread->join();
- _thread.reset();
+ }
+ bob.append("ok", 1);
+ return bob.obj();
+}
+
+/**
+ * Generates oplog entries with the given number used for the timestamp.
+ */
+BSONObj makeOplogEntry(int t, const char* opType = "i", int version = OplogEntry::kOplogVersion) {
+ return BSON("ts" << Timestamp(t, 1) << "h" << static_cast<long long>(t) << "ns"
+ << "a.a"
+ << "v"
+ << version
+ << "op"
+ << opType
+ << "o"
+ << BSON("_id" << t << "a" << t));
+}
- LockGuard lk(_mutex);
- return _result;
+void DataReplicatorTest::processSuccessfulLastOplogEntryFetcherResponse(std::vector<BSONObj> docs) {
+ auto net = getNet();
+ auto request = assertRemoteCommandNameEquals(
+ "find",
+ net->scheduleSuccessfulResponse(makeCursorResponse(0LL, _options.localOplogNS, docs)));
+ ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
+ ASSERT_TRUE(request.cmdObj.hasField("sort"));
+ ASSERT_EQUALS(mongo::BSONType::Object, request.cmdObj["sort"].type());
+ ASSERT_BSONOBJ_EQ(BSON("$natural" << -1), request.cmdObj.getObjectField("sort"));
+ net->runReadyNetworkOperations();
+}
+
+TEST_F(DataReplicatorTest, InvalidConstruction) {
+ DataReplicatorOptions options;
+ options.getMyLastOptime = []() { return OpTime(); };
+ options.setMyLastOptime = [](const OpTime&) {};
+ options.getSlaveDelay = []() { return Seconds(0); };
+ options.syncSourceSelector = this;
+ auto callback = [](const StatusWith<OpTimeWithHash>&) {};
+
+ // Null task executor in external state.
+ {
+ auto dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>();
+ ASSERT_THROWS_CODE_AND_WHAT(
+ DataReplicator(
+ options, std::move(dataReplicatorExternalState), _storageInterface.get(), callback),
+ UserException,
+ ErrorCodes::BadValue,
+ "task executor cannot be null");
}
- bool isDone() {
- LockGuard lk(_mutex);
- return (_result.getStatus().code() != ErrorCodes::NotYetInitialized);
+ // Null callback function.
+ {
+ auto dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>();
+ dataReplicatorExternalState->taskExecutor = &getExecutor();
+ ASSERT_THROWS_CODE_AND_WHAT(DataReplicator(options,
+ std::move(dataReplicatorExternalState),
+ _storageInterface.get(),
+ DataReplicator::OnCompletionFn()),
+ UserException,
+ ErrorCodes::BadValue,
+ "callback function cannot be null");
}
+}
+
+TEST_F(DataReplicatorTest, CreateDestroy) {}
+
+const std::uint32_t maxAttempts = 1U;
+
+TEST_F(DataReplicatorTest, StartupReturnsIllegalOperationIfAlreadyActive) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+ ASSERT_FALSE(dr->isActive());
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+ ASSERT_TRUE(dr->isActive());
+ ASSERT_EQUALS(ErrorCodes::IllegalOperation, dr->startup(txn.get(), maxAttempts));
+ ASSERT_TRUE(dr->isActive());
+}
+
+TEST_F(DataReplicatorTest, StartupReturnsShutdownInProgressIfDataReplicatorIsShuttingDown) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+ ASSERT_FALSE(dr->isActive());
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+ ASSERT_TRUE(dr->isActive());
+ ASSERT_OK(dr->shutdown());
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, dr->startup(txn.get(), maxAttempts));
+}
+
+TEST_F(DataReplicatorTest, StartupReturnsShutdownInProgressIfExecutorIsShutdown) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+ getExecutor().shutdown();
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, dr->startup(txn.get(), maxAttempts));
+ ASSERT_FALSE(dr->isActive());
+
+    // Cannot start up the data replicator again since it's in the Complete state.
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, dr->startup(txn.get(), maxAttempts));
+}
- bool isActive() {
- return (_dr && _dr->getState() == DataReplicatorState::InitialSync) && !isDone();
+TEST_F(DataReplicatorTest, ShutdownTransitionsStateToCompleteIfCalledBeforeStartup) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+ ASSERT_OK(dr->shutdown());
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, dr->startup(txn.get(), maxAttempts));
+ // Data replicator is inactive when it's in the Complete state.
+ ASSERT_FALSE(dr->isActive());
+}
+
+TEST_F(DataReplicatorTest, StartupSetsInitialSyncFlagOnSuccess) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ // Initial sync flag should not be set before starting.
+ ASSERT_FALSE(getStorage().getInitialSyncFlag(txn.get()));
+
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+ ASSERT_TRUE(dr->isActive());
+
+ // Initial sync flag should be set.
+ ASSERT_TRUE(getStorage().getInitialSyncFlag(txn.get()));
+}
+
+TEST_F(DataReplicatorTest, DataReplicatorReturnsCallbackCanceledIfShutdownImmediatelyAfterStartup) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ // This will cancel the _startInitialSyncAttemptCallback() task scheduled by startup().
+ ASSERT_OK(dr->shutdown());
+
+ dr->join();
+
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
+}
+
+const std::uint32_t chooseSyncSourceMaxAttempts = 10U;
+
+/**
+ * Advances executor clock so that DataReplicator exhausts all 'chooseSyncSourceMaxAttempts' (server
+ * parameter numInitialSyncConnectAttempts) sync source selection attempts.
+ * If SyncSourceSelectorMock keeps returning an invalid sync source, DataReplicator will retry every
+ * '_options.syncSourceRetryWait' ms up to a maximum of 'chooseSyncSourceMaxAttempts' attempts.
+ */
+void _simulateChooseSyncSourceFailure(executor::NetworkInterfaceMock* net,
+ Milliseconds syncSourceRetryWait) {
+ advanceClock(net, int(chooseSyncSourceMaxAttempts - 1) * syncSourceRetryWait);
+}
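+// For example, with the default of 10 connect attempts, advancing the clock by
+// 9 * '_options.syncSourceRetryWait' covers the retries that follow the immediate first attempt
+// and exhausts sync source selection.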
+
+TEST_F(
+ DataReplicatorTest,
+ DataReplicatorReturnsInitialSyncOplogSourceMissingIfNoValidSyncSourceCanBeFoundAfterTenFailedChooseSyncSourceAttempts) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ // Override chooseNewSyncSource() result in SyncSourceSelectorMock before calling startup()
+ // because DataReplicator will look for a valid sync source immediately after startup.
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
+
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ _simulateChooseSyncSourceFailure(getNet(), _options.syncSourceRetryWait);
+
+ dr->join();
+
+ ASSERT_EQUALS(ErrorCodes::InitialSyncOplogSourceMissing, _lastApplied);
+}
+
+// Confirms that DataReplicator keeps retrying initial sync.
+// Make every initial sync attempt fail early by having the sync source selector always return an
+// invalid sync source.
+TEST_F(DataReplicatorTest,
+ DataReplicatorRetriesInitialSyncUpToMaxAttemptsAndReturnsLastAttemptError) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
+
+ const std::uint32_t initialSyncMaxAttempts = 3U;
+ ASSERT_OK(dr->startup(txn.get(), initialSyncMaxAttempts));
+
+ auto net = getNet();
+ for (std::uint32_t i = 0; i < initialSyncMaxAttempts; ++i) {
+ _simulateChooseSyncSourceFailure(net, _options.syncSourceRetryWait);
+ advanceClock(net, _options.initialSyncRetryWait);
}
- void run() {
- UniqueLock lk(_mutex);
- _thread.reset(new stdx::thread(stdx::bind(&InitialSyncBackgroundRunner::_run, this)));
- _condVar.wait(lk);
+ dr->join();
+
+ ASSERT_EQUALS(ErrorCodes::InitialSyncOplogSourceMissing, _lastApplied);
+
+ // Check number of failed attempts in stats.
+ auto progress = dr->getInitialSyncProgress();
+ unittest::log() << "Progress after " << initialSyncMaxAttempts
+ << " failed attempts: " << progress;
+ ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), int(initialSyncMaxAttempts))
+ << progress;
+ ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), int(initialSyncMaxAttempts))
+ << progress;
+}
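+// Note: getInitialSyncProgress() returns a BSON document that contains at least the
+// 'failedInitialSyncAttempts' and 'maxFailedInitialSyncAttempts' counters checked above.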
+
+TEST_F(DataReplicatorTest,
+ DataReplicatorReturnsCallbackCanceledIfShutdownWhileRetryingSyncSourceSelection) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ auto when = net->now() + _options.syncSourceRetryWait / 2;
+ ASSERT_GREATER_THAN(when, net->now());
+ ASSERT_EQUALS(when, net->runUntil(when));
}
- BSONObj getInitialSyncProgress() {
- return _dr->getInitialSyncProgress();
+ // This will cancel the _chooseSyncSourceCallback() task scheduled at getNet()->now() +
+ // '_options.syncSourceRetryWait'.
+ ASSERT_OK(dr->shutdown());
+
+ dr->join();
+
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
+}
+
+TEST_F(
+ DataReplicatorTest,
+ DataReplicatorReturnsScheduleErrorIfTaskExecutorFailsToScheduleNextChooseSyncSourceCallback) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
+ _executorProxy->shouldFailScheduleWorkAt = true;
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ dr->join();
+
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest,
+ DataReplicatorReturnsScheduleErrorIfTaskExecutorFailsToScheduleNextInitialSyncAttempt) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
+
+ ASSERT_EQUALS(DataReplicator::State::kPreStart, dr->getState_forTest());
+
+ ASSERT_OK(dr->startup(txn.get(), 2U));
+ ASSERT_EQUALS(DataReplicator::State::kRunning, dr->getState_forTest());
+
+ // Advance clock so that we run all but the last sync source callback.
+ auto net = getNet();
+ advanceClock(net, int(chooseSyncSourceMaxAttempts - 2) * _options.syncSourceRetryWait);
+
+    // The last choose sync source attempt should now be scheduled. Advance the clock so we fail
+    // the last choose sync source attempt, which causes the next initial sync attempt to be
+    // scheduled.
+ _executorProxy->shouldFailScheduleWorkAt = true;
+ advanceClock(net, _options.syncSourceRetryWait);
+
+ dr->join();
+
+ ASSERT_EQUALS(DataReplicator::State::kComplete, dr->getState_forTest());
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+}
+
+// This test verifies that the data replicator will still transition to a complete state even if
+// the completion callback function throws an exception.
+TEST_F(DataReplicatorTest, DataReplicatorTransitionsToCompleteWhenFinishCallbackThrowsException) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _onCompletion = [this](const StatusWith<OpTimeWithHash>& lastApplied) {
+ _lastApplied = lastApplied;
+ uassert(ErrorCodes::InternalError, "", false);
+ };
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ ASSERT_OK(dr->shutdown());
+ dr->join();
+
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
+}
+
+class SharedCallbackState {
+ MONGO_DISALLOW_COPYING(SharedCallbackState);
+
+public:
+ explicit SharedCallbackState(bool* sharedCallbackStateDestroyed)
+ : _sharedCallbackStateDestroyed(sharedCallbackStateDestroyed) {}
+ ~SharedCallbackState() {
+ *_sharedCallbackStateDestroyed = true;
}
private:
- void _run() {
- setThreadName("InitialSyncRunner");
- Client::initThreadIfNotAlready();
- auto txn = getGlobalServiceContext()->makeOperationContext(&cc());
+ bool* _sharedCallbackStateDestroyed;
+};
+
+TEST_F(DataReplicatorTest, DataReplicatorResetsOnCompletionCallbackFunctionPointerUponCompletion) {
+ bool sharedCallbackStateDestroyed = false;
+ auto sharedCallbackData = std::make_shared<SharedCallbackState>(&sharedCallbackStateDestroyed);
+ decltype(_lastApplied) lastApplied = getDetectableErrorStatus();
+
+ auto dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>();
+ dataReplicatorExternalState->taskExecutor = &getExecutor();
+ auto dr = stdx::make_unique<DataReplicator>(
+ _options,
+ std::move(dataReplicatorExternalState),
+ _storageInterface.get(),
+ [&lastApplied, sharedCallbackData](const StatusWith<OpTimeWithHash>& result) {
+ lastApplied = result;
+ });
+ ON_BLOCK_EXIT([this]() { getExecutor().shutdown(); });
+
+ auto txn = makeOpCtx();
+
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ sharedCallbackData.reset();
+ ASSERT_FALSE(sharedCallbackStateDestroyed);
+
+ ASSERT_OK(dr->shutdown());
+ dr->join();
+
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, lastApplied);
+
+    // DataReplicator should reset 'DataReplicator::_onCompletion' after running the callback
+    // function for the last time before becoming inactive.
+ // This ensures that we release resources associated with 'DataReplicator::_onCompletion'.
+ ASSERT_TRUE(sharedCallbackStateDestroyed);
+}
+
+TEST_F(DataReplicatorTest, DataReplicatorRecreatesOplogAndDropsReplicatedDatabases) {
+    // We are not interested in proceeding beyond the oplog creation stage, so we inject a failure
+ // after setting '_storageInterfaceWorkDone.createOplogCalled' to true.
+ auto oldCreateOplogFn = _storageInterface->createOplogFn;
+ _storageInterface->createOplogFn = [oldCreateOplogFn](OperationContext* txn,
+ const NamespaceString& nss) {
+ oldCreateOplogFn(txn, nss);
+ return Status(ErrorCodes::OperationFailed, "oplog creation failed");
+ };
+
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+
+ LockGuard lock(_storageInterfaceWorkDoneMutex);
+ ASSERT_TRUE(_storageInterfaceWorkDone.droppedUserDBs);
+ ASSERT_TRUE(_storageInterfaceWorkDone.createOplogCalled);
+}
+
+TEST_F(DataReplicatorTest, DataReplicatorPassesThroughGetRollbackIdScheduleError) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ // replSetGetRBID is the first remote command to be scheduled by the data replicator after
+ // creating the oplog collection.
+ executor::RemoteCommandRequest request;
+ _shouldFailRequest = [&request](const executor::RemoteCommandRequest& requestToSend) {
+ request = requestToSend;
+ return true;
+ };
+
+ HostAndPort syncSource("localhost", 12345);
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+
+ ASSERT_EQUALS("admin", request.dbname);
+ assertRemoteCommandNameEquals("replSetGetRBID", request);
+ ASSERT_EQUALS(syncSource, request.target);
+}
+
+TEST_F(
+ DataReplicatorTest,
+ DataReplicatorReturnsShutdownInProgressIfSchedulingRollbackCheckerFailedDueToExecutorShutdown) {
+ // The rollback id request is sent immediately after oplog creation. We shut the task executor
+ // down before returning from createOplog() to make the scheduleRemoteCommand() call for
+ // replSetGetRBID fail.
+ auto oldCreateOplogFn = _storageInterface->createOplogFn;
+ _storageInterface->createOplogFn = [oldCreateOplogFn, this](OperationContext* txn,
+ const NamespaceString& nss) {
+ auto status = oldCreateOplogFn(txn, nss);
+ getExecutor().shutdown();
+ return status;
+ };
+
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, _lastApplied);
+
+ LockGuard lock(_storageInterfaceWorkDoneMutex);
+ ASSERT_TRUE(_storageInterfaceWorkDone.createOplogCalled);
+}
+
+TEST_F(DataReplicatorTest, DataReplicatorCancelsRollbackCheckerOnShutdown) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ HostAndPort syncSource("localhost", 12345);
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
- // Synchonize this thread starting with the call in run() above.
- UniqueLock lk(_mutex);
- _condVar.notify_all();
- lk.unlock();
+ ASSERT_EQUALS(DataReplicator::State::kPreStart, dr->getState_forTest());
- auto result = _dr->doInitialSync(txn.get(), _maxAttempts); // blocking
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+ ASSERT_EQUALS(DataReplicator::State::kRunning, dr->getState_forTest());
- lk.lock();
- _result = result;
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ ASSERT_TRUE(net->hasReadyRequests());
+ auto noi = net->getNextReadyRequest();
+ const auto& request = assertRemoteCommandNameEquals("replSetGetRBID", noi->getRequest());
+ ASSERT_EQUALS("admin", request.dbname);
+ ASSERT_EQUALS(syncSource, request.target);
+ net->blackHole(noi);
}
- stdx::mutex _mutex; // protects _result.
- StatusWith<OpTimeWithHash> _result{ErrorCodes::NotYetInitialized, "InitialSync not started."};
+ ASSERT_OK(dr->shutdown());
+    // Since we need the NetworkInterfaceMock to deliver the cancellation event, the
+    // DataReplicator has to be in a pre-completion state (i.e. ShuttingDown).
+ ASSERT_EQUALS(DataReplicator::State::kShuttingDown, dr->getState_forTest());
- DataReplicator* _dr;
- const std::size_t _maxAttempts;
- std::unique_ptr<stdx::thread> _thread;
- stdx::condition_variable _condVar;
-};
+ executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();
-bool isOplogGetMore(const NetworkInterfaceMock::NetworkOperationIterator& noi) {
- const RemoteCommandRequest& req = noi->getRequest();
- const auto parsedGetMoreStatus = GetMoreRequest::parseFromBSON(req.dbname, req.cmdObj);
- if (!parsedGetMoreStatus.isOK()) {
- return false;
+ dr->join();
+ ASSERT_EQUALS(DataReplicator::State::kComplete, dr->getState_forTest());
+
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest, DataReplicatorPassesThroughRollbackCheckerCallbackError) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ assertRemoteCommandNameEquals(
+ "replSetGetRBID",
+ net->scheduleErrorResponse(
+ Status(ErrorCodes::OperationFailed, "replSetGetRBID failed at sync source")));
+ net->runReadyNetworkOperations();
}
- const auto getMoreReq = parsedGetMoreStatus.getValue();
- return (getMoreReq.nss.isOplog() && getMoreReq.cursorid == 1LL);
-}
-
-// Should match this: { killCursors: "oplog.rs", cursors: [ 1 ] }
-bool isOplogKillCursor(const NetworkInterfaceMock::NetworkOperationIterator& noi) {
- const BSONObj reqBSON = noi->getRequest().cmdObj;
- const auto nsElem = reqBSON["killCursors"];
- const auto isOplogNS =
- nsElem && NamespaceString{"local.oplog.rs"}.coll().equalCaseInsensitive(nsElem.str());
- if (isOplogNS) {
- const auto cursorsVector = reqBSON["cursors"].Array();
- auto hasCursorId = false;
- std::for_each(
- cursorsVector.begin(), cursorsVector.end(), [&hasCursorId](const BSONElement& elem) {
- if (elem.safeNumberLong() == 1LL) {
- hasCursorId = true;
- }
- });
- return isOplogNS && hasCursorId;
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest, DataReplicatorPassesThroughLastOplogEntryFetcherScheduleError) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+    // The last oplog entry fetcher is the first component that sends a find command, so we reject
+    // any find commands and save the request for inspection at the end of this test case.
+ executor::RemoteCommandRequest request;
+ _shouldFailRequest = [&request](const executor::RemoteCommandRequest& requestToSend) {
+ request = requestToSend;
+ return "find" == requestToSend.cmdObj.firstElement().fieldNameStringData();
+ };
+
+ HostAndPort syncSource("localhost", 12345);
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
}
- return false;
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+
+ ASSERT_EQUALS(syncSource, request.target);
+ ASSERT_EQUALS(_options.localOplogNS.db(), request.dbname);
+ assertRemoteCommandNameEquals("find", request);
+ ASSERT_BSONOBJ_EQ(BSON("$natural" << -1), request.cmdObj.getObjectField("sort"));
+ ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
}
-class InitialSyncTest : public DataReplicatorTest {
-public:
- using Responses = std::vector<std::pair<std::string, BSONObj>>;
- InitialSyncTest(){};
+TEST_F(DataReplicatorTest, DataReplicatorPassesThroughLastOplogEntryFetcherCallbackError) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
-protected:
- void setResponses(Responses resps) {
- _responses = resps;
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ assertRemoteCommandNameEquals(
+ "find",
+ net->scheduleErrorResponse(
+ Status(ErrorCodes::OperationFailed, "find command failed at sync source")));
+ net->runReadyNetworkOperations();
}
- void startSync(std::size_t maxAttempts) {
- DataReplicator* dr = &(getDR());
- _isbr.reset(new InitialSyncBackgroundRunner(dr, maxAttempts));
- _isbr->run();
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest, DataReplicatorCancelsLastOplogEntryFetcherOnShutdown) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ ASSERT_TRUE(net->hasReadyRequests());
}
- void playResponses() {
- NetworkInterfaceMock* net = getNet();
- int processedRequests(0);
- const int expectedResponses(_responses.size());
-
- Date_t lastLog{Date_t::now()};
- while (true) {
- if (_isbr && _isbr->isDone()) {
- log() << "There are " << (expectedResponses - processedRequests)
- << " responses left which were unprocessed.";
- return;
- }
+ ASSERT_OK(dr->shutdown());
+ executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();
- NetworkGuard guard(net);
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
+}
- if (!net->hasReadyRequests()) {
- net->runReadyNetworkOperations();
- continue;
- }
+TEST_F(DataReplicatorTest,
+ DataReplicatorReturnsNoMatchingDocumentIfLastOplogEntryFetcherReturnsEmptyBatchOfDocuments) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
- auto noi = net->getNextReadyRequest();
- if (isOplogGetMore(noi)) {
- // process getmore requests from the oplog fetcher
- int c = int(numGetMoreOplogEntries + 2);
- lastGetMoreOplogEntry = BSON("ts" << Timestamp(Seconds(c), 1) << "h" << 1LL << "ns"
- << "test.a"
- << "v"
- << OplogEntry::kOplogVersion
- << "op"
- << "i"
- << "o"
- << BSON("_id" << c));
- ++numGetMoreOplogEntries;
- mongo::CursorId cursorId =
- numGetMoreOplogEntries == numGetMoreOplogEntriesMax ? 0 : 1LL;
- auto respBSON =
- BSON("ok" << 1 << "cursor" << BSON("id" << cursorId << "ns"
- << "local.oplog.rs"
- << "nextBatch"
- << BSON_ARRAY(lastGetMoreOplogEntry)));
- net->scheduleResponse(
- noi,
- net->now(),
- ResponseStatus(RemoteCommandResponse(respBSON, BSONObj(), Milliseconds(10))));
-
- log() << "Sending response for getMore network request:";
- log() << " req: " << noi->getRequest().dbname << "."
- << noi->getRequest().cmdObj;
- log() << " resp:" << respBSON;
-
- if ((Date_t::now() - lastLog) > Seconds(1)) {
- lastLog = Date_t::now();
- log() << "processing oplog getmore, net:" << net->getDiagnosticString();
- net->logQueues();
- }
- net->runReadyNetworkOperations();
- continue;
- } else if (isOplogKillCursor(noi)) {
- auto respBSON = BSON("ok" << 1.0);
- log() << "processing oplog killcursors req, net:" << net->getDiagnosticString();
- net->scheduleResponse(
- noi,
- net->now(),
- ResponseStatus(RemoteCommandResponse(respBSON, BSONObj(), Milliseconds(10))));
- net->runReadyNetworkOperations();
- continue;
- }
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
- const BSONObj reqBSON = noi->getRequest().cmdObj;
- const BSONElement cmdElem = reqBSON.firstElement();
- auto cmdName = cmdElem.fieldNameStringData();
- auto expectedName = _responses[processedRequests].first;
- auto response = _responses[processedRequests].second;
- ASSERT(_responses[processedRequests].first == "" ||
- cmdName.equalCaseInsensitive(expectedName))
- << "ERROR: response #" << processedRequests + 1 << ", expected '" << expectedName
- << "' command but the request was actually: '" << noi->getRequest().cmdObj
- << "' for resp: " << response;
-
- // process fixed set of responses
- log() << "Sending response for network request:";
- log() << " req: " << noi->getRequest().dbname << "." << noi->getRequest().cmdObj;
- log() << " resp:" << response;
- net->scheduleResponse(
- noi,
- net->now(),
- ResponseStatus(RemoteCommandResponse(response, BSONObj(), Milliseconds(10))));
-
- if ((Date_t::now() - lastLog) > Seconds(1)) {
- lastLog = Date_t();
- log() << net->getDiagnosticString();
- net->logQueues();
- }
- net->runReadyNetworkOperations();
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
- guard.dismiss();
- if (++processedRequests >= expectedResponses) {
- log() << "done processing expected requests ";
- break; // once we have processed all requests, continue;
- }
- }
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({});
}
- void verifySync(NetworkInterfaceMock* net, Status s = Status::OK()) {
- verifySync(net, s.code());
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest,
+ DataReplicatorReturnsNoSuchKeyIfLastOplogEntryFetcherReturnsEntryWithMissingHash) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({BSONObj()});
}
- void verifySync(NetworkInterfaceMock* net, ErrorCodes::Error code) {
- // Check result
- const auto status = _isbr->getResult(net).getStatus();
- ASSERT_EQ(status.code(), code) << "status codes differ, status: " << status;
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest,
+ DataReplicatorReturnsNoSuchKeyIfLastOplogEntryFetcherReturnsEntryWithMissingTimestamp) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({BSON("h" << 1LL)});
}
- BSONObj getInitialSyncProgress() {
- return _isbr->getInitialSyncProgress();
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest,
+ DataReplicatorPassesThroughErrorFromDataReplicatorExternalStateGetCurrentConfig) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ getExternalState()->replSetConfigResult = Status(ErrorCodes::OperationFailed, "");
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
}
- // Generate at least one getMore response.
- std::size_t numGetMoreOplogEntries = 0;
- std::size_t numGetMoreOplogEntriesMax = 1;
- BSONObj lastGetMoreOplogEntry;
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+}
-private:
- void tearDown() override;
+TEST_F(DataReplicatorTest, DataReplicatorPassesThroughOplogFetcherScheduleError) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
- Responses _responses;
- std::unique_ptr<InitialSyncBackgroundRunner> _isbr{nullptr};
-};
+ // Make the tailable oplog query fail. Allow all other requests to be scheduled.
+ executor::RemoteCommandRequest request;
+ _shouldFailRequest = [&request](const executor::RemoteCommandRequest& requestToSend) {
+ if ("find" == requestToSend.cmdObj.firstElement().fieldNameStringData() &&
+ requestToSend.cmdObj.getBoolField("tailable")) {
+ request = requestToSend;
+ return true;
+ }
+ return false;
+ };
+
+ HostAndPort syncSource("localhost", 12345);
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
-void InitialSyncTest::tearDown() {
- DataReplicatorTest::tearDownExecutorThread();
- _isbr.reset();
- DataReplicatorTest::tearDown();
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ }
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+
+ ASSERT_EQUALS(syncSource, request.target);
+ ASSERT_EQUALS(_options.localOplogNS.db(), request.dbname);
+ assertRemoteCommandNameEquals("find", request);
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
}
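+// Note: the rejected request above is the OplogFetcher's tailable oplog query; the assertions
+// only rely on it being a 'find' command sent to the sync source with the 'tailable' and
+// 'oplogReplay' flags set.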
-TEST_F(InitialSyncTest, ShutdownImmediatelyAfterStartup) {
- startSync(1);
+TEST_F(DataReplicatorTest, DataReplicatorPassesThroughOplogFetcherCallbackError) {
+ auto dr = &getDR();
auto txn = makeOpCtx();
- ASSERT_OK(getDR().shutdown());
- getExecutor().shutdown();
- verifySync(getNet(), ErrorCodes::ShutdownInProgress);
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ net->scheduleSuccessfulResponse(
+ makeCursorResponse(0LL, _options.localOplogNS, {makeOplogEntry(1)}));
+ net->runReadyNetworkOperations();
+
+ // Oplog tailing query.
+ auto request = assertRemoteCommandNameEquals(
+ "find", net->scheduleErrorResponse(Status(ErrorCodes::OperationFailed, "dead cursor")));
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->runReadyNetworkOperations();
+
+ // OplogFetcher will shut down DatabasesCloner on error after setting the completion status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _databasesClonerCallback().
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
-TEST_F(InitialSyncTest, Complete) {
- /**
- * Initial Sync will issue these query/commands
- * - replSetGetRBID
- * - startTS = oplog.rs->find().sort({$natural:-1}).limit(-1).next()["ts"]
- * - listDatabases (foreach db do below)
- * -- cloneDatabase (see DatabaseCloner tests).
- * - endTS = oplog.rs->find().sort({$natural:-1}).limit(-1).next()["ts"]
- * - ops = oplog.rs->find({ts:{$gte: startTS}}) (foreach op)
- * -- if local doc is missing, getCollection(op.ns).findOne(_id:op.o2._id)
- * - if any retries were done in the previous loop, endTS query again for minvalid
- * - replSetGetRBID
- *
- */
+TEST_F(DataReplicatorTest,
+ DataReplicatorSucceedsOnEarlyOplogFetcherCompletionIfThereAreNoOperationsToApply) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
- auto lastOpAfterClone = BSON(
- "ts" << Timestamp(Seconds(8), 1U) << "h" << 1LL << "v" << OplogEntry::kOplogVersion << "ns"
- << ""
- << "op"
- << "i"
- << "o"
- << BSON("_id" << 5 << "a" << 2));
-
- const Responses responses = {
- {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
- // get latest oplog ts
- {"find",
- fromjson(
- str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // oplog fetcher find
- {"find",
- fromjson(
- str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // Clone Start
- // listDatabases
- {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")},
- // listCollections for "a"
- {"listCollections",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
- "{name:'a', options:{}} "
- "]}}")},
- // count:a
- {"count", BSON("n" << 1 << "ok" << 1)},
- // listIndexes:a
- {
- "listIndexes",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
- "{v:"
- << OplogEntry::kOplogVersion
- << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")},
- // find:a
- {"find",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
- "{_id:1, a:1} "
- "]}}")},
- // Clone Done
- // get latest oplog ts
- {"find", BaseClonerTest::createCursorResponse(0, BSON_ARRAY(lastOpAfterClone))},
- {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
- // Applier starts ...
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ auto request =
+ assertRemoteCommandNameEquals("find",
+ net->scheduleSuccessfulResponse(makeCursorResponse(
+ 0LL, _options.localOplogNS, {makeOplogEntry(1)})));
+ ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
+ net->runReadyNetworkOperations();
+
+ // Oplog tailing query.
+ // Simulate cursor closing on sync source.
+ request =
+ assertRemoteCommandNameEquals("find",
+ net->scheduleSuccessfulResponse(makeCursorResponse(
+ 0LL, _options.localOplogNS, {makeOplogEntry(1)})));
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->runReadyNetworkOperations();
+
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // Last rollback checker replSetGetRBID command.
+ assertRemoteCommandNameEquals(
+ "replSetGetRBID", net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)));
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(OplogEntry(makeOplogEntry(1)).getOpTime(),
+ unittest::assertGet(_lastApplied).opTime);
+}
+
+TEST_F(
+ DataReplicatorTest,
+ DataReplicatorSucceedsOnEarlyOplogFetcherCompletionIfThereAreEnoughOperationsInTheOplogBufferToReachEndTimestamp) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // Oplog tailing query.
+ // Simulate cursor closing on sync source.
+ auto request = assertRemoteCommandNameEquals(
+ "find",
+ net->scheduleSuccessfulResponse(makeCursorResponse(
+ 0LL,
+ _options.localOplogNS,
+ {makeOplogEntry(1), makeOplogEntry(2, "c"), makeOplogEntry(3, "c")})));
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->runReadyNetworkOperations();
+
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(3)});
+
+ // Last rollback checker replSetGetRBID command.
+ assertRemoteCommandNameEquals(
+ "replSetGetRBID", net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)));
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(OplogEntry(makeOplogEntry(3)).getOpTime(),
+ unittest::assertGet(_lastApplied).opTime);
+}
+
+TEST_F(
+ DataReplicatorTest,
+ DataReplicatorReturnsRemoteResultsUnavailableOnEarlyOplogFetcherCompletionIfThereAreNotEnoughOperationsInTheOplogBufferToReachEndTimestamp) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // Oplog tailing query.
+ // Simulate cursor closing on sync source.
+ auto request = assertRemoteCommandNameEquals(
+ "find",
+ net->scheduleSuccessfulResponse(makeCursorResponse(
+ 0LL,
+ _options.localOplogNS,
+ {makeOplogEntry(1), makeOplogEntry(2, "c"), makeOplogEntry(3, "c")})));
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->runReadyNetworkOperations();
+
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // Second last oplog entry fetcher.
+ // Return an oplog entry with an optime that is more recent than what the completed
+ // OplogFetcher has read from the sync source.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(4)});
+ }
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::RemoteResultsUnavailable, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest,
+ DataReplicatorPassesThroughDatabasesClonerScheduleErrorAndCancelsOplogFetcher) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ // Make the listDatabases command fail. Allow all other requests to be scheduled.
+ executor::RemoteCommandRequest request;
+ _shouldFailRequest = [&request](const executor::RemoteCommandRequest& requestToSend) {
+ if ("listDatabases" == requestToSend.cmdObj.firstElement().fieldNameStringData()) {
+ request = requestToSend;
+ return true;
+ }
+ return false;
};
- // Initial sync flag should not be set before starting.
+ HostAndPort syncSource("localhost", 12345);
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // DataReplicator shuts down OplogFetcher when it fails to schedule DatabasesCloner
+ // so we should not expect any network requests in the queue.
+ ASSERT_FALSE(net->hasReadyRequests());
+
+ // OplogFetcher is shutting down but we still need to call runReadyNetworkOperations()
+ // to deliver the cancellation status to the 'DataReplicator::_oplogFetcherCallback'
+ // callback.
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+
+ ASSERT_EQUALS(syncSource, request.target);
+ ASSERT_EQUALS("admin", request.dbname);
+ assertRemoteCommandNameEquals("listDatabases", request);
+}
+
+TEST_F(DataReplicatorTest,
+ DataReplicatorPassesThroughDatabasesClonerCallbackErrorAndCancelsOplogFetcher) {
+ auto dr = &getDR();
auto txn = makeOpCtx();
- ASSERT_FALSE(getStorage().getInitialSyncFlag(txn.get()));
- startSync(1);
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
- // Play first response to ensure data replicator has entered initial sync state.
- setResponses({responses.begin(), responses.begin() + 1});
- numGetMoreOplogEntriesMax = responses.size();
- playResponses();
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
- // Initial sync flag should be set.
- ASSERT_TRUE(getStorage().getInitialSyncFlag(txn.get()));
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
- // Play rest of the responses after checking initial sync flag.
- setResponses({responses.begin() + 1, responses.end()});
- playResponses();
- log() << "done playing last responses";
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
- log() << "waiting for initial sync to verify it completed OK";
- verifySync(getNet());
+ // Oplog tailing query.
+ auto noi = net->getNextReadyRequest();
+ auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->blackHole(noi);
+
+ // DatabasesCloner's first remote command - listDatabases
+ assertRemoteCommandNameEquals(
+ "listDatabases",
+ net->scheduleErrorResponse(Status(ErrorCodes::FailedToParse, "listDatabases failed")));
+ net->runReadyNetworkOperations();
+
+ // DatabasesCloner will shut down OplogFetcher on error after setting the completion status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _oplogFetcherCallback().
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::FailedToParse, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest, DataReplicatorIgnoresLocalDatabasesWhenCloningDatabases) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
- log() << "doing asserts";
+ auto net = getNet();
{
- LockGuard lock(_storageInterfaceWorkDoneMutex);
- ASSERT_TRUE(_storageInterfaceWorkDone.droppedUserDBs);
- ASSERT_TRUE(_storageInterfaceWorkDone.createOplogCalled);
- ASSERT_EQ(0, _storageInterfaceWorkDone.oplogEntriesInserted);
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // Oplog tailing query.
+ auto noi = net->getNextReadyRequest();
+ auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->blackHole(noi);
+
+ // DatabasesCloner's first remote command - listDatabases
+ assertRemoteCommandNameEquals(
+ "listDatabases",
+ net->scheduleSuccessfulResponse(makeListDatabasesResponse({"a", "local", "b"})));
+ net->runReadyNetworkOperations();
+
+ // DatabasesCloner should only send listCollections requests for databases 'a' and 'b'.
+ request = assertRemoteCommandNameEquals(
+ "listCollections",
+ net->scheduleSuccessfulResponse(
+ makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("a"), {})));
+ ASSERT_EQUALS("a", request.dbname);
+
+ request = assertRemoteCommandNameEquals(
+ "listCollections",
+ net->scheduleSuccessfulResponse(
+ makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("b"), {})));
+ ASSERT_EQUALS("b", request.dbname);
+
+ // After processing all the database names and returning empty lists of collections for each
+ // database, data cloning should run to completion and we should expect to see a last oplog
+ // entry fetcher request.
+ request = assertRemoteCommandNameEquals(
+ "find",
+ net->scheduleSuccessfulResponse(
+ makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("b"), {})));
+ ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
}
- log() << "checking initial sync flag isn't set.";
- // Initial sync flag should not be set after completion.
- ASSERT_FALSE(getStorage().getInitialSyncFlag(txn.get()));
+ getExecutor().shutdown();
- // getMore responses are generated by playResponses().
- ASSERT_EQUALS(OplogEntry(lastOpAfterClone).getOpTime(), _myLastOpTime);
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
-TEST_F(InitialSyncTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfterCloning) {
- const Responses responses =
- {
- {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
- // get latest oplog ts
- {"find",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // oplog fetcher find
- {"find",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // Clone Start
- // listDatabases
- {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")},
- // listCollections for "a"
- {"listCollections",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
- "{name:'a', options:{}} "
- "]}}")},
- // count:a
- {"count", BSON("n" << 1 << "ok" << 1)},
- // listIndexes:a
- {"listIndexes",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
- "{v:"
- << OplogEntry::kOplogVersion
- << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")},
- // find:a
- {"find",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
- "{_id:1, a:1} "
- "]}}")},
- // Clone Done
- // get latest oplog ts
- {"find",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'b.c', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, c:1}}]}}")},
- {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
- };
+TEST_F(DataReplicatorTest,
+ DataReplicatorIgnoresDatabaseInfoDocumentWithoutNameFieldWhenCloningDatabases) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
- // Initial sync flag should not be set before starting.
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // Oplog tailing query.
+ auto noi = net->getNextReadyRequest();
+ auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->blackHole(noi);
+
+ // DatabasesCloner's first remote command - listDatabases
+ assertRemoteCommandNameEquals(
+ "listDatabases",
+ net->scheduleSuccessfulResponse(BSON("databases" << BSON_ARRAY(BSON("name"
+ << "a")
+ << BSON("bad"
+ << "dbinfo")
+ << BSON("name"
+ << "b"))
+ << "ok"
+ << 1)));
+ net->runReadyNetworkOperations();
+
+ // DatabasesCloner should only send listCollections requests for databases 'a' and 'b'.
+ request = assertRemoteCommandNameEquals(
+ "listCollections",
+ net->scheduleSuccessfulResponse(
+ makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("a"), {})));
+ ASSERT_EQUALS("a", request.dbname);
+
+ request = assertRemoteCommandNameEquals(
+ "listCollections",
+ net->scheduleSuccessfulResponse(
+ makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("b"), {})));
+ ASSERT_EQUALS("b", request.dbname);
+
+ // After processing all the database names and returning empty lists of collections for each
+ // database, data cloning should run to completion and we should expect to see a last oplog
+ // entry fetcher request.
+ request = assertRemoteCommandNameEquals(
+ "find",
+ net->scheduleSuccessfulResponse(
+ makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("b"), {})));
+ ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
+ }
+
+ getExecutor().shutdown();
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest, DataReplicatorCancelsBothOplogFetcherAndDatabasesClonerOnShutdown) {
+ auto dr = &getDR();
auto txn = makeOpCtx();
- ASSERT_FALSE(getStorage().getInitialSyncFlag(txn.get()));
- startSync(1);
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
- // Play first response to ensure data replicator has entered initial sync state.
- setResponses({responses.begin(), responses.begin() + 1});
- playResponses();
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
- // Initial sync flag should be set.
- ASSERT_TRUE(getStorage().getInitialSyncFlag(txn.get()));
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
- // Play rest of the responses after checking initial sync flag.
- setResponses({responses.begin() + 1, responses.end()});
- playResponses();
- log() << "done playing last responses";
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ }
- log() << "waiting for initial sync to verify it completed OK";
- verifySync(getNet());
+ ASSERT_OK(dr->shutdown());
+ executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest,
+ DataReplicatorPassesThroughSecondLastOplogEntryFetcherScheduleErrorAndCancelsOplogFetcher) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ // Make the second last oplog entry fetcher command fail. Allow all other requests to be
+ // scheduled.
+ executor::RemoteCommandRequest request;
+ bool first = true;
+ _shouldFailRequest = [&first, &request](const executor::RemoteCommandRequest& requestToSend) {
+ if ("find" == requestToSend.cmdObj.firstElement().fieldNameStringData() &&
+ requestToSend.cmdObj.hasField("sort") &&
+ 1 == requestToSend.cmdObj.getIntField("limit")) {
+ if (first) {
+ first = false;
+ return false;
+ }
+ request = requestToSend;
+ return true;
+ }
+ return false;
+ };
- log() << "doing asserts";
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
{
- LockGuard lock(_storageInterfaceWorkDoneMutex);
- ASSERT_TRUE(_storageInterfaceWorkDone.droppedUserDBs);
- ASSERT_TRUE(_storageInterfaceWorkDone.createOplogCalled);
- ASSERT_EQ(1, _storageInterfaceWorkDone.oplogEntriesInserted);
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
+ // on to the DatabasesCloner's request.
+ auto noi = net->getNextReadyRequest();
+ auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->blackHole(noi);
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+        // When scheduling the second last oplog entry fetcher fails, the OplogFetcher is shut
+        // down after the completion status is set.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _oplogFetcherCallback().
+ net->runReadyNetworkOperations();
}
- log() << "checking initial sync flag isn't set.";
- // Initial sync flag should not be set after completion.
- ASSERT_FALSE(getStorage().getInitialSyncFlag(txn.get()));
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest,
+ DataReplicatorPassesThroughSecondLastOplogEntryFetcherCallbackErrorAndCancelsOplogFetcher) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
+ // on to the DatabasesCloner's request.
+ auto noi = net->getNextReadyRequest();
+ auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->blackHole(noi);
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // Second last oplog entry fetcher.
+ request = assertRemoteCommandNameEquals(
+ "find",
+ net->scheduleErrorResponse(
+ Status(ErrorCodes::OperationFailed, "second last oplog entry fetcher failed")));
+ ASSERT_TRUE(request.cmdObj.hasField("sort"));
+ ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
+ net->runReadyNetworkOperations();
- ASSERT_EQUALS(OpTime(Timestamp(1, 1), OpTime::kUninitializedTerm), _myLastOpTime);
+ // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
+ // setting the completion status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _oplogFetcherCallback().
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
-TEST_F(InitialSyncTest, Failpoint) {
- auto failPoint = getGlobalFailPointRegistry()->getFailPoint("failInitialSyncWithBadHost");
- failPoint->setMode(FailPoint::alwaysOn);
- ON_BLOCK_EXIT([failPoint]() { failPoint->setMode(FailPoint::off); });
+TEST_F(DataReplicatorTest,
+ DataReplicatorCancelsBothSecondLastOplogEntryFetcherAndOplogFetcherOnShutdown) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
- Timestamp time1(100, 1);
- OpTime opTime1(time1, OpTime::kInitialTerm);
- _myLastOpTime = opTime1;
-
- startSync(1);
-
- verifySync(getNet(), ErrorCodes::InvalidSyncSource);
-}
-
-TEST_F(InitialSyncTest, FailsOnClone) {
- const Responses responses = {
- {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
- // get latest oplog ts
- {"find",
- fromjson(
- str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // oplog fetcher find
- {"find",
- fromjson(
- str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // Clone Start
- // listDatabases
- {"listDatabases",
- fromjson(
- str::stream() << "{ok:0, errmsg:'fail on clone -- listDBs injected failure', code: "
- << int(ErrorCodes::FailedToParse)
- << "}")},
- // rollback checker.
- {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
+ // on to the DatabasesCloner's request.
+ auto noi = net->getNextReadyRequest();
+ auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->blackHole(noi);
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ request = assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // Second last oplog entry fetcher.
+ noi = net->getNextReadyRequest();
+ request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ ASSERT_TRUE(request.cmdObj.hasField("sort"));
+ ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
+ net->blackHole(noi);
+ }
+
+ dr->shutdown();
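+    // Use a temporary InNetworkGuard to enter the network thread and deliver the cancellation
+    // callbacks triggered by shutdown().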
+ executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest,
+ DataReplicatorCancelsSecondLastOplogEntryFetcherOnOplogFetcherCallbackError) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // Save request for OplogFetcher's oplog tailing query. This request will be canceled.
+ auto noi = net->getNextReadyRequest();
+ auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ auto oplogFetcherNetworkOperationIterator = noi;
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // Second last oplog entry fetcher.
+ // Blackhole this request which will be canceled when oplog fetcher fails.
+ noi = net->getNextReadyRequest();
+ request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ ASSERT_TRUE(request.cmdObj.hasField("sort"));
+ ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
+ net->blackHole(noi);
+
+ // Make oplog fetcher fail.
+ net->scheduleErrorResponse(oplogFetcherNetworkOperationIterator,
+ Status(ErrorCodes::OperationFailed, "oplog fetcher failed"));
+ net->runReadyNetworkOperations();
+
+ // _oplogFetcherCallback() will shut down the '_lastOplogEntryFetcher' after setting the
+ // completion status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _lastOplogEntryFetcherCallbackAfterCloningData().
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+}
+
+TEST_F(
+ DataReplicatorTest,
+ DataReplicatorReturnsTypeMismatchErrorWhenSecondLastOplogEntryFetcherReturnsMalformedDocument) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto oplogEntry = makeOplogEntry(1);
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
+
+ // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
+ // on to the DatabasesCloner's request.
+ auto noi = net->getNextReadyRequest();
+ auto request = noi->getRequest();
+ assertRemoteCommandNameEquals("find", request);
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->blackHole(noi);
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({BSON("h"
+ << "not a hash")});
+
+ // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
+ // setting the completion status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _oplogFetcherCallback().
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::TypeMismatch, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest,
+ DataReplicatorReturnsOplogOutOfOrderIfStopTimestampPrecedesBeginTimestamp) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
+
+ // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
+ // on to the DatabasesCloner's request.
+ auto noi = net->getNextReadyRequest();
+ auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->blackHole(noi);
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
+ // setting the completion status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _oplogFetcherCallback().
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, _lastApplied);
+}
+
+TEST_F(
+ DataReplicatorTest,
+ DataReplicatorPassesThroughInsertOplogSeedDocumentErrorAfterDataCloningFinishesWithNoOperationsToApply) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ NamespaceString insertDocumentNss;
+ BSONObj insertDocumentDoc;
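+    // Make the storage interface mock fail the oplog seed document insert while capturing the
+    // namespace and document so they can be verified after the DataReplicator completes.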
+ _storageInterface->insertDocumentFn = [&insertDocumentDoc, &insertDocumentNss](
+ OperationContext*, const NamespaceString& nss, const BSONObj& doc) {
+ insertDocumentNss = nss;
+ insertDocumentDoc = doc;
+ return Status(ErrorCodes::OperationFailed, "failed to insert oplog entry");
};
- startSync(1);
- setResponses(responses);
- playResponses();
- verifySync(getNet(), ErrorCodes::FailedToParse);
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto oplogEntry = makeOplogEntry(1);
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
+
+ // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
+ // on to the DatabasesCloner's request.
+ auto noi = net->getNextReadyRequest();
+ auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->blackHole(noi);
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
+
+ // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
+ // setting the completion status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _oplogFetcherCallback().
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+ ASSERT_EQUALS(_options.localOplogNS, insertDocumentNss);
+ ASSERT_BSONOBJ_EQ(oplogEntry, insertDocumentDoc);
}
-TEST_F(InitialSyncTest, FailOnRollback) {
- const Responses responses =
- {
- // get rollback id
- {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
- // get latest oplog ts
- {"find",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // oplog fetcher find
- {"find",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // Clone Start
- // listDatabases
- {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")},
- // listCollections for "a"
- {"listCollections",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
- "{name:'a', options:{}} "
- "]}}")},
- // count:a
- {"count", BSON("n" << 1 << "ok" << 1)},
- // listIndexes:a
- {"listIndexes",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
- "{v:"
- << OplogEntry::kOplogVersion
- << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")},
- // find:a
- {"find",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
- "{_id:1, a:1} "
- "]}}")},
- // Clone Done
- // get latest oplog ts
- {"find",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(2,2), h:NumberLong(1), ns:'b.c', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, c:1}}]}}")},
- // Applier starts ...
- // check for rollback
- {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:2}")},
- };
+TEST_F(
+ DataReplicatorTest,
+ DataReplicatorReturnsCallbackCanceledAndDoesNotScheduleRollbackCheckerIfShutdownAfterInsertingInsertOplogSeedDocument) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ NamespaceString insertDocumentNss;
+ BSONObj insertDocumentDoc;
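+    // Let the oplog seed document insert succeed but shut down the DataReplicator from inside the
+    // storage interface hook, before the last rollback checker can be scheduled.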
+ _storageInterface->insertDocumentFn = [dr, &insertDocumentDoc, &insertDocumentNss](
+ OperationContext*, const NamespaceString& nss, const BSONObj& doc) {
+ insertDocumentNss = nss;
+ insertDocumentDoc = doc;
+ dr->shutdown();
+ return Status::OK();
+ };
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto oplogEntry = makeOplogEntry(1);
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
+
+ // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
+ // on to the DatabasesCloner's request.
+ auto noi = net->getNextReadyRequest();
+ auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->blackHole(noi);
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
+
+ // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
+ // setting the completion status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _oplogFetcherCallback().
+ net->runReadyNetworkOperations();
+ }
- startSync(1);
- numGetMoreOplogEntriesMax = responses.size();
- setResponses(responses);
- playResponses();
- verifySync(getNet(), ErrorCodes::UnrecoverableRollbackError);
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
+ ASSERT_EQUALS(_options.localOplogNS, insertDocumentNss);
+ ASSERT_BSONOBJ_EQ(oplogEntry, insertDocumentDoc);
}
-TEST_F(InitialSyncTest, DataReplicatorPassesThroughRollbackCheckerScheduleError) {
+TEST_F(
+ DataReplicatorTest,
+ DataReplicatorPassesThroughRollbackCheckerScheduleErrorAfterCloningFinishesWithNoOperationsToApply) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
// Make the second replSetGetRBID command fail. Allow all other requests to be scheduled.
executor::RemoteCommandRequest request;
bool first = true;
@@ -981,375 +2075,1229 @@ TEST_F(InitialSyncTest, DataReplicatorPassesThroughRollbackCheckerScheduleError)
return false;
};
- const Responses responses =
- {
- // get rollback id
- {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
- // get latest oplog ts
- {"find",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // oplog fetcher find
- {"find",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // Clone Start
- // listDatabases
- {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")},
- // listCollections for "a"
- {"listCollections",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
- "{name:'a', options:{}} "
- "]}}")},
- // count:a
- {"count", BSON("n" << 1 << "ok" << 1)},
- // listIndexes:a
- {"listIndexes",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
- "{v:"
- << OplogEntry::kOplogVersion
- << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")},
- // find:a
- {"find",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
- "{_id:1, a:1} "
- "]}}")},
- // Clone Done
- // get latest oplog ts
- {"find",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(2,2), h:NumberLong(1), ns:'b.c', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, c:1}}]}}")},
- // Response to replSetGetRBID request is left out so that we can cancel the request by
- // rejecting the executor::TaskExecutor::scheduleRemoteCommand() request.
- };
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
- startSync(1);
- numGetMoreOplogEntriesMax = responses.size();
- setResponses(responses);
- playResponses();
- getExecutor().shutdown();
- verifySync(getNet(), ErrorCodes::OperationFailed);
-}
-
-TEST_F(InitialSyncTest, DataReplicatorPassesThroughOplogFetcherFailure) {
- const Responses responses = {
- {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
- // get latest oplog ts
- {"find",
- fromjson(
- str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // oplog fetcher find
- {"find",
- fromjson(
- str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- };
+ auto oplogEntry = makeOplogEntry(1);
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
+
+ // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
+ // on to the DatabasesCloner's request.
+ auto noi = net->getNextReadyRequest();
+ auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->blackHole(noi);
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
- startSync(1);
+ // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
+ // setting the completion status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _oplogFetcherCallback().
+ net->runReadyNetworkOperations();
+ }
- setResponses(responses);
- playResponses();
- log() << "done playing responses - oplog fetcher is active";
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+}
+TEST_F(
+ DataReplicatorTest,
+ DataReplicatorPassesThroughRollbackCheckerCallbackErrorAfterCloningFinishesWithNoOperationsToApply) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto oplogEntry = makeOplogEntry(1);
+ auto net = getNet();
{
- auto net = getNet();
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
- ASSERT_TRUE(net->hasReadyRequests());
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
+
+ // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
+ // on to the DatabasesCloner's request.
auto noi = net->getNextReadyRequest();
- // Blackhole requests until we see a getMore.
- while (!isOplogGetMore(noi)) {
- log() << "Blackholing non-getMore request: " << noi->getRequest();
- net->blackHole(noi);
- ASSERT_TRUE(net->hasReadyRequests());
- noi = net->getNextReadyRequest();
- }
- log() << "Sending error response to getMore";
- net->scheduleErrorResponse(noi, {ErrorCodes::OperationFailed, "dead cursor"});
+ auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->blackHole(noi);
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
+
+ // Last rollback checker replSetGetRBID command.
+ assertRemoteCommandNameEquals(
+ "replSetGetRBID",
+ net->scheduleErrorResponse(
+ Status(ErrorCodes::OperationFailed, "replSetGetRBID command failed")));
+ net->runReadyNetworkOperations();
+
+ // _rollbackCheckerCheckForRollbackCallback() will shut down the OplogFetcher after setting
+ // the completion status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _oplogFetcherCallback().
net->runReadyNetworkOperations();
}
- verifySync(getNet(), ErrorCodes::OperationFailed);
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
-TEST_F(InitialSyncTest, OplogOutOfOrderOnOplogFetchFinish) {
- const Responses responses =
- {
- {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
- // get latest oplog ts
- {"find",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // oplog fetcher find
- {"find",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // Clone Start
- // listDatabases
- {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")},
- // listCollections for "a"
- {"listCollections",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
- "{name:'a', options:{}} "
- "]}}")},
- // count:a
- {"count", BSON("n" << 1 << "ok" << 1)},
- // listIndexes:a
- {"listIndexes",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
- "{v:"
- << OplogEntry::kOplogVersion
- << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")},
- // find:a - first batch
- {"find",
- fromjson("{ok:1, cursor:{id:NumberLong(2), ns:'a.a', firstBatch:["
- "{_id:1, a:1} "
- "]}}")},
- // getMore:a - second batch
- {"getMore",
- fromjson("{ok:1, cursor:{id:NumberLong(2), ns:'a.a', nextBatch:["
- "{_id:2, a:2} "
- "]}}")},
- // getMore:a - third batch
- {"getMore",
- fromjson("{ok:1, cursor:{id:NumberLong(2), ns:'a.a', nextBatch:["
- "{_id:3, a:3} "
- "]}}")},
- // getMore:a - last batch
- {"getMore",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', nextBatch:["
- "{_id:4, a:4} "
- "]}}")},
- // Clone Done
- // get latest oplog ts
- {"find",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(7,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:5, a:2}}]}}")},
- {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
- // Applier starts ...
- };
+TEST_F(DataReplicatorTest, DataReplicatorCancelsLastRollbackCheckerOnShutdown) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto oplogEntry = makeOplogEntry(1);
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
+
+ // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
+ // on to the DatabasesCloner's request.
+ auto noi = net->getNextReadyRequest();
+ auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->blackHole(noi);
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
- startSync(1);
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
+
+ // Last rollback checker replSetGetRBID command.
+ noi = net->getNextReadyRequest();
+ assertRemoteCommandNameEquals("replSetGetRBID", noi->getRequest());
+ net->blackHole(noi);
+
+ // _rollbackCheckerCheckForRollbackCallback() will shut down the OplogFetcher after setting
+ // the completion status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _oplogFetcherCallback().
+ net->runReadyNetworkOperations();
+ }
- numGetMoreOplogEntriesMax = responses.size();
- setResponses({responses.begin(), responses.end() - 4});
- playResponses();
- log() << "done playing first responses";
+ ASSERT_OK(dr->shutdown());
+ executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();
- // This variable is used for the reponse timestamps. Setting it to 0 will make the oplog
- // entries come out of order.
- numGetMoreOplogEntries = 0;
- setResponses({responses.end() - 4, responses.end()});
- playResponses();
- log() << "done playing second responses";
- verifySync(getNet(), ErrorCodes::OplogOutOfOrder);
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
-TEST_F(InitialSyncTest, InitialSyncStateIsResetAfterFailure) {
- const Responses responses =
- {
- {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
- // get latest oplog ts
- {"find",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // oplog fetcher find
- {"find",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // Clone Start
- // listDatabases
- {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")},
- // listCollections for "a"
- {"listCollections",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
- "{name:'a', options:{}} "
- "]}}")},
- // count:a
- {"count", BSON("n" << 1 << "ok" << 1)},
- // listIndexes:a
- {"listIndexes",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
- "{v:"
- << OplogEntry::kOplogVersion
- << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")},
- // find:a
- {"find",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
- "{_id:1, a:1} "
- "]}}")},
- // Clone Done
- // get latest oplog ts
- {"find",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(7,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:5, a:2}}]}}")},
- {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:2}")},
- // Applier starts ...
+TEST_F(DataReplicatorTest, DataReplicatorCancelsLastRollbackCheckerOnOplogFetcherCallbackError) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto oplogEntry = makeOplogEntry(1);
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
+
+ // Save request for OplogFetcher's oplog tailing query. This request will be canceled.
+ auto noi = net->getNextReadyRequest();
+ auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ auto oplogFetcherNetworkOperationIterator = noi;
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
+
+ // Last rollback checker replSetGetRBID command.
+ noi = net->getNextReadyRequest();
+ request = noi->getRequest();
+ assertRemoteCommandNameEquals("replSetGetRBID", request);
+ net->blackHole(noi);
+
+ // Make oplog fetcher fail.
+ net->scheduleErrorResponse(oplogFetcherNetworkOperationIterator,
+ Status(ErrorCodes::OperationFailed, "oplog fetcher failed"));
+ net->runReadyNetworkOperations();
+
+ // _oplogFetcherCallback() will shut down the last rollback checker after setting the
+ // completion status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _rollbackCheckerCheckForRollbackCallback().
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest,
+ DataReplicatorReturnsUnrecoverableRollbackErrorIfSyncSourceRolledBackAfterCloningData) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto oplogEntry = makeOplogEntry(1);
+ auto net = getNet();
+ int baseRollbackId = 1;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
+
+ // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
+ // on to the DatabasesCloner's request.
+ auto noi = net->getNextReadyRequest();
+ auto request = noi->getRequest();
+ assertRemoteCommandNameEquals("find", request);
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->blackHole(noi);
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
+
+ // Last rollback checker replSetGetRBID command.
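+        // Respond with a rollback ID different from the base rollback ID to simulate the sync
+        // source rolling back while we were cloning data.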
+ request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId + 1));
+ net->runReadyNetworkOperations();
+ assertRemoteCommandNameEquals("replSetGetRBID", request);
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfterCloning) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ ASSERT_TRUE(_storageInterface->getInitialSyncFlag(txn.get()));
+
+ auto oplogEntry = makeOplogEntry(1);
+ auto net = getNet();
+ int baseRollbackId = 1;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
+
+ // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
+ // on to the DatabasesCloner's request.
+ auto noi = net->getNextReadyRequest();
+ auto request = noi->getRequest();
+ assertRemoteCommandNameEquals("find", request);
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->blackHole(noi);
+
+ // Instead of fast forwarding to DatabasesCloner completion by returning an empty list of
+ // database names, we'll simulate copying a single database with a single collection on the
+ // sync source.
+ NamespaceString nss("a.a");
+ request = net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
+ assertRemoteCommandNameEquals("listDatabases", request);
+ net->runReadyNetworkOperations();
+
+ // listCollections for "a"
+ request = net->scheduleSuccessfulResponse(
+ makeCursorResponse(0LL, nss, {BSON("name" << nss.coll() << "options" << BSONObj())}));
+ assertRemoteCommandNameEquals("listCollections", request);
+
+ // count:a
+ request = assertRemoteCommandNameEquals(
+ "count", net->scheduleSuccessfulResponse(BSON("n" << 1 << "ok" << 1)));
+ ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
+ ASSERT_EQUALS(nss.db(), request.dbname);
+
+ // listIndexes:a
+ request = assertRemoteCommandNameEquals(
+ "listIndexes",
+ net->scheduleSuccessfulResponse(makeCursorResponse(
+ 0LL,
+ NamespaceString(nss.getCommandNS()),
+ {BSON("v" << OplogEntry::kOplogVersion << "key" << BSON("_id" << 1) << "name"
+ << "_id_"
+ << "ns"
+ << nss.ns())})));
+ ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
+ ASSERT_EQUALS(nss.db(), request.dbname);
+
+ // find:a
+ request = assertRemoteCommandNameEquals("find",
+ net->scheduleSuccessfulResponse(makeCursorResponse(
+ 0LL, nss, {BSON("_id" << 1 << "a" << 1)})));
+ ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
+ ASSERT_EQUALS(nss.db(), request.dbname);
+
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
+
+ // Last rollback checker replSetGetRBID command.
+ request = assertRemoteCommandNameEquals(
+ "replSetGetRBID",
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)));
+ net->runReadyNetworkOperations();
+
+ // Deliver cancellation to OplogFetcher.
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(OplogEntry(oplogEntry).getOpTime(), unittest::assertGet(_lastApplied).opTime);
+ ASSERT_EQUALS(oplogEntry["h"].Long(), unittest::assertGet(_lastApplied).value);
+ ASSERT_FALSE(_storageInterface->getInitialSyncFlag(txn.get()));
+}
+
+TEST_F(DataReplicatorTest, DataReplicatorPassesThroughGetNextApplierBatchScheduleError) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ ASSERT_TRUE(_storageInterface->getInitialSyncFlag(txn.get()));
+
+ auto net = getNet();
+ int baseRollbackId = 1;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
+ // on to the DatabasesCloner's request.
+ auto noi = net->getNextReadyRequest();
+ auto request = noi->getRequest();
+ assertRemoteCommandNameEquals("find", request);
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->blackHole(noi);
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+        // Before processing the scheduled last oplog entry fetcher response, set the flag in
+        // TaskExecutorMock so that DataReplicator will fail to schedule
+        // _getNextApplierBatchCallback().
+ _executorProxy->shouldFailScheduleWork = true;
+
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
+
+ // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
+ // setting the completion status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _oplogFetcherCallback().
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest, DataReplicatorPassesThroughSecondGetNextApplierBatchScheduleError) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ ASSERT_TRUE(_storageInterface->getInitialSyncFlag(txn.get()));
+
+ auto net = getNet();
+ int baseRollbackId = 1;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
+ // on to the DatabasesCloner's request.
+ auto noi = net->getNextReadyRequest();
+ auto request = noi->getRequest();
+ assertRemoteCommandNameEquals("find", request);
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->blackHole(noi);
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+        // Before processing the scheduled last oplog entry fetcher response, set the flag in
+        // TaskExecutorMock so that DataReplicator will fail to schedule the second
+        // _getNextApplierBatchCallback() at (now + _options.getApplierBatchCallbackRetryWait).
+ _executorProxy->shouldFailScheduleWorkAt = true;
+
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
+
+ // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
+ // setting the completion status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _oplogFetcherCallback().
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest, DataReplicatorCancelsGetNextApplierBatchOnShutdown) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ ASSERT_TRUE(_storageInterface->getInitialSyncFlag(txn.get()));
+
+ auto net = getNet();
+ int baseRollbackId = 1;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
+ // on to the DatabasesCloner's request.
+ auto noi = net->getNextReadyRequest();
+ auto request = noi->getRequest();
+ assertRemoteCommandNameEquals("find", request);
+ ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
+ net->blackHole(noi);
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
+
+ // Since we black holed OplogFetcher's find request, _getNextApplierBatch_inlock() will
+ // not return any operations for us to apply, leading to _getNextApplierBatchCallback()
+        // rescheduling itself at net->now() + _options.getApplierBatchCallbackRetryWait.
+ }
+
+ ASSERT_OK(dr->shutdown());
+ executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest, DataReplicatorPassesThroughGetNextApplierBatchInLockError) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ ASSERT_TRUE(_storageInterface->getInitialSyncFlag(txn.get()));
+
+ // _getNextApplierBatch_inlock() returns BadValue when it gets an oplog entry with an unexpected
+ // version (not OplogEntry::kOplogVersion).
+ auto oplogEntry = makeOplogEntry(1);
+ auto oplogEntryWithInconsistentVersion =
+ makeOplogEntry(2, "i", OplogEntry::kOplogVersion + 100);
+
+ auto net = getNet();
+ int baseRollbackId = 1;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // OplogFetcher's oplog tailing query. Return bad oplog entry that will be added to the
+ // oplog buffer and processed by _getNextApplierBatch_inlock().
+ auto request = assertRemoteCommandNameEquals(
+ "find",
+ net->scheduleSuccessfulResponse(makeCursorResponse(
+ 1LL, _options.localOplogNS, {oplogEntry, oplogEntryWithInconsistentVersion})));
+ ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
+ net->runReadyNetworkOperations();
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // OplogFetcher's getMore request. Black hole because we already got our bad oplog entry
+ // into the oplog buffer.
+ auto noi = net->getNextReadyRequest();
+ assertRemoteCommandNameEquals("getMore", noi->getRequest());
+ net->blackHole(noi);
+
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
+
+ // _getNextApplierBatchCallback() will shut down the OplogFetcher after setting the
+ // completion status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _oplogFetcherCallback().
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::BadValue, _lastApplied);
+}
+
+TEST_F(
+ DataReplicatorTest,
+ DataReplicatorReturnsEmptyBatchFromGetNextApplierBatchInLockIfRsSyncApplyStopFailPointIsEnabled) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ ASSERT_TRUE(_storageInterface->getInitialSyncFlag(txn.get()));
+
+ // _getNextApplierBatch_inlock() returns BadValue when it gets an oplog entry with an unexpected
+ // version (not OplogEntry::kOplogVersion).
+ auto oplogEntry = makeOplogEntry(1);
+ auto oplogEntryWithInconsistentVersion =
+ makeOplogEntry(2, "i", OplogEntry::kOplogVersion + 100);
+
+ // Enable 'rsSyncApplyStop' so that _getNextApplierBatch_inlock() returns an empty batch of
+ // operations instead of a batch containing an oplog entry with a bad version.
+ auto failPoint = getGlobalFailPointRegistry()->getFailPoint("rsSyncApplyStop");
+ failPoint->setMode(FailPoint::alwaysOn);
+ ON_BLOCK_EXIT([failPoint]() { failPoint->setMode(FailPoint::off); });
+
+ auto net = getNet();
+ int baseRollbackId = 1;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // OplogFetcher's oplog tailing query. Return bad oplog entry that will be added to the
+ // oplog buffer and processed by _getNextApplierBatch_inlock().
+ auto request = net->scheduleSuccessfulResponse(makeCursorResponse(
+ 1LL, _options.localOplogNS, {oplogEntry, oplogEntryWithInconsistentVersion}));
+ assertRemoteCommandNameEquals("find", request);
+ ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
+ net->runReadyNetworkOperations();
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // OplogFetcher's getMore request. Black hole because we already got our bad oplog entry
+ // into the oplog buffer.
+ auto noi = net->getNextReadyRequest();
+ request = noi->getRequest();
+ assertRemoteCommandNameEquals("getMore", request);
+ net->blackHole(noi);
+
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
+
+ // Since the 'rsSyncApplyStop' fail point is enabled, DataReplicator will get an empty
+ // batch of operations from _getNextApplierBatch_inlock() even though the oplog buffer
+ // is not empty.
+ }
+
+ // If the fail point is not working, the initial sync status will be set to BadValue (due to the
+ // bad oplog entry in the oplog buffer) and shutdown() will not be able to overwrite this status
+ // with CallbackCanceled.
+ // Otherwise, shutdown() will cancel both the OplogFetcher and the scheduled
+ // _getNextApplierBatchCallback() task. The final initial sync status will be CallbackCanceled.
+ ASSERT_OK(dr->shutdown());
+ executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest,
+ DataReplicatorReturnsNoSuchKeyIfApplierBatchContainsAnOplogEntryWithoutHash) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ ASSERT_TRUE(_storageInterface->getInitialSyncFlag(txn.get()));
+
+ // This oplog entry (without a required "h" field) will be read by OplogFetcher and inserted
+ // into OplogBuffer to be retrieved by _getNextApplierBatch_inlock().
+ auto oplogEntryWithoutHash = BSON("ts" << Timestamp(2, 2) << "v" << OplogEntry::kOplogVersion);
+
+ auto net = getNet();
+ int baseRollbackId = 1;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // OplogFetcher's oplog tailing query. Save for later.
+ auto request = net->scheduleSuccessfulResponse(makeCursorResponse(
+ 1LL, _options.localOplogNS, {makeOplogEntry(1), oplogEntryWithoutHash}));
+ assertRemoteCommandNameEquals("find", request);
+ ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
+ net->runReadyNetworkOperations();
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // Ignore OplogFetcher's getMore request.
+ auto noi = net->getNextReadyRequest();
+ request = noi->getRequest();
+ assertRemoteCommandNameEquals("getMore", request);
+ net->blackHole(noi);
+
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
+
+ // _getNextApplierBatchCallback() will shut down the OplogFetcher after setting the
+ // completion status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _oplogFetcherCallback().
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest, DataReplicatorPassesThroughMultiApplierScheduleError) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ ASSERT_TRUE(_storageInterface->getInitialSyncFlag(txn.get()));
+
+ auto net = getNet();
+ int baseRollbackId = 1;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // OplogFetcher's oplog tailing query. Save for later.
+ auto noi = net->getNextReadyRequest();
+ auto request = noi->getRequest();
+ assertRemoteCommandNameEquals("find", request);
+ ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
+ auto oplogFetcherNoi = noi;
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
+
+ // _getNextApplierBatchCallback() should have rescheduled itself.
+ // We'll insert some operations in the oplog buffer so that we'll attempt to schedule
+ // MultiApplier next time _getNextApplierBatchCallback() runs.
+ net->scheduleSuccessfulResponse(
+ oplogFetcherNoi,
+ executor::RemoteCommandResponse(
+ makeCursorResponse(
+ 1LL, _options.localOplogNS, {makeOplogEntry(1), makeOplogEntry(2)}),
+ BSONObj(),
+ Milliseconds(0)));
+ net->runReadyNetworkOperations();
+
+ // Ignore OplogFetcher's getMore request.
+ noi = net->getNextReadyRequest();
+ request = noi->getRequest();
+ assertRemoteCommandNameEquals("getMore", request);
+
+ // Make MultiApplier::startup() fail.
+ _executorProxy->shouldFailScheduleWork = true;
+
+ // Advance clock until _getNextApplierBatchCallback() runs.
+ auto when = net->now() + _options.getApplierBatchCallbackRetryWait;
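+        // runUntil() advances the mock network's clock and returns its new current time; the
+        // assertion below confirms that the clock advanced exactly to 'when'.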
+ ASSERT_EQUALS(when, net->runUntil(when));
+
+ // _getNextApplierBatchCallback() will shut down the OplogFetcher after setting the
+ // completion status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _oplogFetcherCallback().
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest, DataReplicatorPassesThroughMultiApplierCallbackError) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
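+    // Make the external state's multiApply function return an error so that
+    // _multiApplierCallback() fails after the first batch is scheduled.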
+ getExternalState()->multiApplyFn =
+ [](OperationContext*, const MultiApplier::Operations&, MultiApplier::ApplyOperationFn) {
+ return Status(ErrorCodes::OperationFailed, "multiApply failed");
};
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
+ int baseRollbackId = 1;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // OplogFetcher's oplog tailing query. Provide enough operations to trigger MultiApplier.
+ auto request = net->scheduleSuccessfulResponse(
+ makeCursorResponse(1LL, _options.localOplogNS, {makeOplogEntry(1), makeOplogEntry(2)}));
+ assertRemoteCommandNameEquals("find", request);
+ ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
+ net->runReadyNetworkOperations();
- startSync(2);
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
- numGetMoreOplogEntriesMax = responses.size();
- setResponses(responses);
- playResponses();
- log() << "done playing first responses";
+ // Ignore OplogFetcher's getMore request.
+ auto noi = net->getNextReadyRequest();
+ request = noi->getRequest();
+ assertRemoteCommandNameEquals("getMore", request);
- // Play first response again to ensure data replicator has entered initial sync state.
- setResponses({responses.begin(), responses.begin() + 1});
- playResponses();
- log() << "done playing first response of second round of responses";
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
+ // _multiApplierCallback() will shut down the OplogFetcher after setting the completion
+ // status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _oplogFetcherCallback().
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest, DataReplicatorCancelsGetNextApplierBatchCallbackOnOplogFetcherError) {
auto dr = &getDR();
- ASSERT_TRUE(dr->getState() == DataReplicatorState::InitialSync) << ", state: "
- << dr->getDiagnosticString();
- ASSERT_EQUALS(dr->getLastFetched(), OpTimeWithHash());
- ASSERT_EQUALS(dr->getLastApplied(), OpTimeWithHash());
-
- setResponses({responses.begin() + 1, responses.end()});
- playResponses();
- log() << "done playing second round of responses";
- verifySync(getNet(), ErrorCodes::UnrecoverableRollbackError);
-}
-
-TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) {
- const Responses failedResponses = {
- {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
- // get latest oplog ts
- {"find",
- fromjson(
- str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // oplog fetcher find
- {"find",
- fromjson(
- str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // Clone Start
- // listDatabases
- {"listDatabases",
- fromjson("{ok:0, errmsg:'fail on clone -- listDBs injected failure', code:9}")},
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
+ int baseRollbackId = 1;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // OplogFetcher's oplog tailing query. Save for later.
+ auto noi = net->getNextReadyRequest();
+ auto request = noi->getRequest();
+ assertRemoteCommandNameEquals("find", request);
+ ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
+ auto oplogFetcherNoi = noi;
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
+
+ // Send error to _oplogFetcherCallback().
+ net->scheduleErrorResponse(oplogFetcherNoi,
+ Status(ErrorCodes::OperationFailed, "oplog fetcher failed"));
+
+        // _oplogFetcherCallback() will cancel the _getNextApplierBatchCallback() task after
+        // setting the completion status.
+        // We call runReadyNetworkOperations() again to deliver the scheduled error response to
+        // _oplogFetcherCallback().
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest,
+ DataReplicatorReturnsLastAppliedOnReachingStopTimestampAfterApplyingOneBatch) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto lastOp = makeOplogEntry(2);
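+    // 'lastOp' will be returned by the second last oplog entry fetcher and therefore determines
+    // the stop timestamp that ends oplog application for this initial sync attempt.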
+
+ auto net = getNet();
+ int baseRollbackId = 1;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // OplogFetcher's oplog tailing query. Response has enough operations to reach
+ // end timestamp.
+ auto request = net->scheduleSuccessfulResponse(
+ makeCursorResponse(1LL, _options.localOplogNS, {makeOplogEntry(1), lastOp}));
+ assertRemoteCommandNameEquals("find", request);
+ ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
+ net->runReadyNetworkOperations();
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // Black hole OplogFetcher's getMore request.
+ auto noi = net->getNextReadyRequest();
+ request = noi->getRequest();
+ assertRemoteCommandNameEquals("getMore", request);
+ net->blackHole(noi);
+
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({lastOp});
+
+ // Last rollback ID.
+ request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+ assertRemoteCommandNameEquals("replSetGetRBID", request);
+ net->runReadyNetworkOperations();
+
+ // _multiApplierCallback() will cancel the _getNextApplierBatchCallback() task after setting
+ // the completion status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _oplogFetcherCallback().
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(OplogEntry(lastOp).getOpTime(), unittest::assertGet(_lastApplied).opTime);
+ ASSERT_EQUALS(lastOp["h"].Long(), unittest::assertGet(_lastApplied).value);
+}
+
+TEST_F(DataReplicatorTest,
+ DataReplicatorReturnsLastAppliedOnReachingStopTimestampAfterApplyingMultipleBatches) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ // To make DataReplicator apply multiple batches, we make the third and last operation a command
+ // so that it will go into a separate batch from the second operation. First operation is the
+ // last fetched entry before data cloning and is not applied.
+ auto lastOp = makeOplogEntry(3, "c");
+
+ auto net = getNet();
+ int baseRollbackId = 1;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // OplogFetcher's oplog tailing query. Response has enough operations to reach
+ // end timestamp.
+ auto request = net->scheduleSuccessfulResponse(makeCursorResponse(
+ 1LL, _options.localOplogNS, {makeOplogEntry(1), makeOplogEntry(2), lastOp}));
+ assertRemoteCommandNameEquals("find", request);
+ ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
+ net->runReadyNetworkOperations();
+
+ // Instead of fast forwarding to DatabasesCloner completion by returning an empty list of
+ // database names, we'll simulate copying a single database with a single collection on the
+ // sync source.
+ NamespaceString nss("a.a");
+ request = net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
+ assertRemoteCommandNameEquals("listDatabases", request);
+ net->runReadyNetworkOperations();
+
+ // Black hole OplogFetcher's getMore request.
+ auto noi = net->getNextReadyRequest();
+ request = noi->getRequest();
+ assertRemoteCommandNameEquals("getMore", request);
+ net->blackHole(noi);
+
+ // listCollections for "a"
+ request = net->scheduleSuccessfulResponse(
+ makeCursorResponse(0LL, nss, {BSON("name" << nss.coll() << "options" << BSONObj())}));
+ assertRemoteCommandNameEquals("listCollections", request);
+
+ // count:a
+ request = net->scheduleSuccessfulResponse(BSON("n" << 1 << "ok" << 1));
+ assertRemoteCommandNameEquals("count", request);
+ ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
+ ASSERT_EQUALS(nss.db(), request.dbname);
+
+ // listIndexes:a
+ request = net->scheduleSuccessfulResponse(makeCursorResponse(
+ 0LL,
+ NamespaceString(nss.getCommandNS()),
+ {BSON("v" << OplogEntry::kOplogVersion << "key" << BSON("_id" << 1) << "name"
+ << "_id_"
+ << "ns"
+ << nss.ns())}));
+ assertRemoteCommandNameEquals("listIndexes", request);
+ ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
+ ASSERT_EQUALS(nss.db(), request.dbname);
+
+ // find:a
+ request = net->scheduleSuccessfulResponse(
+ makeCursorResponse(0LL, nss, {BSON("_id" << 1 << "a" << 1)}));
+ assertRemoteCommandNameEquals("find", request);
+ ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
+ ASSERT_EQUALS(nss.db(), request.dbname);
+
+ // Second last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({lastOp});
+
+ // Last rollback ID.
+ request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+ assertRemoteCommandNameEquals("replSetGetRBID", request);
+ net->runReadyNetworkOperations();
+
+ // _multiApplierCallback() will cancel the _getNextApplierBatchCallback() task after setting
+ // the completion status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _oplogFetcherCallback().
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(OplogEntry(lastOp).getOpTime(), unittest::assertGet(_lastApplied).opTime);
+ ASSERT_EQUALS(lastOp["h"].Long(), unittest::assertGet(_lastApplied).value);
+}
+
+TEST_F(
+ DataReplicatorTest,
+ DataReplicatorSchedulesLastOplogEntryFetcherToGetNewStopTimestampIfMissingDocumentsHaveBeenFetchedDuringMultiInitialSyncApply) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ // Override DataReplicatorExternalState::_multiInitialSyncApply() so that it will also fetch a
+ // missing document.
+ // This forces DataReplicator to evaluate its end timestamp for applying operations after each
+ // batch.
+ getExternalState()->multiApplyFn = [](OperationContext*,
+ const MultiApplier::Operations& ops,
+ MultiApplier::ApplyOperationFn applyOperation) {
+ // The 'OperationPtrs*' argument is ignored by our overridden _multiInitialSyncApply().
+ applyOperation(nullptr);
+ return ops.back().getOpTime();
+ };
+ bool fetchCountIncremented = false;
+ getExternalState()->multiInitialSyncApplyFn = [&fetchCountIncremented](
+ MultiApplier::OperationPtrs*, const HostAndPort&, AtomicUInt32* fetchCount) {
+ if (!fetchCountIncremented) {
+ fetchCount->addAndFetch(1);
+ fetchCountIncremented = true;
+ }
+ return Status::OK();
};
- const Responses successfulResponses =
- {
- {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
- // get latest oplog ts
- {"find",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // oplog fetcher find
- {"find",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // Clone Start
- // listDatabases
- {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")},
- // listCollections for "a"
- {"listCollections",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
- "{name:'a', options:{}} "
- "]}}")},
- // count:a
- {"count", BSON("n" << 5 << "ok" << 1)},
- // listIndexes:a
- {"listIndexes",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
- "{v:"
- << OplogEntry::kOplogVersion
- << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")},
- // find:a - first batch
- {"find",
- fromjson("{ok:1, cursor:{id:NumberLong(2), ns:'a.a', firstBatch:["
- "{_id:1, a:1} "
- "]}}")},
- // getMore:a - second batch
- {"getMore",
- fromjson("{ok:1, cursor:{id:NumberLong(2), ns:'a.a', nextBatch:["
- "{_id:2, a:2} "
- "]}}")},
- // getMore:a - third batch
- {"getMore",
- fromjson("{ok:1, cursor:{id:NumberLong(2), ns:'a.a', nextBatch:["
- "{_id:3, a:3} "
- "]}}")},
- // getMore:a - fourth batch
- {"getMore",
- fromjson("{ok:1, cursor:{id:NumberLong(2), ns:'a.a', nextBatch:["
- "{_id:3, a:3} "
- "]}}")},
- // getMore:a - last batch
- {"getMore",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', nextBatch:["
- "{_id:4, a:4} "
- "]}}")},
- // Clone Done
- // get latest oplog ts
- // This is a testing-only side effect of using playResponses. We may end up generating
- // getMore responses past this timestamp 7.
- {"find",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(7,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:5, a:2}}]}}")},
- {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
- // Applier starts ...
- };
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ // Use command for third and last operation to ensure we have two batches to apply.
+ auto lastOp = makeOplogEntry(3, "c");
+
+ auto net = getNet();
+ int baseRollbackId = 1;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+
+ // OplogFetcher's oplog tailing query. Response has enough operations to reach
+ // end timestamp.
+ auto request = net->scheduleSuccessfulResponse(makeCursorResponse(
+ 1LL, _options.localOplogNS, {makeOplogEntry(1), makeOplogEntry(2), lastOp}));
+ assertRemoteCommandNameEquals("find", request);
+ ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
+ net->runReadyNetworkOperations();
+
+ // Quickest path to a successful DatabasesCloner completion is to respond to the
+ // listDatabases with an empty list of database names.
+ assertRemoteCommandNameEquals(
+ "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
+ net->runReadyNetworkOperations();
+
+ // Black hole OplogFetcher's getMore request.
+ auto noi = net->getNextReadyRequest();
+ request = noi->getRequest();
+ assertRemoteCommandNameEquals("getMore", request);
+ net->blackHole(noi);
+
+ // Second last oplog entry fetcher.
+ // Send oplog entry with timestamp 2. DataReplicator will update this end timestamp after
+ // applying the first batch.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
+
+ // Third last oplog entry fetcher.
+ processSuccessfulLastOplogEntryFetcherResponse({lastOp});
+
+ // Last rollback ID.
+ request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+ assertRemoteCommandNameEquals("replSetGetRBID", request);
+ net->runReadyNetworkOperations();
+
+ // _multiApplierCallback() will cancel the _getNextApplierBatchCallback() task after setting
+ // the completion status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _oplogFetcherCallback().
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(OplogEntry(lastOp).getOpTime(), unittest::assertGet(_lastApplied).opTime);
+ ASSERT_EQUALS(lastOp["h"].Long(), unittest::assertGet(_lastApplied).value);
+
+ ASSERT_TRUE(fetchCountIncremented);
+
+ auto progress = dr->getInitialSyncProgress();
+ log() << "Progress after initial sync attempt with fetched missing documents: " << progress;
+ ASSERT_EQUALS(1, progress.getIntField("fetchedMissingDocs")) << progress;
+}
+
+TEST_F(DataReplicatorTest,
+ DataReplicatorReturnsInvalidSyncSourceWhenFailInitialSyncWithBadHostFailpointIsEnabled) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ // This fail point makes chooseSyncSourceCallback fail with an InvalidSyncSource error.
+ auto failPoint = getGlobalFailPointRegistry()->getFailPoint("failInitialSyncWithBadHost");
+ failPoint->setMode(FailPoint::alwaysOn);
+ ON_BLOCK_EXIT([failPoint]() { failPoint->setMode(FailPoint::off); });
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest, OplogOutOfOrderOnOplogFetchFinish) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(dr->startup(txn.get(), maxAttempts));
+
+ auto net = getNet();
+ int baseRollbackId = 1;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
- startSync(2);
+ // OplogFetcher's oplog tailing query.
+ auto request = net->scheduleSuccessfulResponse(
+ makeCursorResponse(1LL, _options.localOplogNS, {makeOplogEntry(1)}));
+ assertRemoteCommandNameEquals("find", request);
+ ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
+ net->runReadyNetworkOperations();
+
+ // Ignore listDatabases request.
+ auto noi = net->getNextReadyRequest();
+ request = noi->getRequest();
+ assertRemoteCommandNameEquals("listDatabases", request);
+ net->blackHole(noi);
+
+ // Ensure that OplogFetcher fails with an OplogOutOfOrder error by responding to the getMore
+ // request with oplog entries containing the following timestamps (most recently processed
+ // oplog entry has a timestamp of 1):
+ // (last=1), 5, 4
+ request = net->scheduleSuccessfulResponse(makeCursorResponse(
+ 1LL, _options.localOplogNS, {makeOplogEntry(5), makeOplogEntry(4)}, false));
+ assertRemoteCommandNameEquals("getMore", request);
+ net->runReadyNetworkOperations();
+
+ // Deliver cancellation signal to DatabasesCloner.
+ net->runReadyNetworkOperations();
+ }
+
+ dr->join();
+ ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, _lastApplied);
+}
+
+TEST_F(DataReplicatorTest, GetInitialSyncProgressReturnsCorrectProgress) {
+ auto dr = &getDR();
+ auto txn = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 27017));
+ ASSERT_OK(dr->startup(txn.get(), 2U));
+
+ auto net = getNet();
+ int baseRollbackId = 1;
// Play first 2 responses to ensure data replicator has started the oplog fetcher.
- setResponses({failedResponses.begin(), failedResponses.begin() + 3});
- numGetMoreOplogEntriesMax = failedResponses.size() + successfulResponses.size();
- playResponses();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ }
+
log() << "Done playing first failed response";
- auto progress = getInitialSyncProgress();
+ auto progress = dr->getInitialSyncProgress();
log() << "Progress after first failed response: " << progress;
ASSERT_EQUALS(progress.nFields(), 8) << progress;
ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 0) << progress;
@@ -1362,18 +3310,47 @@ TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) {
ASSERT_BSONOBJ_EQ(progress.getObjectField("databases"), BSON("databasesCloned" << 0));
// Play rest of the failed round of responses.
- setResponses({failedResponses.begin() + 3, failedResponses.end()});
- playResponses();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Ignore oplog tailing query.
+ auto noi = net->getNextReadyRequest();
+ auto request = noi->getRequest();
+ assertRemoteCommandNameEquals("find", request);
+ ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
+ net->blackHole(noi);
+
+ request = net->scheduleErrorResponse(
+ Status(ErrorCodes::FailedToParse, "fail on clone -- listDBs injected failure"));
+ assertRemoteCommandNameEquals("listDatabases", request);
+ net->runReadyNetworkOperations();
+
+ // Deliver cancellation to OplogFetcher
+ net->runReadyNetworkOperations();
+ }
+
log() << "Done playing failed responses";
- // Play the first 3 responses of the successful round of responses to ensure that the
+ // Play the first 2 responses of the successful round of responses to ensure that the
// data replicator starts the oplog fetcher.
- setResponses({successfulResponses.begin(), successfulResponses.begin() + 3});
- numGetMoreOplogEntries = 0;
- playResponses();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ auto when = net->now() + _options.initialSyncRetryWait;
+ ASSERT_EQUALS(when, net->runUntil(when));
+
+ // Base rollback ID.
+ auto request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+ assertRemoteCommandNameEquals("replSetGetRBID", request);
+ net->runReadyNetworkOperations();
+
+ // Last oplog entry.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ }
+
log() << "Done playing first successful response";
- progress = getInitialSyncProgress();
+ progress = dr->getInitialSyncProgress();
log() << "Progress after failure: " << progress;
ASSERT_EQUALS(progress.nFields(), 8) << progress;
ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
@@ -1388,21 +3365,86 @@ TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) {
ASSERT_EQUALS(attempts.nFields(), 1) << attempts;
BSONObj attempt0 = attempts["0"].Obj();
ASSERT_EQUALS(attempt0.nFields(), 3) << attempt0;
- ASSERT_EQUALS(attempt0.getStringField("status"),
- std::string("FailedToParse: fail on clone -- listDBs injected failure"))
+ ASSERT_EQUALS(
+ attempt0.getStringField("status"),
+ std::string(
+ "FailedToParse: error cloning databases: fail on clone -- listDBs injected failure"))
<< attempt0;
ASSERT_EQUALS(attempt0["durationMillis"].type(), NumberInt) << attempt0;
ASSERT_EQUALS(attempt0.getStringField("syncSource"), std::string("localhost:27017"))
<< attempt0;
// Play all but last of the successful round of responses.
- setResponses({successfulResponses.begin() + 3, successfulResponses.end() - 1});
- // Reset getMore counter because the data replicator starts a new oplog tailing query.
- numGetMoreOplogEntries = 0;
- playResponses();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Ignore oplog tailing query.
+ auto request = net->scheduleSuccessfulResponse(makeCursorResponse(1LL,
+ _options.localOplogNS,
+ {makeOplogEntry(1),
+ makeOplogEntry(2),
+ makeOplogEntry(3),
+ makeOplogEntry(4),
+ makeOplogEntry(5),
+ makeOplogEntry(6),
+ makeOplogEntry(7)}));
+ assertRemoteCommandNameEquals("find", request);
+ ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
+ net->runReadyNetworkOperations();
+
+ // listDatabases
+ NamespaceString nss("a.a");
+ request = net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
+ assertRemoteCommandNameEquals("listDatabases", request);
+ net->runReadyNetworkOperations();
+
+ auto noi = net->getNextReadyRequest();
+ request = noi->getRequest();
+ assertRemoteCommandNameEquals("getMore", request);
+ net->blackHole(noi);
+
+ // listCollections for "a"
+ request = net->scheduleSuccessfulResponse(
+ makeCursorResponse(0LL, nss, {BSON("name" << nss.coll() << "options" << BSONObj())}));
+ assertRemoteCommandNameEquals("listCollections", request);
+
+ // count:a
+ request = net->scheduleSuccessfulResponse(BSON("n" << 5 << "ok" << 1));
+ assertRemoteCommandNameEquals("count", request);
+ ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
+ ASSERT_EQUALS(nss.db(), request.dbname);
+
+ // listIndexes:a
+ request = net->scheduleSuccessfulResponse(makeCursorResponse(
+ 0LL,
+ NamespaceString(nss.getCommandNS()),
+ {BSON("v" << OplogEntry::kOplogVersion << "key" << BSON("_id" << 1) << "name"
+ << "_id_"
+ << "ns"
+ << nss.ns())}));
+ assertRemoteCommandNameEquals("listIndexes", request);
+ ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
+ ASSERT_EQUALS(nss.db(), request.dbname);
+
+ // find:a - 5 batches
+ for (int i = 1; i <= 5; ++i) {
+ request = net->scheduleSuccessfulResponse(
+ makeCursorResponse(i < 5 ? 2LL : 0LL, nss, {BSON("_id" << i << "a" << i)}, i == 1));
+ ASSERT_EQUALS(i == 1 ? "find" : "getMore",
+ request.cmdObj.firstElement().fieldNameStringData());
+ net->runReadyNetworkOperations();
+ }
+
+ // Second last oplog entry fetcher.
+ // Send oplog entry with timestamp 7. DataReplicator uses this as the stop timestamp for this
+ // attempt and applies the fetched operations through it.
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(7)});
+ }
log() << "Done playing all but last successful response";
- progress = getInitialSyncProgress();
+ progress = dr->getInitialSyncProgress();
log() << "Progress after all but last successful response: " << progress;
ASSERT_EQUALS(progress.nFields(), 9) << progress;
ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
@@ -1432,23 +3474,40 @@ TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) {
ASSERT_EQUALS(attempts.nFields(), 1) << progress;
attempt0 = attempts["0"].Obj();
ASSERT_EQUALS(attempt0.nFields(), 3) << attempt0;
- ASSERT_EQUALS(attempt0.getStringField("status"),
- std::string("FailedToParse: fail on clone -- listDBs injected failure"))
+ ASSERT_EQUALS(
+ attempt0.getStringField("status"),
+ std::string(
+ "FailedToParse: error cloning databases: fail on clone -- listDBs injected failure"))
<< attempt0;
ASSERT_EQUALS(attempt0["durationMillis"].type(), NumberInt) << attempt0;
ASSERT_EQUALS(attempt0.getStringField("syncSource"), std::string("localhost:27017"))
<< attempt0;
// Play last successful response.
- setResponses({successfulResponses.end() - 1, successfulResponses.end()});
- playResponses();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Last rollback ID.
+ assertRemoteCommandNameEquals(
+ "replSetGetRBID",
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)));
+ net->runReadyNetworkOperations();
+
+ // _multiApplierCallback() will cancel the _getNextApplierBatchCallback() task after setting
+ // the completion status.
+ // We call runReadyNetworkOperations() again to deliver the cancellation status to
+ // _oplogFetcherCallback().
+ net->runReadyNetworkOperations();
+ }
log() << "waiting for initial sync to verify it completed OK";
- verifySync(getNet());
+ dr->join();
+ ASSERT_EQUALS(OplogEntry(makeOplogEntry(7)).getOpTime(),
+ unittest::assertGet(_lastApplied).opTime);
- progress = getInitialSyncProgress();
+ progress = dr->getInitialSyncProgress();
log() << "Progress at end: " << progress;
- ASSERT_EQUALS(progress.nFields(), 10) << progress;
+ ASSERT_EQUALS(progress.nFields(), 11) << progress;
ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
@@ -1465,8 +3524,10 @@ TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) {
attempt0 = attempts["0"].Obj();
ASSERT_EQUALS(attempt0.nFields(), 3) << attempt0;
- ASSERT_EQUALS(attempt0.getStringField("status"),
- std::string("FailedToParse: fail on clone -- listDBs injected failure"))
+ ASSERT_EQUALS(
+ attempt0.getStringField("status"),
+ std::string(
+ "FailedToParse: error cloning databases: fail on clone -- listDBs injected failure"))
<< attempt0;
ASSERT_EQUALS(attempt0["durationMillis"].type(), NumberInt) << attempt0;
ASSERT_EQUALS(attempt0.getStringField("syncSource"), std::string("localhost:27017"))
@@ -1480,72 +3541,4 @@ TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) {
<< attempt1;
}
-TEST_F(InitialSyncTest, DataReplicatorCreatesNewApplierForNextBatchBeforeDestroyingCurrentApplier) {
- auto getRollbackIdResponse = BSON("ok" << 1 << "rbid" << 1);
- auto noopOp1 = BSON("ts" << Timestamp(Seconds(1), 1U) << "h" << 1LL << "v"
- << OplogEntry::kOplogVersion
- << "ns"
- << ""
- << "op"
- << "n"
- << "o"
- << BSON("msg"
- << "noop"));
- auto createCollectionOp1 =
- BSON("ts" << Timestamp(Seconds(2), 1U) << "h" << 1LL << "v" << OplogEntry::kOplogVersion
- << "ns"
- << "test.$cmd"
- << "op"
- << "c"
- << "o"
- << BSON("create"
- << "coll1"));
- auto createCollectionOp2 =
- BSON("ts" << Timestamp(Seconds(3), 1U) << "h" << 1LL << "v" << OplogEntry::kOplogVersion
- << "ns"
- << "test.$cmd"
- << "op"
- << "c"
- << "o"
- << BSON("create"
- << "coll2"));
- const Responses responses = {
- // pre-initial sync rollback checker request
- {"replSetGetRBID", getRollbackIdResponse},
- // get latest oplog ts - this should match the first op returned by the oplog fetcher
- {"find",
- BSON("ok" << 1 << "cursor" << BSON("id" << 0LL << "ns"
- << "local.oplog.rs"
- << "firstBatch"
- << BSON_ARRAY(noopOp1)))},
- // oplog fetcher find - single set of results containing two commands that have to be
- // applied in separate batches per batching logic
- {"find",
- BSON("ok" << 1 << "cursor" << BSON("id" << 0LL << "ns"
- << "local.oplog.rs"
- << "firstBatch"
- << BSON_ARRAY(noopOp1 << createCollectionOp1
- << createCollectionOp2)))},
- // Clone Start
- // listDatabases - return empty list of databases since we're not testing the cloner.
- {"listDatabases", BSON("ok" << 1 << "databases" << BSONArray())},
- // get latest oplog ts - this should match the last op returned by the oplog fetcher
- {"find",
- BSON("ok" << 1 << "cursor" << BSON("id" << 0LL << "ns"
- << "local.oplog.rs"
- << "firstBatch"
- << BSON_ARRAY(createCollectionOp2)))},
- // post-initial sync rollback checker request
- {"replSetGetRBID", getRollbackIdResponse},
- };
-
- startSync(1);
-
- setResponses(responses);
- playResponses();
- log() << "Done playing responses";
- verifySync(getNet());
- ASSERT_EQUALS(OplogEntry(createCollectionOp2).getOpTime(), _myLastOpTime);
-}
-
} // namespace
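The rewritten tests above all follow the same mock-network pattern: rather than queueing canned
responses through the old setResponses()/playResponses() helpers, each test pumps the
NetworkInterfaceMock by hand and then waits on the DataReplicator. A condensed outline of that
flow, assuming the DataReplicatorTest fixture and helpers shown above (the exact request sequence
varies per test):

    // Condensed outline only (not taken verbatim from the patch). Runs inside a
    // DataReplicatorTest body after dr->startup() has returned successfully.
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);  // hold the mock network lock

        // Answer the next ready request and let the DataReplicator react to the response.
        auto request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        assertRemoteCommandNameEquals("replSetGetRBID", request);
        net->runReadyNetworkOperations();

        // Requests the test does not care about are black-holed rather than answered.
        auto noi = net->getNextReadyRequest();
        assertRemoteCommandNameEquals("getMore", noi->getRequest());
        net->blackHole(noi);

        // Failures are injected with scheduleErrorResponse() and delivered the same way.
        net->scheduleErrorResponse(Status(ErrorCodes::OperationFailed, "injected failure"));
        net->runReadyNetworkOperations();
    }
    dr->join();  // block until the completion callback has recorded its result in _lastApplied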
diff --git a/src/mongo/db/repl/initial_sync_state.h b/src/mongo/db/repl/initial_sync_state.h
index cb607824171..d66b762e2da 100644
--- a/src/mongo/db/repl/initial_sync_state.h
+++ b/src/mongo/db/repl/initial_sync_state.h
@@ -48,16 +48,12 @@ namespace repl {
* Holder of state for initial sync (DataReplicator).
*/
struct InitialSyncState {
- InitialSyncState(std::unique_ptr<DatabasesCloner> cloner, Event finishEvent)
- : dbsCloner(std::move(cloner)), finishEvent(finishEvent), status(Status::OK()){};
+ InitialSyncState(std::unique_ptr<DatabasesCloner> cloner) : dbsCloner(std::move(cloner)){};
std::unique_ptr<DatabasesCloner>
dbsCloner; // Cloner for all databases included in initial sync.
- BSONObj oplogSeedDoc; // Document to seed the oplog with when initial sync is done.
Timestamp beginTimestamp; // Timestamp from the latest entry in oplog when started.
Timestamp stopTimestamp; // Referred to as minvalid, or the place we can transition states.
- Event finishEvent; // event fired on completion, either successful or not.
- Status status; // final status, only valid after the finishEvent fires.
Timer timer; // Timer for timing how long each initial sync attempt takes.
size_t fetchedMissingDocs = 0;
size_t appliedOps = 0;
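With the finish event and final status removed, completion is now reported through the
DataReplicator's onCompletion callback rather than parked on the state object, so InitialSyncState
reduces to the cloner plus per-attempt bookkeeping. A minimal sketch of how the trimmed struct is
populated ('cloner' and 'beginTs' are placeholder names for illustration, not identifiers from
this patch):

    // Illustrative sketch only.
    auto state = stdx::make_unique<InitialSyncState>(std::move(cloner));
    state->beginTimestamp = beginTs;  // timestamp of the latest oplog entry when the attempt began
    // stopTimestamp, timer, fetchedMissingDocs, and appliedOps are updated as the attempt runs;
    // the final outcome is delivered through the onCompletion callback, not stored here.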
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 699bb2636ac..f51c5ece84b 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -37,6 +37,7 @@
#include "mongo/base/status.h"
#include "mongo/client/fetcher.h"
+#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/global_timestamp.h"
@@ -560,7 +561,7 @@ void ReplicationCoordinatorImpl::_stopDataReplication(OperationContext* txn) {
LockGuard lk(_mutex);
_dr.swap(drCopy);
}
- if (drCopy && drCopy->getState() == DataReplicatorState::InitialSync) {
+ if (drCopy) {
LOG(1)
<< "ReplicationCoordinatorImpl::_stopDataReplication calling DataReplicator::shutdown.";
const auto status = drCopy->shutdown();
@@ -591,42 +592,32 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* txn,
// Do initial sync.
if (_externalState->shouldUseDataReplicatorInitialSync()) {
- _externalState->runOnInitialSyncThread([this, startCompleted](OperationContext* txn) {
- std::shared_ptr<DataReplicator> drCopy;
- UniqueLock lk(_mutex); // Must take the lock to set _dr, but not call it.
- drCopy = std::make_shared<DataReplicator>(
- createDataReplicatorOptions(this, _externalState.get()),
- stdx::make_unique<DataReplicatorExternalStateImpl>(this, _externalState.get()),
- _storage);
- _dr = drCopy;
- lk.unlock();
-
- const auto status = drCopy->doInitialSync(txn, numInitialSyncAttempts);
- // If it is interrupted by resync, we do not need to cleanup the DataReplicator.
- if (status == ErrorCodes::ShutdownInProgress) {
- return;
- }
-
- drCopy.reset();
- lk.lock();
+ if (!_externalState->getTaskExecutor()) {
+ log() << "not running initial sync during test.";
+ return;
+ }
- if (status == ErrorCodes::CallbackCanceled) {
- log() << "Initial Sync has been cancelled: " << status.getStatus();
- return;
- } else if (!status.isOK()) {
- if (_inShutdown) {
- log() << "Initial Sync failed during shutdown due to " << status.getStatus();
+ auto onCompletion = [this, startCompleted](const StatusWith<OpTimeWithHash>& status) {
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ if (status == ErrorCodes::CallbackCanceled) {
+ log() << "Initial Sync has been cancelled: " << status.getStatus();
return;
- } else {
- error() << "Initial sync failed, shutting down now. Restart the server to "
- "attempt a new initial sync.";
- fassertFailedWithStatusNoTrace(40088, status.getStatus());
+ } else if (!status.isOK()) {
+ if (_inShutdown) {
+ log() << "Initial Sync failed during shutdown due to "
+ << status.getStatus();
+ return;
+ } else {
+ error() << "Initial sync failed, shutting down now. Restart the server "
+ "to attempt a new initial sync.";
+ fassertFailedWithStatusNoTrace(40088, status.getStatus());
+ }
}
- }
- const auto lastApplied = status.getValue();
- _setMyLastAppliedOpTime_inlock(lastApplied.opTime, false);
- lk.unlock();
+ const auto lastApplied = status.getValue();
+ _setMyLastAppliedOpTime_inlock(lastApplied.opTime, false);
+ }
// Clear maint. mode.
while (getMaintenanceMode()) {
@@ -637,9 +628,35 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* txn,
startCompleted();
}
// Repair local db (to compact it).
- uassertStatusOK(_externalState->runRepairOnLocalDB(txn));
- _externalState->startSteadyStateReplication(txn, this);
- });
+ auto txn = cc().makeOperationContext();
+ uassertStatusOK(_externalState->runRepairOnLocalDB(txn.get()));
+ _externalState->startSteadyStateReplication(txn.get(), this);
+ };
+
+ std::shared_ptr<DataReplicator> drCopy;
+ try {
+ {
+ // Must take the lock to set _dr, but not call it.
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ drCopy = std::make_shared<DataReplicator>(
+ createDataReplicatorOptions(this, _externalState.get()),
+ stdx::make_unique<DataReplicatorExternalStateImpl>(this, _externalState.get()),
+ _storage,
+ onCompletion);
+ _dr = drCopy;
+ }
+ // DataReplicator::startup() must be called outside the lock because it uses features
+ // (e.g. setting the initial sync flag) that depend on the ReplicationCoordinatorImpl.
+ uassertStatusOK(drCopy->startup(txn, numInitialSyncAttempts));
+ } catch (...) {
+ auto status = exceptionToStatus();
+ log() << "Initial Sync failed to start: " << status;
+ if (ErrorCodes::CallbackCanceled == status ||
+ ErrorCodes::isShutdownError(status.code())) {
+ return;
+ }
+ fassertFailedWithStatusNoTrace(40354, status);
+ }
} else {
_externalState->startInitialSync([this, startCompleted](OperationContext* txn) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -736,6 +753,7 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* txn) {
if (!status.isOK()) {
warning() << "DataReplicator shutdown failed: " << status;
}
+ drCopy->join();
drCopy.reset();
}
_externalState->shutdown(txn);
@@ -2174,7 +2192,7 @@ Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* txn,
auto opTime = _getMyLastAppliedOpTime_inlock();
_topCoord->prepareSyncFromResponse(target, opTime, resultObj, &result);
// If we are in the middle of an initial sync, do a resync.
- doResync = result.isOK() && _dr && _dr->getState() == DataReplicatorState::InitialSync;
+ doResync = result.isOK() && _dr && _dr->isActive();
}
if (doResync) {
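Taken together, these coordinator changes replace the blocking doInitialSync() call with the
DataReplicator's new asynchronous interface: construct it with an onCompletion callback, start it
with startup(), and pair shutdown() with join() during teardown. A minimal sketch of that
lifecycle, where 'opts', 'externalState', 'storage', and 'txn' stand in for the values the
coordinator actually supplies:

    // Sketch of the asynchronous DataReplicator lifecycle (illustrative, not verbatim from the patch).
    auto onCompletion = [](const StatusWith<OpTimeWithHash>& lastApplied) {
        // Runs on an executor thread once the initial sync attempts finish, success or failure.
        if (!lastApplied.isOK()) {
            error() << "initial sync failed: " << lastApplied.getStatus();
            return;
        }
        log() << "initial sync finished applying through " << lastApplied.getValue().opTime;
    };

    auto dr = std::make_shared<DataReplicator>(opts, std::move(externalState), storage, onCompletion);
    uassertStatusOK(dr->startup(txn, numInitialSyncAttempts));  // returns immediately; work is async

    // ... later, during shutdown:
    auto status = dr->shutdown();  // request cancellation of any outstanding attempt
    if (!status.isOK()) {
        warning() << "DataReplicator shutdown failed: " << status;
    }
    dr->join();  // wait for onCompletion to run and the replicator's tasks to drain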
diff --git a/src/mongo/executor/task_executor_test_fixture.cpp b/src/mongo/executor/task_executor_test_fixture.cpp
index acbb21cbeed..bc99e1538fa 100644
--- a/src/mongo/executor/task_executor_test_fixture.cpp
+++ b/src/mongo/executor/task_executor_test_fixture.cpp
@@ -43,8 +43,8 @@ Status TaskExecutorTest::getDetectableErrorStatus() {
return Status(ErrorCodes::InternalError, "Not mutated");
}
-void TaskExecutorTest::assertRemoteCommandNameEquals(StringData cmdName,
- const RemoteCommandRequest& request) {
+RemoteCommandRequest TaskExecutorTest::assertRemoteCommandNameEquals(
+ StringData cmdName, const RemoteCommandRequest& request) {
auto&& cmdObj = request.cmdObj;
ASSERT_FALSE(cmdObj.isEmpty());
if (cmdName != cmdObj.firstElementFieldName()) {
@@ -53,6 +53,7 @@ void TaskExecutorTest::assertRemoteCommandNameEquals(StringData cmdName,
<< cmdObj.firstElementFieldName() << "\" instead: " << request.toString();
FAIL(msg);
}
+ return request;
}
TaskExecutorTest::~TaskExecutorTest() = default;
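Returning the request makes the assertion composable with the mock network's scheduling calls,
which is how the new DataReplicator tests use it. A short usage sketch, assuming the same
NetworkInterfaceMock-based fixture as the tests above:

    // Two-statement form: schedule the response, then assert on the captured request.
    auto request = net->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
    assertRemoteCommandNameEquals("listDatabases", request);

    // Single-expression form enabled by the new return value; the request remains available
    // for further field-level checks afterwards.
    request = assertRemoteCommandNameEquals(
        "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
    net->runReadyNetworkOperations();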
diff --git a/src/mongo/executor/task_executor_test_fixture.h b/src/mongo/executor/task_executor_test_fixture.h
index 0ba3a6d6604..d89b6e0062d 100644
--- a/src/mongo/executor/task_executor_test_fixture.h
+++ b/src/mongo/executor/task_executor_test_fixture.h
@@ -53,10 +53,11 @@ public:
static Status getDetectableErrorStatus();
/**
- * Validates command name in remote command request.
+ * Validates command name in remote command request. Returns the remote command request from
+ * the network interface for further validation if the command name matches.
*/
- static void assertRemoteCommandNameEquals(StringData cmdName,
- const RemoteCommandRequest& request);
+ static RemoteCommandRequest assertRemoteCommandNameEquals(StringData cmdName,
+ const RemoteCommandRequest& request);
protected:
virtual ~TaskExecutorTest();