summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJudah Schvimer <judah@mongodb.com>2017-03-14 14:39:56 -0400
committerJudah Schvimer <judah@mongodb.com>2017-03-14 14:39:56 -0400
commit56b5dbbd9901152a0a185c766679bb3c355ebedd (patch)
treec69c5e195b9ec1f004a1137dc3a2ab6587f87e15 /src
parent595c9195393ca7710257e9caae087f4ebe1bb3cd (diff)
downloadmongo-56b5dbbd9901152a0a185c766679bb3c355ebedd.tar.gz
SERVER-27995 rename DataReplicator to InitialSyncer
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/SConscript12
-rw-r--r--src/mongo/db/repl/data_replicator_external_state.h18
-rw-r--r--src/mongo/db/repl/initial_sync_state.h2
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp (renamed from src/mongo/db/repl/data_replicator.cpp)257
-rw-r--r--src/mongo/db/repl/initial_syncer.h (renamed from src/mongo/db/repl/data_replicator.h)36
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp (renamed from src/mongo/db/repl/data_replicator_test.cpp)795
-rw-r--r--src/mongo/db/repl/oplog_buffer.h2
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state.h6
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp54
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h9
-rw-r--r--src/mongo/db/repl/rs_initialsync.cpp2
-rw-r--r--src/mongo/db/repl/sync_tail.cpp2
12 files changed, 596 insertions, 599 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index e1bb059fe44..d49481539dc 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -432,7 +432,7 @@ env.Library('repl_coordinator_impl',
'$BUILD_DIR/mongo/transport/transport_layer_common',
'$BUILD_DIR/mongo/util/fail_point',
'collection_cloner',
- 'data_replicator',
+ 'initial_syncer',
'data_replicator_external_state_initial_sync',
'repl_coordinator_global',
'repl_coordinator_interface',
@@ -1049,9 +1049,9 @@ env.Library(
)
env.Library(
- target='data_replicator',
+ target='initial_syncer',
source=[
- 'data_replicator.cpp',
+ 'initial_syncer.cpp',
],
LIBDEPS=[
'collection_cloner',
@@ -1069,13 +1069,13 @@ env.Library(
)
env.CppUnitTest(
- target='data_replicator_test',
+ target='initial_syncer_test',
source=[
- 'data_replicator_test.cpp',
+ 'initial_syncer_test.cpp',
],
LIBDEPS=[
'base_cloner_test_fixture',
- 'data_replicator',
+ 'initial_syncer',
'data_replicator_external_state_mock',
'replication_executor_test_fixture',
'sync_source_selector_mock',
diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h
index a1b4bf61e87..d81861e6bbf 100644
--- a/src/mongo/db/repl/data_replicator_external_state.h
+++ b/src/mongo/db/repl/data_replicator_external_state.h
@@ -50,7 +50,7 @@ class TaskExecutor;
namespace repl {
-class DataReplicator;
+class InitialSyncer;
/**
* Holds current term and last committed optime necessary to populate find/getMore command requests.
@@ -58,10 +58,10 @@ class DataReplicator;
using OpTimeWithTerm = OpTimeWith<long long>;
/**
- * This class represents the interface the DataReplicator uses to interact with the
- * rest of the system. All functionality of the DataReplicator that would introduce
+ * This class represents the interface the InitialSyncer uses to interact with the
+ * rest of the system. All functionality of the InitialSyncer that would introduce
* dependencies on large sections of the server code and thus break the unit testability of
- * DataReplicator should be moved here.
+ * InitialSyncer should be moved here.
*/
class DataReplicatorExternalState {
MONGO_DISALLOW_COPYING(DataReplicatorExternalState);
@@ -128,7 +128,7 @@ private:
* Applies the operations described in the oplog entries contained in "ops" using the
* "applyOperation" function.
*
- * Used exclusively by the DataReplicator to construct a MultiApplier.
+ * Used exclusively by the InitialSyncer to construct a MultiApplier.
*/
virtual StatusWith<OpTime> _multiApply(OperationContext* opCtx,
MultiApplier::Operations ops,
@@ -137,7 +137,7 @@ private:
/**
* Used by _multiApply() to write operations to database during steady state replication.
*
- * Used exclusively by the DataReplicator to construct a MultiApplier.
+ * Used exclusively by the InitialSyncer to construct a MultiApplier.
*/
virtual Status _multiSyncApply(MultiApplier::OperationPtrs* ops) = 0;
@@ -145,15 +145,15 @@ private:
* Used by _multiApply() to write operations to database during initial sync. `fetchCount` is a
* pointer to a counter that is incremented every time we fetch a missing document.
*
- * Used exclusively by the DataReplicator to construct a MultiApplier.
+ * Used exclusively by the InitialSyncer to construct a MultiApplier.
*/
virtual Status _multiInitialSyncApply(MultiApplier::OperationPtrs* ops,
const HostAndPort& source,
AtomicUInt32* fetchCount) = 0;
- // Provides DataReplicator with access to _multiApply, _multiSyncApply and
+ // Provides InitialSyncer with access to _multiApply, _multiSyncApply and
// _multiInitialSyncApply.
- friend class DataReplicator;
+ friend class InitialSyncer;
};
} // namespace repl
diff --git a/src/mongo/db/repl/initial_sync_state.h b/src/mongo/db/repl/initial_sync_state.h
index d66b762e2da..d3bf974716c 100644
--- a/src/mongo/db/repl/initial_sync_state.h
+++ b/src/mongo/db/repl/initial_sync_state.h
@@ -45,7 +45,7 @@ namespace mongo {
namespace repl {
/**
- * Holder of state for initial sync (DataReplicator).
+ * Holder of state for initial sync (InitialSyncer).
*/
struct InitialSyncState {
InitialSyncState(std::unique_ptr<DatabasesCloner> cloner) : dbsCloner(std::move(cloner)){};
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 81d63055d72..15ad070eeb7 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -30,7 +30,7 @@
#include "mongo/platform/basic.h"
-#include "data_replicator.h"
+#include "initial_syncer.h"
#include <algorithm>
#include <utility>
@@ -105,18 +105,18 @@ MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncConnectAttempts, int, 10);
MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncOplogFindAttempts, int, 3);
// The number of initial sync attempts that have failed since server startup. Each instance of
-// DataReplicator may run multiple attempts to fulfill an initial sync request that is triggered
-// when DataReplicator::startup() is called.
+// InitialSyncer may run multiple attempts to fulfill an initial sync request that is triggered
+// when InitialSyncer::startup() is called.
Counter64 initialSyncFailedAttempts;
// The number of initial sync requests that have been requested and failed. Each instance of
-// DataReplicator (upon successful startup()) corresponds to a single initial sync request.
-// This value does not include the number of times where a DataReplicator is created successfully
+// InitialSyncer (upon successful startup()) corresponds to a single initial sync request.
+// This value does not include the number of times where a InitialSyncer is created successfully
// but failed in startup().
Counter64 initialSyncFailures;
// The number of initial sync requests that have been requested and completed successfully. Each
-// instance of DataReplicator corresponds to a single initial sync request.
+// instance of InitialSyncer corresponds to a single initial sync request.
Counter64 initialSyncCompletes;
ServerStatusMetricField<Counter64> displaySSInitialSyncFailedAttempts(
@@ -215,9 +215,8 @@ StatusWith<OpTimeWithHash> parseOpTimeWithHash(const QueryResponseStatus& fetchR
} // namespace
-// Data Replicator
-DataReplicator::DataReplicator(
- DataReplicatorOptions opts,
+InitialSyncer::InitialSyncer(
+ InitialSyncerOptions opts,
std::unique_ptr<DataReplicatorExternalState> dataReplicatorExternalState,
StorageInterface* storage,
const OnCompletionFn& onCompletion)
@@ -236,24 +235,24 @@ DataReplicator::DataReplicator(
uassert(ErrorCodes::BadValue, "callback function cannot be null", _onCompletion);
}
-DataReplicator::~DataReplicator() {
+InitialSyncer::~InitialSyncer() {
DESTRUCTOR_GUARD({
shutdown();
join();
});
}
-bool DataReplicator::isActive() const {
+bool InitialSyncer::isActive() const {
stdx::lock_guard<stdx::mutex> lock(_mutex);
return _isActive_inlock();
}
-bool DataReplicator::_isActive_inlock() const {
+bool InitialSyncer::_isActive_inlock() const {
return State::kRunning == _state || State::kShuttingDown == _state;
}
-Status DataReplicator::startup(OperationContext* opCtx,
- std::uint32_t initialSyncMaxAttempts) noexcept {
+Status InitialSyncer::startup(OperationContext* opCtx,
+ std::uint32_t initialSyncMaxAttempts) noexcept {
invariant(opCtx);
invariant(initialSyncMaxAttempts >= 1U);
@@ -263,11 +262,11 @@ Status DataReplicator::startup(OperationContext* opCtx,
_state = State::kRunning;
break;
case State::kRunning:
- return Status(ErrorCodes::IllegalOperation, "data replicator already started");
+ return Status(ErrorCodes::IllegalOperation, "initial syncer already started");
case State::kShuttingDown:
- return Status(ErrorCodes::ShutdownInProgress, "data replicator shutting down");
+ return Status(ErrorCodes::ShutdownInProgress, "initial syncer shutting down");
case State::kComplete:
- return Status(ErrorCodes::ShutdownInProgress, "data replicator completed");
+ return Status(ErrorCodes::ShutdownInProgress, "initial syncer completed");
}
_setUp_inlock(opCtx, initialSyncMaxAttempts);
@@ -275,7 +274,7 @@ Status DataReplicator::startup(OperationContext* opCtx,
// Start first initial sync attempt.
std::uint32_t initialSyncAttempt = 0;
auto status = _scheduleWorkAndSaveHandle_inlock(
- stdx::bind(&DataReplicator::_startInitialSyncAttemptCallback,
+ stdx::bind(&InitialSyncer::_startInitialSyncAttemptCallback,
this,
stdx::placeholders::_1,
initialSyncAttempt,
@@ -291,7 +290,7 @@ Status DataReplicator::startup(OperationContext* opCtx,
return Status::OK();
}
-Status DataReplicator::shutdown() {
+Status InitialSyncer::shutdown() {
stdx::lock_guard<stdx::mutex> lock(_mutex);
switch (_state) {
case State::kPreStart:
@@ -312,7 +311,7 @@ Status DataReplicator::shutdown() {
return Status::OK();
}
-void DataReplicator::_cancelRemainingWork_inlock() {
+void InitialSyncer::_cancelRemainingWork_inlock() {
_cancelHandle_inlock(_startInitialSyncAttemptHandle);
_cancelHandle_inlock(_chooseSyncSourceHandle);
_cancelHandle_inlock(_getBaseRollbackIdHandle);
@@ -327,29 +326,29 @@ void DataReplicator::_cancelRemainingWork_inlock() {
_shutdownComponent_inlock(_lastOplogEntryFetcher);
}
-void DataReplicator::join() {
+void InitialSyncer::join() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
_stateCondition.wait(lk, [this]() { return !_isActive_inlock(); });
}
-DataReplicator::State DataReplicator::getState_forTest() const {
+InitialSyncer::State InitialSyncer::getState_forTest() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
return _state;
}
-bool DataReplicator::_isShuttingDown() const {
+bool InitialSyncer::_isShuttingDown() const {
stdx::lock_guard<stdx::mutex> lock(_mutex);
return _isShuttingDown_inlock();
}
-bool DataReplicator::_isShuttingDown_inlock() const {
+bool InitialSyncer::_isShuttingDown_inlock() const {
return State::kShuttingDown == _state;
}
-std::string DataReplicator::getDiagnosticString() const {
+std::string InitialSyncer::getDiagnosticString() const {
LockGuard lk(_mutex);
str::stream out;
- out << "DataReplicator -"
+ out << "InitialSyncer -"
<< " opts: " << _opts.toString() << " oplogFetcher: " << _oplogFetcher->toString()
<< " opsBuffered: " << _oplogBuffer->getSize() << " active: " << _isActive_inlock()
<< " shutting down: " << _isShuttingDown_inlock();
@@ -360,12 +359,12 @@ std::string DataReplicator::getDiagnosticString() const {
return out;
}
-BSONObj DataReplicator::getInitialSyncProgress() const {
+BSONObj InitialSyncer::getInitialSyncProgress() const {
LockGuard lk(_mutex);
return _getInitialSyncProgress_inlock();
}
-BSONObj DataReplicator::_getInitialSyncProgress_inlock() const {
+BSONObj InitialSyncer::_getInitialSyncProgress_inlock() const {
BSONObjBuilder bob;
try {
_stats.append(&bob);
@@ -392,12 +391,12 @@ BSONObj DataReplicator::_getInitialSyncProgress_inlock() const {
return bob.obj();
}
-void DataReplicator::setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& work) {
+void InitialSyncer::setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& work) {
LockGuard lk(_mutex);
_scheduleDbWorkFn = work;
}
-void DataReplicator::_setUp_inlock(OperationContext* opCtx, std::uint32_t initialSyncMaxAttempts) {
+void InitialSyncer::_setUp_inlock(OperationContext* opCtx, std::uint32_t initialSyncMaxAttempts) {
// This will call through to the storageInterfaceImpl to ReplicationCoordinatorImpl.
// 'opCtx' is passed through from startup().
_storage->setInitialSyncFlag(opCtx);
@@ -411,8 +410,8 @@ void DataReplicator::_setUp_inlock(OperationContext* opCtx, std::uint32_t initia
_stats.failedInitialSyncAttempts = 0;
}
-void DataReplicator::_tearDown_inlock(OperationContext* opCtx,
- const StatusWith<OpTimeWithHash>& lastApplied) {
+void InitialSyncer::_tearDown_inlock(OperationContext* opCtx,
+ const StatusWith<OpTimeWithHash>& lastApplied) {
_stats.initialSyncEnd = _exec->now();
// This might not be necessary if we failed initial sync.
@@ -429,7 +428,7 @@ void DataReplicator::_tearDown_inlock(OperationContext* opCtx,
initialSyncCompletes.increment();
}
-void DataReplicator::_startInitialSyncAttemptCallback(
+void InitialSyncer::_startInitialSyncAttemptCallback(
const executor::TaskExecutor::CallbackArgs& callbackArgs,
std::uint32_t initialSyncAttempt,
std::uint32_t initialSyncMaxAttempts) {
@@ -471,22 +470,22 @@ void DataReplicator::_startInitialSyncAttemptCallback(
static_cast<std::uint32_t>(numInitialSyncConnectAttempts.load());
// _scheduleWorkAndSaveHandle_inlock() is shutdown-aware.
- status = _scheduleWorkAndSaveHandle_inlock(
- stdx::bind(&DataReplicator::_chooseSyncSourceCallback,
- this,
- stdx::placeholders::_1,
- chooseSyncSourceAttempt,
- chooseSyncSourceMaxAttempts,
- onCompletionGuard),
- &_chooseSyncSourceHandle,
- str::stream() << "_chooseSyncSourceCallback-" << chooseSyncSourceAttempt);
+ status = _scheduleWorkAndSaveHandle_inlock(stdx::bind(&InitialSyncer::_chooseSyncSourceCallback,
+ this,
+ stdx::placeholders::_1,
+ chooseSyncSourceAttempt,
+ chooseSyncSourceMaxAttempts,
+ onCompletionGuard),
+ &_chooseSyncSourceHandle,
+ str::stream() << "_chooseSyncSourceCallback-"
+ << chooseSyncSourceAttempt);
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
}
}
-void DataReplicator::_chooseSyncSourceCallback(
+void InitialSyncer::_chooseSyncSourceCallback(
const executor::TaskExecutor::CallbackArgs& callbackArgs,
std::uint32_t chooseSyncSourceAttempt,
std::uint32_t chooseSyncSourceMaxAttempts,
@@ -494,7 +493,7 @@ void DataReplicator::_chooseSyncSourceCallback(
stdx::lock_guard<stdx::mutex> lock(_mutex);
// Cancellation should be treated the same as other errors. In this case, the most likely cause
// of a failed _chooseSyncSourceCallback() task is a cancellation triggered by
- // DataReplicator::shutdown() or the task executor shutting down.
+ // InitialSyncer::shutdown() or the task executor shutting down.
auto status =
_checkForShutdownAndConvertStatus_inlock(callbackArgs, "error while choosing sync source");
if (!status.isOK()) {
@@ -525,7 +524,7 @@ void DataReplicator::_chooseSyncSourceCallback(
<< (chooseSyncSourceAttempt + 1) << " of " << numInitialSyncConnectAttempts.load();
auto status = _scheduleWorkAtAndSaveHandle_inlock(
when,
- stdx::bind(&DataReplicator::_chooseSyncSourceCallback,
+ stdx::bind(&InitialSyncer::_chooseSyncSourceCallback,
this,
stdx::placeholders::_1,
chooseSyncSourceAttempt + 1,
@@ -553,7 +552,7 @@ void DataReplicator::_chooseSyncSourceCallback(
_syncSource = syncSource.getValue();
_rollbackChecker = stdx::make_unique<RollbackChecker>(_exec, _syncSource);
auto scheduleResult =
- _rollbackChecker->reset(stdx::bind(&DataReplicator::_rollbackCheckerResetCallback,
+ _rollbackChecker->reset(stdx::bind(&InitialSyncer::_rollbackCheckerResetCallback,
this,
stdx::placeholders::_1,
onCompletionGuard));
@@ -565,7 +564,7 @@ void DataReplicator::_chooseSyncSourceCallback(
_getBaseRollbackIdHandle = scheduleResult.getValue();
}
-Status DataReplicator::_recreateOplogAndDropReplicatedDatabases() {
+Status InitialSyncer::_recreateOplogAndDropReplicatedDatabases() {
// drop/create oplog; drop user databases.
LOG(1) << "About to drop+create the oplog, if it exists, ns:" << _opts.localOplogNS
<< ", and drop all user databases (so that we can clone them).";
@@ -594,7 +593,7 @@ Status DataReplicator::_recreateOplogAndDropReplicatedDatabases() {
return _storage->createOplog(opCtx.get(), _opts.localOplogNS);
}
-void DataReplicator::_rollbackCheckerResetCallback(
+void InitialSyncer::_rollbackCheckerResetCallback(
const RollbackChecker::Result& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
auto status = _checkForShutdownAndConvertStatus_inlock(result.getStatus(),
@@ -605,7 +604,7 @@ void DataReplicator::_rollbackCheckerResetCallback(
}
status = _scheduleLastOplogEntryFetcher_inlock(
- stdx::bind(&DataReplicator::_lastOplogEntryFetcherCallbackForBeginTimestamp,
+ stdx::bind(&InitialSyncer::_lastOplogEntryFetcherCallbackForBeginTimestamp,
this,
stdx::placeholders::_1,
onCompletionGuard));
@@ -615,7 +614,7 @@ void DataReplicator::_rollbackCheckerResetCallback(
}
}
-void DataReplicator::_lastOplogEntryFetcherCallbackForBeginTimestamp(
+void InitialSyncer::_lastOplogEntryFetcherCallbackForBeginTimestamp(
const StatusWith<Fetcher::QueryResponse>& result,
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
@@ -652,7 +651,7 @@ void DataReplicator::_lastOplogEntryFetcherCallbackForBeginTimestamp(
_dataReplicatorExternalState->getDbWorkThreadPool(),
_syncSource,
listDatabasesFilter,
- stdx::bind(&DataReplicator::_databasesClonerCallback,
+ stdx::bind(&InitialSyncer::_databasesClonerCallback,
this,
stdx::placeholders::_1,
onCompletionGuard)));
@@ -685,12 +684,12 @@ void DataReplicator::_lastOplogEntryFetcherCallbackForBeginTimestamp(
_rollbackChecker->getBaseRBID(),
false /* requireFresherSyncSource */,
_dataReplicatorExternalState.get(),
- stdx::bind(&DataReplicator::_enqueueDocuments,
+ stdx::bind(&InitialSyncer::_enqueueDocuments,
this,
stdx::placeholders::_1,
stdx::placeholders::_2,
stdx::placeholders::_3),
- stdx::bind(&DataReplicator::_oplogFetcherCallback,
+ stdx::bind(&InitialSyncer::_oplogFetcherCallback,
this,
stdx::placeholders::_1,
stdx::placeholders::_2,
@@ -738,9 +737,9 @@ void DataReplicator::_lastOplogEntryFetcherCallbackForBeginTimestamp(
}
}
-void DataReplicator::_oplogFetcherCallback(const Status& oplogFetcherFinishStatus,
- const OpTimeWithHash& lastFetched,
- std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+void InitialSyncer::_oplogFetcherCallback(const Status& oplogFetcherFinishStatus,
+ const OpTimeWithHash& lastFetched,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
log() << "Finished fetching oplog during initial sync: " << redact(oplogFetcherFinishStatus)
<< ". Last fetched optime and hash: " << lastFetched;
@@ -773,9 +772,8 @@ void DataReplicator::_oplogFetcherCallback(const Status& oplogFetcherFinishStatu
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
}
-void DataReplicator::_databasesClonerCallback(
- const Status& databaseClonerFinishStatus,
- std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+void InitialSyncer::_databasesClonerCallback(const Status& databaseClonerFinishStatus,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
auto status = _checkForShutdownAndConvertStatus_inlock(databaseClonerFinishStatus,
"error cloning databases");
@@ -785,7 +783,7 @@ void DataReplicator::_databasesClonerCallback(
}
status = _scheduleLastOplogEntryFetcher_inlock(
- stdx::bind(&DataReplicator::_lastOplogEntryFetcherCallbackForStopTimestamp,
+ stdx::bind(&InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp,
this,
stdx::placeholders::_1,
onCompletionGuard));
@@ -795,7 +793,7 @@ void DataReplicator::_databasesClonerCallback(
}
}
-void DataReplicator::_lastOplogEntryFetcherCallbackForStopTimestamp(
+void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp(
const StatusWith<Fetcher::QueryResponse>& result,
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
{
@@ -837,9 +835,9 @@ void DataReplicator::_lastOplogEntryFetcherCallbackForStopTimestamp(
auto opCtx = makeOpCtx();
// StorageInterface::insertDocument() has to be called outside the lock because we may
- // override its behavior in tests. See DataReplicatorReturnsCallbackCanceledAndDoesNot-
+ // override its behavior in tests. See InitialSyncerReturnsCallbackCanceledAndDoesNot-
// ScheduleRollbackCheckerIfShutdownAfterInsertingInsertOplogSeedDocument in
- // data_replicator_test.cpp
+ // initial_syncer_test.cpp
auto status = _storage->insertDocument(opCtx.get(), _opts.localOplogNS, oplogSeedDoc);
if (!status.isOK()) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
@@ -853,7 +851,7 @@ void DataReplicator::_lastOplogEntryFetcherCallbackForStopTimestamp(
_scheduleRollbackCheckerCheckForRollback_inlock(lock, onCompletionGuard);
}
-void DataReplicator::_getNextApplierBatchCallback(
+void InitialSyncer::_getNextApplierBatchCallback(
const executor::TaskExecutor::CallbackArgs& callbackArgs,
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
@@ -899,17 +897,16 @@ void DataReplicator::_getNextApplierBatchCallback(
auto lastApplied = opTimeWithHashStatus.getValue();
auto numApplied = ops.size();
- _applier =
- stdx::make_unique<MultiApplier>(_exec,
- ops,
- applyOperationsForEachReplicationWorkerThreadFn,
- applyBatchOfOperationsFn,
- stdx::bind(&DataReplicator::_multiApplierCallback,
- this,
- stdx::placeholders::_1,
- lastApplied,
- numApplied,
- onCompletionGuard));
+ _applier = stdx::make_unique<MultiApplier>(_exec,
+ ops,
+ applyOperationsForEachReplicationWorkerThreadFn,
+ applyBatchOfOperationsFn,
+ stdx::bind(&InitialSyncer::_multiApplierCallback,
+ this,
+ stdx::placeholders::_1,
+ lastApplied,
+ numApplied,
+ onCompletionGuard));
status = _startupComponent_inlock(_applier);
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
@@ -937,24 +934,24 @@ void DataReplicator::_getNextApplierBatchCallback(
// the sync source, we'll check the oplog buffer again in
// '_opts.getApplierBatchCallbackRetryWait' ms.
auto when = _exec->now() + _opts.getApplierBatchCallbackRetryWait;
- status = _scheduleWorkAtAndSaveHandle_inlock(
- when,
- stdx::bind(&DataReplicator::_getNextApplierBatchCallback,
- this,
- stdx::placeholders::_1,
- onCompletionGuard),
- &_getNextApplierBatchHandle,
- "_getNextApplierBatchCallback");
+ status =
+ _scheduleWorkAtAndSaveHandle_inlock(when,
+ stdx::bind(&InitialSyncer::_getNextApplierBatchCallback,
+ this,
+ stdx::placeholders::_1,
+ onCompletionGuard),
+ &_getNextApplierBatchHandle,
+ "_getNextApplierBatchCallback");
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
}
}
-void DataReplicator::_multiApplierCallback(const Status& multiApplierStatus,
- OpTimeWithHash lastApplied,
- std::uint32_t numApplied,
- std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+void InitialSyncer::_multiApplierCallback(const Status& multiApplierStatus,
+ OpTimeWithHash lastApplied,
+ std::uint32_t numApplied,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
auto status =
_checkForShutdownAndConvertStatus_inlock(multiApplierStatus, "error applying batch");
@@ -973,7 +970,7 @@ void DataReplicator::_multiApplierCallback(const Status& multiApplierStatus,
_initialSyncState->fetchedMissingDocs += fetchCount;
_fetchCount.store(0);
status = _scheduleLastOplogEntryFetcher_inlock(
- stdx::bind(&DataReplicator::_lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments,
+ stdx::bind(&InitialSyncer::_lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments,
this,
stdx::placeholders::_1,
onCompletionGuard));
@@ -987,7 +984,7 @@ void DataReplicator::_multiApplierCallback(const Status& multiApplierStatus,
_checkApplierProgressAndScheduleGetNextApplierBatch_inlock(lock, onCompletionGuard);
}
-void DataReplicator::_lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments(
+void InitialSyncer::_lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments(
const StatusWith<Fetcher::QueryResponse>& result,
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
@@ -1019,7 +1016,7 @@ void DataReplicator::_lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments
_checkApplierProgressAndScheduleGetNextApplierBatch_inlock(lock, onCompletionGuard);
}
-void DataReplicator::_rollbackCheckerCheckForRollbackCallback(
+void InitialSyncer::_rollbackCheckerCheckForRollbackCallback(
const RollbackChecker::Result& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
auto status = _checkForShutdownAndConvertStatus_inlock(result.getStatus(),
@@ -1043,7 +1040,7 @@ void DataReplicator::_rollbackCheckerCheckForRollbackCallback(
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, _lastApplied);
}
-void DataReplicator::_finishInitialSyncAttempt(const StatusWith<OpTimeWithHash>& lastApplied) {
+void InitialSyncer::_finishInitialSyncAttempt(const StatusWith<OpTimeWithHash>& lastApplied) {
// Since _finishInitialSyncAttempt can be called from any component's callback function or
// scheduled task, it is possible that we may not be in a TaskExecutor-managed thread when this
// function is invoked.
@@ -1061,9 +1058,9 @@ void DataReplicator::_finishInitialSyncAttempt(const StatusWith<OpTimeWithHash>&
auto result = lastApplied;
auto finishCallbackGuard = MakeGuard([this, &result] {
auto scheduleResult =
- _exec->scheduleWork(stdx::bind(&DataReplicator::_finishCallback, this, result));
+ _exec->scheduleWork(stdx::bind(&InitialSyncer::_finishCallback, this, result));
if (!scheduleResult.isOK()) {
- warning() << "Unable to schedule data replicator completion task due to "
+ warning() << "Unable to schedule initial syncer completion task due to "
<< redact(scheduleResult.getStatus())
<< ". Running callback on current thread.";
_finishCallback(result);
@@ -1077,7 +1074,7 @@ void DataReplicator::_finishInitialSyncAttempt(const StatusWith<OpTimeWithHash>&
auto runTime = _initialSyncState ? _initialSyncState->timer.millis() : 0;
_stats.initialSyncAttemptInfos.emplace_back(
- DataReplicator::InitialSyncAttemptInfo{runTime, result.getStatus(), _syncSource});
+ InitialSyncer::InitialSyncAttemptInfo{runTime, result.getStatus(), _syncSource});
if (result.isOK()) {
// Scope guard will invoke _finishCallback().
@@ -1111,7 +1108,7 @@ void DataReplicator::_finishInitialSyncAttempt(const StatusWith<OpTimeWithHash>&
auto when = _exec->now() + _opts.initialSyncRetryWait;
auto status = _scheduleWorkAtAndSaveHandle_inlock(
when,
- stdx::bind(&DataReplicator::_startInitialSyncAttemptCallback,
+ stdx::bind(&InitialSyncer::_startInitialSyncAttemptCallback,
this,
stdx::placeholders::_1,
_stats.failedInitialSyncAttempts,
@@ -1131,12 +1128,12 @@ void DataReplicator::_finishInitialSyncAttempt(const StatusWith<OpTimeWithHash>&
finishCallbackGuard.Dismiss();
}
-void DataReplicator::_finishCallback(StatusWith<OpTimeWithHash> lastApplied) {
+void InitialSyncer::_finishCallback(StatusWith<OpTimeWithHash> lastApplied) {
// After running callback function, clear '_onCompletion' to release any resources that might be
// held by this function object.
// '_onCompletion' must be moved to a temporary copy and destroyed outside the lock in case
// there is any logic that's invoked at the function object's destruction that might call into
- // this DataReplicator. 'onCompletion' must be destroyed outside the lock and this should happen
+ // this InitialSyncer. 'onCompletion' must be destroyed outside the lock and this should happen
// before we transition the state to Complete.
decltype(_onCompletion) onCompletion;
{
@@ -1161,13 +1158,13 @@ void DataReplicator::_finishCallback(StatusWith<OpTimeWithHash> lastApplied) {
try {
onCompletion(lastApplied);
} catch (...) {
- warning() << "data replicator finish callback threw exception: "
+ warning() << "initial syncer finish callback threw exception: "
<< redact(exceptionToStatus());
}
// Destroy the remaining reference to the completion callback before we transition the state to
// Complete so that callers can expect any resources bound to '_onCompletion' to be released
- // before DataReplicator::join() returns.
+ // before InitialSyncer::join() returns.
onCompletion = {};
stdx::lock_guard<stdx::mutex> lock(_mutex);
@@ -1176,7 +1173,7 @@ void DataReplicator::_finishCallback(StatusWith<OpTimeWithHash> lastApplied) {
_stateCondition.notify_all();
}
-Status DataReplicator::_scheduleLastOplogEntryFetcher_inlock(Fetcher::CallbackFn callback) {
+Status InitialSyncer::_scheduleLastOplogEntryFetcher_inlock(Fetcher::CallbackFn callback) {
BSONObj query = BSON(
"find" << _opts.remoteOplogNS.coll() << "sort" << BSON("$natural" << -1) << "limit" << 1);
@@ -1200,7 +1197,7 @@ Status DataReplicator::_scheduleLastOplogEntryFetcher_inlock(Fetcher::CallbackFn
return scheduleStatus;
}
-void DataReplicator::_checkApplierProgressAndScheduleGetNextApplierBatch_inlock(
+void InitialSyncer::_checkApplierProgressAndScheduleGetNextApplierBatch_inlock(
const stdx::lock_guard<stdx::mutex>& lock,
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
// We should check our current state because shutdown() could have been called before
@@ -1210,7 +1207,7 @@ void DataReplicator::_checkApplierProgressAndScheduleGetNextApplierBatch_inlock(
lock,
Status(ErrorCodes::CallbackCanceled,
"failed to schedule applier to check for "
- "rollback: data replicator is shutting down"));
+ "rollback: initial syncer is shutting down"));
return;
}
@@ -1243,7 +1240,7 @@ void DataReplicator::_checkApplierProgressAndScheduleGetNextApplierBatch_inlock(
// Get another batch to apply.
// _scheduleWorkAndSaveHandle_inlock() is shutdown-aware.
auto status =
- _scheduleWorkAndSaveHandle_inlock(stdx::bind(&DataReplicator::_getNextApplierBatchCallback,
+ _scheduleWorkAndSaveHandle_inlock(stdx::bind(&InitialSyncer::_getNextApplierBatchCallback,
this,
stdx::placeholders::_1,
onCompletionGuard),
@@ -1255,7 +1252,7 @@ void DataReplicator::_checkApplierProgressAndScheduleGetNextApplierBatch_inlock(
}
}
-void DataReplicator::_scheduleRollbackCheckerCheckForRollback_inlock(
+void InitialSyncer::_scheduleRollbackCheckerCheckForRollback_inlock(
const stdx::lock_guard<stdx::mutex>& lock,
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
// We should check our current state because shutdown() could have been called before
@@ -1265,13 +1262,13 @@ void DataReplicator::_scheduleRollbackCheckerCheckForRollback_inlock(
lock,
Status(ErrorCodes::CallbackCanceled,
"failed to schedule rollback checker to check "
- "for rollback: data replicator is shutting "
+ "for rollback: initial syncer is shutting "
"down"));
return;
}
auto scheduleResult = _rollbackChecker->checkForRollback(
- stdx::bind(&DataReplicator::_rollbackCheckerCheckForRollbackCallback,
+ stdx::bind(&InitialSyncer::_rollbackCheckerCheckForRollbackCallback,
this,
stdx::placeholders::_1,
onCompletionGuard));
@@ -1286,16 +1283,16 @@ void DataReplicator::_scheduleRollbackCheckerCheckForRollback_inlock(
return;
}
-Status DataReplicator::_checkForShutdownAndConvertStatus_inlock(
+Status InitialSyncer::_checkForShutdownAndConvertStatus_inlock(
const executor::TaskExecutor::CallbackArgs& callbackArgs, const std::string& message) {
return _checkForShutdownAndConvertStatus_inlock(callbackArgs.status, message);
}
-Status DataReplicator::_checkForShutdownAndConvertStatus_inlock(const Status& status,
- const std::string& message) {
+Status InitialSyncer::_checkForShutdownAndConvertStatus_inlock(const Status& status,
+ const std::string& message) {
if (_isShuttingDown_inlock()) {
- return Status(ErrorCodes::CallbackCanceled, message + ": data replicator is shutting down");
+ return Status(ErrorCodes::CallbackCanceled, message + ": initial syncer is shutting down");
}
if (!status.isOK()) {
@@ -1305,7 +1302,7 @@ Status DataReplicator::_checkForShutdownAndConvertStatus_inlock(const Status& st
return Status::OK();
}
-Status DataReplicator::_scheduleWorkAndSaveHandle_inlock(
+Status InitialSyncer::_scheduleWorkAndSaveHandle_inlock(
const executor::TaskExecutor::CallbackFn& work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name) {
@@ -1313,7 +1310,7 @@ Status DataReplicator::_scheduleWorkAndSaveHandle_inlock(
if (_isShuttingDown_inlock()) {
return Status(ErrorCodes::CallbackCanceled,
str::stream() << "failed to schedule work " << name
- << ": data replicator is shutting down");
+ << ": initial syncer is shutting down");
}
auto result = _exec->scheduleWork(work);
if (!result.isOK()) {
@@ -1325,7 +1322,7 @@ Status DataReplicator::_scheduleWorkAndSaveHandle_inlock(
return Status::OK();
}
-Status DataReplicator::_scheduleWorkAtAndSaveHandle_inlock(
+Status InitialSyncer::_scheduleWorkAtAndSaveHandle_inlock(
Date_t when,
const executor::TaskExecutor::CallbackFn& work,
executor::TaskExecutor::CallbackHandle* handle,
@@ -1335,7 +1332,7 @@ Status DataReplicator::_scheduleWorkAtAndSaveHandle_inlock(
return Status(ErrorCodes::CallbackCanceled,
str::stream() << "failed to schedule work " << name << " at "
<< when.toString()
- << ": data replicator is shutting down");
+ << ": initial syncer is shutting down");
}
auto result = _exec->scheduleWorkAt(when, work);
if (!result.isOK()) {
@@ -1348,7 +1345,7 @@ Status DataReplicator::_scheduleWorkAtAndSaveHandle_inlock(
return Status::OK();
}
-void DataReplicator::_cancelHandle_inlock(executor::TaskExecutor::CallbackHandle handle) {
+void InitialSyncer::_cancelHandle_inlock(executor::TaskExecutor::CallbackHandle handle) {
if (!handle) {
return;
}
@@ -1356,10 +1353,10 @@ void DataReplicator::_cancelHandle_inlock(executor::TaskExecutor::CallbackHandle
}
template <typename Component>
-Status DataReplicator::_startupComponent_inlock(Component& component) {
+Status InitialSyncer::_startupComponent_inlock(Component& component) {
if (_isShuttingDown_inlock()) {
return Status(ErrorCodes::CallbackCanceled,
- "data replicator shutdown while trying to call startup() on component");
+ "initial syncer shutdown while trying to call startup() on component");
}
auto status = component->startup();
if (!status.isOK()) {
@@ -1369,17 +1366,17 @@ Status DataReplicator::_startupComponent_inlock(Component& component) {
}
template <typename Component>
-void DataReplicator::_shutdownComponent_inlock(Component& component) {
+void InitialSyncer::_shutdownComponent_inlock(Component& component) {
if (!component) {
return;
}
component->shutdown();
}
-StatusWith<Operations> DataReplicator::_getNextApplierBatch_inlock() {
+StatusWith<Operations> InitialSyncer::_getNextApplierBatch_inlock() {
// If the fail-point is active, delay the apply batch by returning an empty batch so that
// _getNextApplierBatchCallback() will reschedule itself at a later time.
- // See DataReplicatorOptions::getApplierBatchCallbackRetryWait.
+ // See InitialSyncerOptions::getApplierBatchCallbackRetryWait.
if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
return Operations();
}
@@ -1459,7 +1456,7 @@ StatusWith<Operations> DataReplicator::_getNextApplierBatch_inlock() {
return std::move(ops);
}
-StatusWith<HostAndPort> DataReplicator::_chooseSyncSource_inlock() {
+StatusWith<HostAndPort> InitialSyncer::_chooseSyncSource_inlock() {
auto syncSource = _opts.syncSourceSelector->chooseNewSyncSource(_lastFetched.opTime);
if (syncSource.empty()) {
return Status{ErrorCodes::InvalidSyncSource,
@@ -1469,9 +1466,9 @@ StatusWith<HostAndPort> DataReplicator::_chooseSyncSource_inlock() {
return syncSource;
}
-Status DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
- Fetcher::Documents::const_iterator end,
- const OplogFetcher::DocumentsInfo& info) {
+Status InitialSyncer::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
+ Fetcher::Documents::const_iterator end,
+ const OplogFetcher::DocumentsInfo& info) {
if (info.toApplyDocumentCount == 0) {
return Status::OK();
}
@@ -1499,17 +1496,17 @@ Status DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begi
return Status::OK();
}
-std::string DataReplicator::Stats::toString() const {
+std::string InitialSyncer::Stats::toString() const {
return toBSON().toString();
}
-BSONObj DataReplicator::Stats::toBSON() const {
+BSONObj InitialSyncer::Stats::toBSON() const {
BSONObjBuilder bob;
append(&bob);
return bob.obj();
}
-void DataReplicator::Stats::append(BSONObjBuilder* builder) const {
+void InitialSyncer::Stats::append(BSONObjBuilder* builder) const {
builder->appendNumber("failedInitialSyncAttempts",
static_cast<long long>(failedInitialSyncAttempts));
builder->appendNumber("maxFailedInitialSyncAttempts",
@@ -1530,17 +1527,17 @@ void DataReplicator::Stats::append(BSONObjBuilder* builder) const {
arrBuilder.doneFast();
}
-std::string DataReplicator::InitialSyncAttemptInfo::toString() const {
+std::string InitialSyncer::InitialSyncAttemptInfo::toString() const {
return toBSON().toString();
}
-BSONObj DataReplicator::InitialSyncAttemptInfo::toBSON() const {
+BSONObj InitialSyncer::InitialSyncAttemptInfo::toBSON() const {
BSONObjBuilder bob;
append(&bob);
return bob.obj();
}
-void DataReplicator::InitialSyncAttemptInfo::append(BSONObjBuilder* builder) const {
+void InitialSyncer::InitialSyncAttemptInfo::append(BSONObjBuilder* builder) const {
builder->appendNumber("durationMillis", durationMillis);
builder->append("status", status.toString());
builder->append("syncSource", syncSource.toString());
diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/initial_syncer.h
index e2b96a92f6a..7ce2d5d7024 100644
--- a/src/mongo/db/repl/data_replicator.h
+++ b/src/mongo/db/repl/initial_syncer.h
@@ -74,7 +74,7 @@ struct MemberState;
class StorageInterface;
-struct DataReplicatorOptions {
+struct InitialSyncerOptions {
/** Function to return optime of last operation applied on this node */
using GetMyLastOptimeFn = stdx::function<OpTime()>;
@@ -93,7 +93,7 @@ struct DataReplicatorOptions {
Seconds blacklistSyncSourcePenaltyForNetworkConnectionError{10};
Minutes blacklistSyncSourcePenaltyForOplogStartMissing{10};
- // DataReplicator waits this long before retrying getApplierBatchCallback() if there are
+ // InitialSyncer waits this long before retrying getApplierBatchCallback() if there are
// currently no operations available to apply or if the 'rsSyncApplyStop' failpoint is active.
// This default value is based on the duration in BackgroundSync::waitForMore() and
// SyncTail::tryPopAndWaitForMore().
@@ -118,14 +118,14 @@ struct DataReplicatorOptions {
std::uint32_t oplogFetcherMaxFetcherRestarts = 0;
std::string toString() const {
- return str::stream() << "DataReplicatorOptions -- "
+ return str::stream() << "InitialSyncerOptions -- "
<< " localOplogNs: " << localOplogNS.toString()
<< " remoteOplogNS: " << remoteOplogNS.toString();
}
};
/**
- * The data replicator provides services to keep collection in sync by replicating
+ * The initial syncer provides services to keep collection in sync by replicating
* changes via an oplog source to the local system storage.
*
* This class will use existing machinery like the Executor to schedule work and
@@ -135,8 +135,8 @@ struct DataReplicatorOptions {
* Entry Points:
* -- startup: Start initial sync.
*/
-class DataReplicator {
- MONGO_DISALLOW_COPYING(DataReplicator);
+class InitialSyncer {
+ MONGO_DISALLOW_COPYING(InitialSyncer);
public:
/**
@@ -145,7 +145,7 @@ public:
typedef stdx::function<void(const StatusWith<OpTimeWithHash>& lastApplied)> OnCompletionFn;
/**
- * Callback completion guard for data replicator.
+ * Callback completion guard for initial syncer.
*/
using OnCompletionGuard = CallbackCompletionGuard<StatusWith<OpTimeWithHash>>;
@@ -164,19 +164,19 @@ public:
std::uint32_t maxFailedInitialSyncAttempts{0};
Date_t initialSyncStart;
Date_t initialSyncEnd;
- std::vector<DataReplicator::InitialSyncAttemptInfo> initialSyncAttemptInfos;
+ std::vector<InitialSyncer::InitialSyncAttemptInfo> initialSyncAttemptInfos;
std::string toString() const;
BSONObj toBSON() const;
void append(BSONObjBuilder* builder) const;
};
- DataReplicator(DataReplicatorOptions opts,
- std::unique_ptr<DataReplicatorExternalState> dataReplicatorExternalState,
- StorageInterface* storage,
- const OnCompletionFn& onCompletion);
+ InitialSyncer(InitialSyncerOptions opts,
+ std::unique_ptr<DataReplicatorExternalState> dataReplicatorExternalState,
+ StorageInterface* storage,
+ const OnCompletionFn& onCompletion);
- virtual ~DataReplicator();
+ virtual ~InitialSyncer();
/**
* Returns true if an initial sync is currently running or in the process of shutting down.
@@ -223,7 +223,7 @@ public:
enum class State { kPreStart, kRunning, kShuttingDown, kComplete };
/**
- * Returns current data replicator state.
+ * Returns current initial syncer state.
* For testing only.
*/
State getState_forTest() const;
@@ -242,7 +242,7 @@ private:
void _cancelRemainingWork_inlock();
/**
- * Returns true if the data replicator has received a shutdown request (_state is ShuttingDown).
+ * Returns true if the initial syncer has received a shutdown request (_state is ShuttingDown).
*/
bool _isShuttingDown() const;
bool _isShuttingDown_inlock() const;
@@ -527,7 +527,7 @@ private:
void _cancelHandle_inlock(executor::TaskExecutor::CallbackHandle handle);
/**
- * Starts up component and checks data replicator's shutdown state at the same time.
+ * Starts up component and checks initial syncer's shutdown state at the same time.
* If component's startup() fails, resets 'component' (which is assumed to be a unique_ptr
* to the component type).
*/
@@ -555,7 +555,7 @@ private:
// _mutex or be in a callback in _exec to read.
mutable stdx::mutex _mutex; // (S)
- const DataReplicatorOptions _opts; // (R)
+ const InitialSyncerOptions _opts; // (R)
std::unique_ptr<DataReplicatorExternalState> _dataReplicatorExternalState; // (R)
executor::TaskExecutor* _exec; // (R)
StorageInterface* _storage; // (R)
@@ -597,7 +597,7 @@ private:
// Used to signal changes in _state.
mutable stdx::condition_variable _stateCondition;
- // Current data replicator state. See comments for State enum class for details.
+ // Current initial syncer state. See comments for State enum class for details.
State _state = State::kPreStart; // (M)
// Passed to CollectionCloner via DatabasesCloner.
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index f6eaae1fa99..bcb430b4d37 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -37,8 +37,8 @@
#include "mongo/db/json.h"
#include "mongo/db/query/getmore_request.h"
#include "mongo/db/repl/base_cloner_test_fixture.h"
-#include "mongo/db/repl/data_replicator.h"
#include "mongo/db/repl/data_replicator_external_state_mock.h"
+#include "mongo/db/repl/initial_syncer.h"
#include "mongo/db/repl/member_state.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/oplog_fetcher.h"
@@ -67,17 +67,17 @@ namespace mongo {
namespace repl {
/**
- * Insertion operator for DataReplicator::State. Formats data replicator state for output stream.
+ * Insertion operator for InitialSyncer::State. Formats initial syncer state for output stream.
*/
-std::ostream& operator<<(std::ostream& os, const DataReplicator::State& state) {
+std::ostream& operator<<(std::ostream& os, const InitialSyncer::State& state) {
switch (state) {
- case DataReplicator::State::kPreStart:
+ case InitialSyncer::State::kPreStart:
return os << "PreStart";
- case DataReplicator::State::kRunning:
+ case InitialSyncer::State::kRunning:
return os << "Running";
- case DataReplicator::State::kShuttingDown:
+ case InitialSyncer::State::kShuttingDown:
return os << "ShuttingDown";
- case DataReplicator::State::kComplete:
+ case InitialSyncer::State::kComplete:
return os << "Complete";
}
MONGO_UNREACHABLE;
@@ -145,9 +145,9 @@ private:
ShouldFailRequestFn _shouldFailRequest;
};
-class DataReplicatorTest : public executor::ThreadPoolExecutorTest, public SyncSourceSelector {
+class InitialSyncerTest : public executor::ThreadPoolExecutorTest, public SyncSourceSelector {
public:
- DataReplicatorTest() {}
+ InitialSyncerTest() {}
executor::ThreadPoolMock::Options makeThreadPoolMockOptions() const override;
@@ -218,7 +218,7 @@ public:
}
/**
- * Schedules and processes a successful response to the network request sent by DataReplicator's
+ * Schedules and processes a successful response to the network request sent by InitialSyncer's
* last oplog entry fetcher. Also validates the find command arguments in the request.
*/
void processSuccessfulLastOplogEntryFetcherResponse(std::vector<BSONObj> docs);
@@ -233,8 +233,8 @@ public:
ASSERT_FALSE(getNet()->hasReadyRequests());
}
- DataReplicator& getDR() {
- return *_dr;
+ InitialSyncer& getInitialSyncer() {
+ return *_initialSyncer;
}
DataReplicatorExternalStateMock* getExternalState() {
@@ -327,7 +327,7 @@ protected:
_myLastOpTime = OpTime({3, 0}, 1);
- DataReplicatorOptions options;
+ InitialSyncerOptions options;
options.initialSyncRetryWait = Milliseconds(1);
options.getMyLastOptime = [this]() { return _myLastOpTime; };
options.setMyLastOptime = [this](const OpTime& opTime) { _setMyLastOptime(opTime); };
@@ -372,17 +372,17 @@ protected:
};
try {
- // When creating DataReplicator, we wrap _onCompletion so that we can override the
- // DataReplicator's callback behavior post-construction.
- // See DataReplicatorTransitionsToCompleteWhenFinishCallbackThrowsException.
- _dr = stdx::make_unique<DataReplicator>(
+ // When creating InitialSyncer, we wrap _onCompletion so that we can override the
+ // InitialSyncer's callback behavior post-construction.
+ // See InitialSyncerTransitionsToCompleteWhenFinishCallbackThrowsException.
+ _initialSyncer = stdx::make_unique<InitialSyncer>(
options,
std::move(dataReplicatorExternalState),
_storageInterface.get(),
[this](const StatusWith<OpTimeWithHash>& lastApplied) {
_onCompletion(lastApplied);
});
- _dr->setScheduleDbWorkFn_forTest(
+ _initialSyncer->setScheduleDbWorkFn_forTest(
[this](const executor::TaskExecutor::CallbackFn& work) {
return getExecutor().scheduleWork(work);
});
@@ -403,12 +403,12 @@ protected:
void tearDown() override {
tearDownExecutorThread();
- _dr.reset();
+ _initialSyncer.reset();
_dbWorkThreadPool->join();
_dbWorkThreadPool.reset();
_storageInterface.reset();
- // tearDown() destroys the task executor which was referenced by the data replicator.
+ // tearDown() destroys the task executor which was referenced by the initial syncer.
executor::ThreadPoolExecutorTest::tearDown();
}
@@ -432,8 +432,8 @@ protected:
TaskExecutorMock::ShouldFailRequestFn _shouldFailRequest;
std::unique_ptr<TaskExecutorMock> _executorProxy;
- DataReplicatorOptions _options;
- DataReplicatorOptions::SetMyLastOptimeFn _setMyLastOptime;
+ InitialSyncerOptions _options;
+ InitialSyncerOptions::SetMyLastOptimeFn _setMyLastOptime;
OpTime _myLastOpTime;
std::unique_ptr<SyncSourceSelectorMock> _syncSourceSelector;
std::unique_ptr<StorageInterfaceMock> _storageInterface;
@@ -442,17 +442,17 @@ protected:
std::map<NamespaceString, CollectionCloneInfo> _collections;
StatusWith<OpTimeWithHash> _lastApplied = Status(ErrorCodes::NotYetInitialized, "");
- DataReplicator::OnCompletionFn _onCompletion;
+ InitialSyncer::OnCompletionFn _onCompletion;
private:
DataReplicatorExternalStateMock* _externalState;
- std::unique_ptr<DataReplicator> _dr;
+ std::unique_ptr<InitialSyncer> _initialSyncer;
bool _executorThreadShutdownComplete = false;
};
-executor::ThreadPoolMock::Options DataReplicatorTest::makeThreadPoolMockOptions() const {
+executor::ThreadPoolMock::Options InitialSyncerTest::makeThreadPoolMockOptions() const {
executor::ThreadPoolMock::Options options;
- options.onCreateThread = []() { Client::initThread("DataReplicatorTest"); };
+ options.onCreateThread = []() { Client::initThread("InitialSyncerTest"); };
return options;
}
@@ -534,7 +534,7 @@ BSONObj makeOplogEntry(int t, const char* opType = "i", int version = OplogEntry
<< BSON("_id" << t << "a" << t));
}
-void DataReplicatorTest::processSuccessfulLastOplogEntryFetcherResponse(std::vector<BSONObj> docs) {
+void InitialSyncerTest::processSuccessfulLastOplogEntryFetcherResponse(std::vector<BSONObj> docs) {
auto net = getNet();
auto request = assertRemoteCommandNameEquals(
"find",
@@ -546,8 +546,8 @@ void DataReplicatorTest::processSuccessfulLastOplogEntryFetcherResponse(std::vec
net->runReadyNetworkOperations();
}
-TEST_F(DataReplicatorTest, InvalidConstruction) {
- DataReplicatorOptions options;
+TEST_F(InitialSyncerTest, InvalidConstruction) {
+ InitialSyncerOptions options;
options.getMyLastOptime = []() { return OpTime(); };
options.setMyLastOptime = [](const OpTime&) {};
options.getSlaveDelay = []() { return Seconds(0); };
@@ -558,7 +558,7 @@ TEST_F(DataReplicatorTest, InvalidConstruction) {
{
auto dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>();
ASSERT_THROWS_CODE_AND_WHAT(
- DataReplicator(
+ InitialSyncer(
options, std::move(dataReplicatorExternalState), _storageInterface.get(), callback),
UserException,
ErrorCodes::BadValue,
@@ -569,112 +569,112 @@ TEST_F(DataReplicatorTest, InvalidConstruction) {
{
auto dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>();
dataReplicatorExternalState->taskExecutor = &getExecutor();
- ASSERT_THROWS_CODE_AND_WHAT(DataReplicator(options,
- std::move(dataReplicatorExternalState),
- _storageInterface.get(),
- DataReplicator::OnCompletionFn()),
+ ASSERT_THROWS_CODE_AND_WHAT(InitialSyncer(options,
+ std::move(dataReplicatorExternalState),
+ _storageInterface.get(),
+ InitialSyncer::OnCompletionFn()),
UserException,
ErrorCodes::BadValue,
"callback function cannot be null");
}
}
-TEST_F(DataReplicatorTest, CreateDestroy) {}
+TEST_F(InitialSyncerTest, CreateDestroy) {}
const std::uint32_t maxAttempts = 1U;
-TEST_F(DataReplicatorTest, StartupReturnsIllegalOperationIfAlreadyActive) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, StartupReturnsIllegalOperationIfAlreadyActive) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
- ASSERT_FALSE(dr->isActive());
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
- ASSERT_TRUE(dr->isActive());
- ASSERT_EQUALS(ErrorCodes::IllegalOperation, dr->startup(opCtx.get(), maxAttempts));
- ASSERT_TRUE(dr->isActive());
+ ASSERT_FALSE(initialSyncer->isActive());
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
+ ASSERT_TRUE(initialSyncer->isActive());
+ ASSERT_EQUALS(ErrorCodes::IllegalOperation, initialSyncer->startup(opCtx.get(), maxAttempts));
+ ASSERT_TRUE(initialSyncer->isActive());
}
-TEST_F(DataReplicatorTest, StartupReturnsShutdownInProgressIfDataReplicatorIsShuttingDown) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, StartupReturnsShutdownInProgressIfInitialSyncerIsShuttingDown) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
- ASSERT_FALSE(dr->isActive());
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
- ASSERT_TRUE(dr->isActive());
- // SyncSourceSelector returns an invalid sync source so DataReplicator is stuck waiting for
+ ASSERT_FALSE(initialSyncer->isActive());
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
+ ASSERT_TRUE(initialSyncer->isActive());
+ // SyncSourceSelector returns an invalid sync source so InitialSyncer is stuck waiting for
// another sync source in 'Options::syncSourceRetryWait' ms.
- ASSERT_OK(dr->shutdown());
- ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->shutdown());
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, initialSyncer->startup(opCtx.get(), maxAttempts));
}
-TEST_F(DataReplicatorTest, StartupReturnsShutdownInProgressIfExecutorIsShutdown) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, StartupReturnsShutdownInProgressIfExecutorIsShutdown) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
getExecutor().shutdown();
- ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, dr->startup(opCtx.get(), maxAttempts));
- ASSERT_FALSE(dr->isActive());
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, initialSyncer->startup(opCtx.get(), maxAttempts));
+ ASSERT_FALSE(initialSyncer->isActive());
- // Cannot startup data replicator again since it's in the Complete state.
- ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, dr->startup(opCtx.get(), maxAttempts));
+ // Cannot startup initial syncer again since it's in the Complete state.
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, initialSyncer->startup(opCtx.get(), maxAttempts));
}
-TEST_F(DataReplicatorTest, ShutdownTransitionsStateToCompleteIfCalledBeforeStartup) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, ShutdownTransitionsStateToCompleteIfCalledBeforeStartup) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
- ASSERT_OK(dr->shutdown());
- ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, dr->startup(opCtx.get(), maxAttempts));
- // Data replicator is inactive when it's in the Complete state.
- ASSERT_FALSE(dr->isActive());
+ ASSERT_OK(initialSyncer->shutdown());
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, initialSyncer->startup(opCtx.get(), maxAttempts));
+ // Initial syncer is inactive when it's in the Complete state.
+ ASSERT_FALSE(initialSyncer->isActive());
}
-TEST_F(DataReplicatorTest, StartupSetsInitialSyncFlagOnSuccess) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, StartupSetsInitialSyncFlagOnSuccess) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
// Initial sync flag should not be set before starting.
ASSERT_FALSE(getStorage().getInitialSyncFlag(opCtx.get()));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
- ASSERT_TRUE(dr->isActive());
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
+ ASSERT_TRUE(initialSyncer->isActive());
// Initial sync flag should be set.
ASSERT_TRUE(getStorage().getInitialSyncFlag(opCtx.get()));
}
-TEST_F(DataReplicatorTest, DataReplicatorReturnsCallbackCanceledIfShutdownImmediatelyAfterStartup) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, InitialSyncerReturnsCallbackCanceledIfShutdownImmediatelyAfterStartup) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
// This will cancel the _startInitialSyncAttemptCallback() task scheduled by startup().
- ASSERT_OK(dr->shutdown());
+ ASSERT_OK(initialSyncer->shutdown());
- // Depending on which DataReplicator stage (_chooseSyncSource or _rollbackCheckerResetCallback)
+ // Depending on which InitialSyncer stage (_chooseSyncSource or _rollbackCheckerResetCallback)
// was interrupted by shutdown(), we may have to request the network interface to deliver
- // cancellation signals to the DataReplicator callbacks in for DataReplicator to run to
+ // cancellation signals to the InitialSyncer callbacks in for InitialSyncer to run to
// completion.
executor::NetworkInterfaceMock::InNetworkGuard(getNet())->runReadyNetworkOperations();
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
-TEST_F(DataReplicatorTest,
- DataReplicatorRetriesSyncSourceSelectionIfChooseNewSyncSourceReturnsInvalidSyncSource) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerRetriesSyncSourceSelectionIfChooseNewSyncSourceReturnsInvalidSyncSource) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
// Override chooseNewSyncSource() result in SyncSourceSelectorMock before calling startup()
- // because DataReplicator will look for a valid sync source immediately after startup.
+ // because InitialSyncer will look for a valid sync source immediately after startup.
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
// Run first sync source selection attempt.
executor::NetworkInterfaceMock::InNetworkGuard(getNet())->runReadyNetworkOperations();
- // DataReplicator will not drop user databases while looking for a valid sync source.
+ // InitialSyncer will not drop user databases while looking for a valid sync source.
ASSERT_FALSE(_storageInterfaceWorkDone.droppedUserDBs);
// First sync source selection attempt failed. Update SyncSourceSelectorMock to return valid
@@ -691,9 +691,9 @@ TEST_F(DataReplicatorTest,
const std::uint32_t chooseSyncSourceMaxAttempts = 10U;
/**
- * Advances executor clock so that DataReplicator exhausts all 'chooseSyncSourceMaxAttempts' (server
+ * Advances executor clock so that InitialSyncer exhausts all 'chooseSyncSourceMaxAttempts' (server
* parameter numInitialSyncConnectAttempts) sync source selection attempts.
- * If SyncSourceSelectorMock keeps returning an invalid sync source, DataReplicator will retry every
+ * If SyncSourceSelectorMock keeps returning an invalid sync source, InitialSyncer will retry every
* '_options.syncSourceRetryWait' ms up to a maximum of 'chooseSyncSourceMaxAttempts' attempts.
*/
void _simulateChooseSyncSourceFailure(executor::NetworkInterfaceMock* net,
@@ -702,36 +702,36 @@ void _simulateChooseSyncSourceFailure(executor::NetworkInterfaceMock* net,
}
TEST_F(
- DataReplicatorTest,
- DataReplicatorReturnsInitialSyncOplogSourceMissingIfNoValidSyncSourceCanBeFoundAfterTenFailedChooseSyncSourceAttempts) {
- auto dr = &getDR();
+ InitialSyncerTest,
+ InitialSyncerReturnsInitialSyncOplogSourceMissingIfNoValidSyncSourceCanBeFoundAfterTenFailedChooseSyncSourceAttempts) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
// Override chooseNewSyncSource() result in SyncSourceSelectorMock before calling startup()
- // because DataReplicator will look for a valid sync source immediately after startup.
+ // because InitialSyncer will look for a valid sync source immediately after startup.
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
_simulateChooseSyncSourceFailure(getNet(), _options.syncSourceRetryWait);
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::InitialSyncOplogSourceMissing, _lastApplied);
}
-// Confirms that DataReplicator keeps retrying initial sync.
+// Confirms that InitialSyncer keeps retrying initial sync.
// Make every initial sync attempt fail early by having the sync source selector always return an
// invalid sync source.
-TEST_F(DataReplicatorTest,
- DataReplicatorRetriesInitialSyncUpToMaxAttemptsAndReturnsLastAttemptError) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerRetriesInitialSyncUpToMaxAttemptsAndReturnsLastAttemptError) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
const std::uint32_t initialSyncMaxAttempts = 3U;
- ASSERT_OK(dr->startup(opCtx.get(), initialSyncMaxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), initialSyncMaxAttempts));
auto net = getNet();
for (std::uint32_t i = 0; i < initialSyncMaxAttempts; ++i) {
@@ -739,12 +739,12 @@ TEST_F(DataReplicatorTest,
advanceClock(net, _options.initialSyncRetryWait);
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::InitialSyncOplogSourceMissing, _lastApplied);
// Check number of failed attempts in stats.
- auto progress = dr->getInitialSyncProgress();
+ auto progress = initialSyncer->getInitialSyncProgress();
unittest::log() << "Progress after " << initialSyncMaxAttempts
<< " failed attempts: " << progress;
ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), int(initialSyncMaxAttempts))
@@ -753,13 +753,13 @@ TEST_F(DataReplicatorTest,
<< progress;
}
-TEST_F(DataReplicatorTest,
- DataReplicatorReturnsCallbackCanceledIfShutdownWhileRetryingSyncSourceSelection) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerReturnsCallbackCanceledIfShutdownWhileRetryingSyncSourceSelection) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -771,39 +771,38 @@ TEST_F(DataReplicatorTest,
// This will cancel the _chooseSyncSourceCallback() task scheduled at getNet()->now() +
// '_options.syncSourceRetryWait'.
- ASSERT_OK(dr->shutdown());
+ ASSERT_OK(initialSyncer->shutdown());
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
-TEST_F(
- DataReplicatorTest,
- DataReplicatorReturnsScheduleErrorIfTaskExecutorFailsToScheduleNextChooseSyncSourceCallback) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerReturnsScheduleErrorIfTaskExecutorFailsToScheduleNextChooseSyncSourceCallback) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
_executorProxy->shouldFailScheduleWorkAt = true;
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
-TEST_F(DataReplicatorTest,
- DataReplicatorReturnsScheduleErrorIfTaskExecutorFailsToScheduleNextInitialSyncAttempt) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerReturnsScheduleErrorIfTaskExecutorFailsToScheduleNextInitialSyncAttempt) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
- ASSERT_EQUALS(DataReplicator::State::kPreStart, dr->getState_forTest());
+ ASSERT_EQUALS(InitialSyncer::State::kPreStart, initialSyncer->getState_forTest());
- ASSERT_OK(dr->startup(opCtx.get(), 2U));
- ASSERT_EQUALS(DataReplicator::State::kRunning, dr->getState_forTest());
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), 2U));
+ ASSERT_EQUALS(InitialSyncer::State::kRunning, initialSyncer->getState_forTest());
// Advance clock so that we run all but the last sync source callback.
auto net = getNet();
@@ -814,16 +813,16 @@ TEST_F(DataReplicatorTest,
_executorProxy->shouldFailScheduleWorkAt = true;
advanceClock(net, _options.syncSourceRetryWait);
- dr->join();
+ initialSyncer->join();
- ASSERT_EQUALS(DataReplicator::State::kComplete, dr->getState_forTest());
+ ASSERT_EQUALS(InitialSyncer::State::kComplete, initialSyncer->getState_forTest());
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
-// This test verifies that the data replication will still transition to a complete state even if
+// This test verifies that the initial syncer will still transition to a complete state even if
// the completion callback function throws an exception.
-TEST_F(DataReplicatorTest, DataReplicatorTransitionsToCompleteWhenFinishCallbackThrowsException) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, InitialSyncerTransitionsToCompleteWhenFinishCallbackThrowsException) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_onCompletion = [this](const StatusWith<OpTimeWithHash>& lastApplied) {
@@ -832,10 +831,10 @@ TEST_F(DataReplicatorTest, DataReplicatorTransitionsToCompleteWhenFinishCallback
};
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- ASSERT_OK(dr->shutdown());
- dr->join();
+ ASSERT_OK(initialSyncer->shutdown());
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
@@ -854,14 +853,14 @@ private:
bool* _sharedCallbackStateDestroyed;
};
-TEST_F(DataReplicatorTest, DataReplicatorResetsOnCompletionCallbackFunctionPointerUponCompletion) {
+TEST_F(InitialSyncerTest, InitialSyncerResetsOnCompletionCallbackFunctionPointerUponCompletion) {
bool sharedCallbackStateDestroyed = false;
auto sharedCallbackData = std::make_shared<SharedCallbackState>(&sharedCallbackStateDestroyed);
decltype(_lastApplied) lastApplied = getDetectableErrorStatus();
auto dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>();
dataReplicatorExternalState->taskExecutor = &getExecutor();
- auto dr = stdx::make_unique<DataReplicator>(
+ auto initialSyncer = stdx::make_unique<InitialSyncer>(
_options,
std::move(dataReplicatorExternalState),
_storageInterface.get(),
@@ -872,30 +871,30 @@ TEST_F(DataReplicatorTest, DataReplicatorResetsOnCompletionCallbackFunctionPoint
auto opCtx = makeOpCtx();
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
sharedCallbackData.reset();
ASSERT_FALSE(sharedCallbackStateDestroyed);
- ASSERT_OK(dr->shutdown());
+ ASSERT_OK(initialSyncer->shutdown());
- // Depending on which DataReplicator stage (_chooseSyncSource or _rollbackCheckerResetCallback)
+ // Depending on which InitialSyncer stage (_chooseSyncSource or _rollbackCheckerResetCallback)
// was interrupted by shutdown(), we may have to request the network interface to deliver
- // cancellation signals to the DataReplicator callbacks in for DataReplicator to run to
+ // cancellation signals to the InitialSyncer callbacks in for InitialSyncer to run to
// completion.
executor::NetworkInterfaceMock::InNetworkGuard(getNet())->runReadyNetworkOperations();
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, lastApplied);
- // DataReplicator should reset 'DataReplicator::_onCompletion' after running callback function
+ // InitialSyncer should reset 'InitialSyncer::_onCompletion' after running callback function
// for the last time before becoming inactive.
- // This ensures that we release resources associated with 'DataReplicator::_onCompletion'.
+ // This ensures that we release resources associated with 'InitialSyncer::_onCompletion'.
ASSERT_TRUE(sharedCallbackStateDestroyed);
}
-TEST_F(DataReplicatorTest, DataReplicatorRecreatesOplogAndDropsReplicatedDatabases) {
+TEST_F(InitialSyncerTest, InitialSyncerRecreatesOplogAndDropsReplicatedDatabases) {
// We are not interested in proceeding beyond the oplog creation stage so we inject a failure
// after setting '_storageInterfaceWorkDone.createOplogCalled' to true.
auto oldCreateOplogFn = _storageInterface->createOplogFn;
@@ -905,13 +904,13 @@ TEST_F(DataReplicatorTest, DataReplicatorRecreatesOplogAndDropsReplicatedDatabas
return Status(ErrorCodes::OperationFailed, "oplog creation failed");
};
- auto dr = &getDR();
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
LockGuard lock(_storageInterfaceWorkDoneMutex);
@@ -919,11 +918,11 @@ TEST_F(DataReplicatorTest, DataReplicatorRecreatesOplogAndDropsReplicatedDatabas
ASSERT_TRUE(_storageInterfaceWorkDone.createOplogCalled);
}
-TEST_F(DataReplicatorTest, DataReplicatorPassesThroughGetRollbackIdScheduleError) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetRollbackIdScheduleError) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
- // replSetGetRBID is the first remote command to be scheduled by the data replicator after
+ // replSetGetRBID is the first remote command to be scheduled by the initial syncer after
// creating the oplog collection.
executor::RemoteCommandRequest request;
_shouldFailRequest = [&request](const executor::RemoteCommandRequest& requestToSend) {
@@ -933,9 +932,9 @@ TEST_F(DataReplicatorTest, DataReplicatorPassesThroughGetRollbackIdScheduleError
HostAndPort syncSource("localhost", 12345);
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
ASSERT_EQUALS("admin", request.dbname);
@@ -944,8 +943,8 @@ TEST_F(DataReplicatorTest, DataReplicatorPassesThroughGetRollbackIdScheduleError
}
TEST_F(
- DataReplicatorTest,
- DataReplicatorReturnsShutdownInProgressIfSchedulingRollbackCheckerFailedDueToExecutorShutdown) {
+ InitialSyncerTest,
+ InitialSyncerReturnsShutdownInProgressIfSchedulingRollbackCheckerFailedDueToExecutorShutdown) {
// The rollback id request is sent immediately after oplog creation. We shut the task executor
// down before returning from createOplog() to make the scheduleRemoteCommand() call for
// replSetGetRBID fail.
@@ -957,30 +956,30 @@ TEST_F(
return status;
};
- auto dr = &getDR();
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, _lastApplied);
LockGuard lock(_storageInterfaceWorkDoneMutex);
ASSERT_TRUE(_storageInterfaceWorkDone.createOplogCalled);
}
-TEST_F(DataReplicatorTest, DataReplicatorCancelsRollbackCheckerOnShutdown) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, InitialSyncerCancelsRollbackCheckerOnShutdown) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
HostAndPort syncSource("localhost", 12345);
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
- ASSERT_EQUALS(DataReplicator::State::kPreStart, dr->getState_forTest());
+ ASSERT_EQUALS(InitialSyncer::State::kPreStart, initialSyncer->getState_forTest());
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
- ASSERT_EQUALS(DataReplicator::State::kRunning, dr->getState_forTest());
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
+ ASSERT_EQUALS(InitialSyncer::State::kRunning, initialSyncer->getState_forTest());
auto net = getNet();
{
@@ -993,25 +992,25 @@ TEST_F(DataReplicatorTest, DataReplicatorCancelsRollbackCheckerOnShutdown) {
net->blackHole(noi);
}
- ASSERT_OK(dr->shutdown());
+ ASSERT_OK(initialSyncer->shutdown());
// Since we need to request the NetworkInterfaceMock to deliver the cancellation event,
- // the DataReplicator has to be in a pre-completion state (ie. ShuttingDown).
- ASSERT_EQUALS(DataReplicator::State::kShuttingDown, dr->getState_forTest());
+ // the InitialSyncer has to be in a pre-completion state (ie. ShuttingDown).
+ ASSERT_EQUALS(InitialSyncer::State::kShuttingDown, initialSyncer->getState_forTest());
executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();
- dr->join();
- ASSERT_EQUALS(DataReplicator::State::kComplete, dr->getState_forTest());
+ initialSyncer->join();
+ ASSERT_EQUALS(InitialSyncer::State::kComplete, initialSyncer->getState_forTest());
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
-TEST_F(DataReplicatorTest, DataReplicatorPassesThroughRollbackCheckerCallbackError) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, InitialSyncerPassesThroughRollbackCheckerCallbackError) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1023,12 +1022,12 @@ TEST_F(DataReplicatorTest, DataReplicatorPassesThroughRollbackCheckerCallbackErr
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
-TEST_F(DataReplicatorTest, DataReplicatorPassesThroughLastOplogEntryFetcherScheduleError) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, InitialSyncerPassesThroughLastOplogEntryFetcherScheduleError) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
// The last oplog entry fetcher is the first component that sends a find command so we reject
@@ -1041,7 +1040,7 @@ TEST_F(DataReplicatorTest, DataReplicatorPassesThroughLastOplogEntryFetcherSched
HostAndPort syncSource("localhost", 12345);
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1052,7 +1051,7 @@ TEST_F(DataReplicatorTest, DataReplicatorPassesThroughLastOplogEntryFetcherSched
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
ASSERT_EQUALS(syncSource, request.target);
@@ -1062,12 +1061,12 @@ TEST_F(DataReplicatorTest, DataReplicatorPassesThroughLastOplogEntryFetcherSched
ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
}
-TEST_F(DataReplicatorTest, DataReplicatorPassesThroughLastOplogEntryFetcherCallbackError) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, InitialSyncerPassesThroughLastOplogEntryFetcherCallbackError) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1084,16 +1083,16 @@ TEST_F(DataReplicatorTest, DataReplicatorPassesThroughLastOplogEntryFetcherCallb
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
-TEST_F(DataReplicatorTest, DataReplicatorCancelsLastOplogEntryFetcherOnShutdown) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, InitialSyncerCancelsLastOplogEntryFetcherOnShutdown) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1106,20 +1105,20 @@ TEST_F(DataReplicatorTest, DataReplicatorCancelsLastOplogEntryFetcherOnShutdown)
ASSERT_TRUE(net->hasReadyRequests());
}
- ASSERT_OK(dr->shutdown());
+ ASSERT_OK(initialSyncer->shutdown());
executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
-TEST_F(DataReplicatorTest,
- DataReplicatorReturnsNoMatchingDocumentIfLastOplogEntryFetcherReturnsEmptyBatchOfDocuments) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerReturnsNoMatchingDocumentIfLastOplogEntryFetcherReturnsEmptyBatchOfDocuments) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1133,17 +1132,17 @@ TEST_F(DataReplicatorTest,
processSuccessfulLastOplogEntryFetcherResponse({});
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, _lastApplied);
}
-TEST_F(DataReplicatorTest,
- DataReplicatorResendsFindCommandIfLastOplogEntryFetcherReturnsRetriableError) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerResendsFindCommandIfLastOplogEntryFetcherReturnsRetriableError) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
@@ -1157,20 +1156,20 @@ TEST_F(DataReplicatorTest,
net->scheduleErrorResponse(Status(ErrorCodes::HostNotFound, "")));
net->runReadyNetworkOperations();
- // DataReplicator stays active because it resends the find request for the last oplog entry.
- ASSERT_TRUE(dr->isActive());
+ // InitialSyncer stays active because it resends the find request for the last oplog entry.
+ ASSERT_TRUE(initialSyncer->isActive());
// Last oplog entry second attempt.
processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
}
-TEST_F(DataReplicatorTest,
- DataReplicatorReturnsNoSuchKeyIfLastOplogEntryFetcherReturnsEntryWithMissingHash) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerReturnsNoSuchKeyIfLastOplogEntryFetcherReturnsEntryWithMissingHash) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1184,17 +1183,17 @@ TEST_F(DataReplicatorTest,
processSuccessfulLastOplogEntryFetcherResponse({BSONObj()});
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::NoSuchKey, _lastApplied);
}
-TEST_F(DataReplicatorTest,
- DataReplicatorReturnsNoSuchKeyIfLastOplogEntryFetcherReturnsEntryWithMissingTimestamp) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerReturnsNoSuchKeyIfLastOplogEntryFetcherReturnsEntryWithMissingTimestamp) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1208,19 +1207,19 @@ TEST_F(DataReplicatorTest,
processSuccessfulLastOplogEntryFetcherResponse({BSON("h" << 1LL)});
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::NoSuchKey, _lastApplied);
}
-TEST_F(DataReplicatorTest,
- DataReplicatorPassesThroughErrorFromDataReplicatorExternalStateGetCurrentConfig) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerPassesThroughErrorFromDataReplicatorExternalStateGetCurrentConfig) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
getExternalState()->replSetConfigResult = Status(ErrorCodes::OperationFailed, "");
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1234,12 +1233,12 @@ TEST_F(DataReplicatorTest,
processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
-TEST_F(DataReplicatorTest, DataReplicatorPassesThroughOplogFetcherScheduleError) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, InitialSyncerPassesThroughOplogFetcherScheduleError) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
// Make the tailable oplog query fail. Allow all other requests to be scheduled.
@@ -1255,7 +1254,7 @@ TEST_F(DataReplicatorTest, DataReplicatorPassesThroughOplogFetcherScheduleError)
HostAndPort syncSource("localhost", 12345);
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1269,7 +1268,7 @@ TEST_F(DataReplicatorTest, DataReplicatorPassesThroughOplogFetcherScheduleError)
processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
ASSERT_EQUALS(syncSource, request.target);
@@ -1279,12 +1278,12 @@ TEST_F(DataReplicatorTest, DataReplicatorPassesThroughOplogFetcherScheduleError)
ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
}
-TEST_F(DataReplicatorTest, DataReplicatorPassesThroughOplogFetcherCallbackError) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, InitialSyncerPassesThroughOplogFetcherCallbackError) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1312,17 +1311,17 @@ TEST_F(DataReplicatorTest, DataReplicatorPassesThroughOplogFetcherCallbackError)
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
-TEST_F(DataReplicatorTest,
- DataReplicatorSucceedsOnEarlyOplogFetcherCompletionIfThereAreNoOperationsToApply) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerSucceedsOnEarlyOplogFetcherCompletionIfThereAreNoOperationsToApply) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1362,19 +1361,19 @@ TEST_F(DataReplicatorTest,
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(OplogEntry(makeOplogEntry(1)).getOpTime(),
unittest::assertGet(_lastApplied).opTime);
}
TEST_F(
- DataReplicatorTest,
- DataReplicatorSucceedsOnEarlyOplogFetcherCompletionIfThereAreEnoughOperationsInTheOplogBufferToReachEndTimestamp) {
- auto dr = &getDR();
+ InitialSyncerTest,
+ InitialSyncerSucceedsOnEarlyOplogFetcherCompletionIfThereAreEnoughOperationsInTheOplogBufferToReachEndTimestamp) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1411,19 +1410,19 @@ TEST_F(
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(OplogEntry(makeOplogEntry(3)).getOpTime(),
unittest::assertGet(_lastApplied).opTime);
}
TEST_F(
- DataReplicatorTest,
- DataReplicatorReturnsRemoteResultsUnavailableOnEarlyOplogFetcherCompletionIfThereAreNotEnoughOperationsInTheOplogBufferToReachEndTimestamp) {
- auto dr = &getDR();
+ InitialSyncerTest,
+ InitialSyncerReturnsRemoteResultsUnavailableOnEarlyOplogFetcherCompletionIfThereAreNotEnoughOperationsInTheOplogBufferToReachEndTimestamp) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1457,13 +1456,13 @@ TEST_F(
processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(4)});
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::RemoteResultsUnavailable, _lastApplied);
}
-TEST_F(DataReplicatorTest,
- DataReplicatorPassesThroughDatabasesClonerScheduleErrorAndCancelsOplogFetcher) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerPassesThroughDatabasesClonerScheduleErrorAndCancelsOplogFetcher) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
// Make the listDatabases command fail. Allow all other requests to be scheduled.
@@ -1478,7 +1477,7 @@ TEST_F(DataReplicatorTest,
HostAndPort syncSource("localhost", 12345);
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1491,17 +1490,17 @@ TEST_F(DataReplicatorTest,
// Last oplog entry.
processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
- // DataReplicator shuts down OplogFetcher when it fails to schedule DatabasesCloner
+ // InitialSyncer shuts down OplogFetcher when it fails to schedule DatabasesCloner
// so we should not expect any network requests in the queue.
ASSERT_FALSE(net->hasReadyRequests());
// OplogFetcher is shutting down but we still need to call runReadyNetworkOperations()
- // to deliver the cancellation status to the 'DataReplicator::_oplogFetcherCallback'
+ // to deliver the cancellation status to the 'InitialSyncer::_oplogFetcherCallback'
// callback.
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
ASSERT_EQUALS(syncSource, request.target);
@@ -1509,13 +1508,13 @@ TEST_F(DataReplicatorTest,
assertRemoteCommandNameEquals("listDatabases", request);
}
-TEST_F(DataReplicatorTest,
- DataReplicatorPassesThroughDatabasesClonerCallbackErrorAndCancelsOplogFetcher) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerPassesThroughDatabasesClonerCallbackErrorAndCancelsOplogFetcher) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1546,16 +1545,16 @@ TEST_F(DataReplicatorTest,
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::FailedToParse, _lastApplied);
}
-TEST_F(DataReplicatorTest, DataReplicatorIgnoresLocalDatabasesWhenCloningDatabases) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, InitialSyncerIgnoresLocalDatabasesWhenCloningDatabases) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1605,17 +1604,17 @@ TEST_F(DataReplicatorTest, DataReplicatorIgnoresLocalDatabasesWhenCloningDatabas
getExecutor().shutdown();
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
-TEST_F(DataReplicatorTest,
- DataReplicatorIgnoresDatabaseInfoDocumentWithoutNameFieldWhenCloningDatabases) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerIgnoresDatabaseInfoDocumentWithoutNameFieldWhenCloningDatabases) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1672,16 +1671,16 @@ TEST_F(DataReplicatorTest,
getExecutor().shutdown();
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
-TEST_F(DataReplicatorTest, DataReplicatorCancelsBothOplogFetcherAndDatabasesClonerOnShutdown) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, InitialSyncerCancelsBothOplogFetcherAndDatabasesClonerOnShutdown) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1695,16 +1694,16 @@ TEST_F(DataReplicatorTest, DataReplicatorCancelsBothOplogFetcherAndDatabasesClon
processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
}
- ASSERT_OK(dr->shutdown());
+ ASSERT_OK(initialSyncer->shutdown());
executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
-TEST_F(DataReplicatorTest,
- DataReplicatorPassesThroughSecondLastOplogEntryFetcherScheduleErrorAndCancelsOplogFetcher) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerPassesThroughSecondLastOplogEntryFetcherScheduleErrorAndCancelsOplogFetcher) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
// Make the second last oplog entry fetcher command fail. Allow all other requests to be
@@ -1726,7 +1725,7 @@ TEST_F(DataReplicatorTest,
};
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1759,17 +1758,17 @@ TEST_F(DataReplicatorTest,
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
-TEST_F(DataReplicatorTest,
- DataReplicatorPassesThroughSecondLastOplogEntryFetcherCallbackErrorAndCancelsOplogFetcher) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerPassesThroughSecondLastOplogEntryFetcherCallbackErrorAndCancelsOplogFetcher) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1811,17 +1810,17 @@ TEST_F(DataReplicatorTest,
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
-TEST_F(DataReplicatorTest,
- DataReplicatorCancelsBothSecondLastOplogEntryFetcherAndOplogFetcherOnShutdown) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerCancelsBothSecondLastOplogEntryFetcherAndOplogFetcherOnShutdown) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1855,20 +1854,20 @@ TEST_F(DataReplicatorTest,
net->blackHole(noi);
}
- dr->shutdown();
+ initialSyncer->shutdown();
executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
-TEST_F(DataReplicatorTest,
- DataReplicatorCancelsSecondLastOplogEntryFetcherOnOplogFetcherCallbackError) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerCancelsSecondLastOplogEntryFetcherOnOplogFetcherCallbackError) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -1914,18 +1913,18 @@ TEST_F(DataReplicatorTest,
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
TEST_F(
- DataReplicatorTest,
- DataReplicatorReturnsTypeMismatchErrorWhenSecondLastOplogEntryFetcherReturnsMalformedDocument) {
- auto dr = &getDR();
+ InitialSyncerTest,
+ InitialSyncerReturnsTypeMismatchErrorWhenSecondLastOplogEntryFetcherReturnsMalformedDocument) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto oplogEntry = makeOplogEntry(1);
auto net = getNet();
@@ -1964,17 +1963,17 @@ TEST_F(
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::TypeMismatch, _lastApplied);
}
-TEST_F(DataReplicatorTest,
- DataReplicatorReturnsOplogOutOfOrderIfStopTimestampPrecedesBeginTimestamp) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerReturnsOplogOutOfOrderIfStopTimestampPrecedesBeginTimestamp) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
@@ -2010,14 +2009,14 @@ TEST_F(DataReplicatorTest,
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, _lastApplied);
}
TEST_F(
- DataReplicatorTest,
- DataReplicatorPassesThroughInsertOplogSeedDocumentErrorAfterDataCloningFinishesWithNoOperationsToApply) {
- auto dr = &getDR();
+ InitialSyncerTest,
+ InitialSyncerPassesThroughInsertOplogSeedDocumentErrorAfterDataCloningFinishesWithNoOperationsToApply) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
NamespaceString insertDocumentNss;
@@ -2030,7 +2029,7 @@ TEST_F(
};
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto oplogEntry = makeOplogEntry(1);
auto net = getNet();
@@ -2067,30 +2066,30 @@ TEST_F(
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
ASSERT_EQUALS(_options.localOplogNS, insertDocumentNss);
ASSERT_BSONOBJ_EQ(oplogEntry, insertDocumentDoc);
}
TEST_F(
- DataReplicatorTest,
- DataReplicatorReturnsCallbackCanceledAndDoesNotScheduleRollbackCheckerIfShutdownAfterInsertingInsertOplogSeedDocument) {
- auto dr = &getDR();
+ InitialSyncerTest,
+ InitialSyncerReturnsCallbackCanceledAndDoesNotScheduleRollbackCheckerIfShutdownAfterInsertingInsertOplogSeedDocument) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
NamespaceString insertDocumentNss;
BSONObj insertDocumentDoc;
- _storageInterface->insertDocumentFn = [dr, &insertDocumentDoc, &insertDocumentNss](
+ _storageInterface->insertDocumentFn = [initialSyncer, &insertDocumentDoc, &insertDocumentNss](
OperationContext*, const NamespaceString& nss, const BSONObj& doc) {
insertDocumentNss = nss;
insertDocumentDoc = doc;
- dr->shutdown();
+ initialSyncer->shutdown();
return Status::OK();
};
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto oplogEntry = makeOplogEntry(1);
auto net = getNet();
@@ -2127,16 +2126,16 @@ TEST_F(
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
ASSERT_EQUALS(_options.localOplogNS, insertDocumentNss);
ASSERT_BSONOBJ_EQ(oplogEntry, insertDocumentDoc);
}
TEST_F(
- DataReplicatorTest,
- DataReplicatorPassesThroughRollbackCheckerScheduleErrorAfterCloningFinishesWithNoOperationsToApply) {
- auto dr = &getDR();
+ InitialSyncerTest,
+ InitialSyncerPassesThroughRollbackCheckerScheduleErrorAfterCloningFinishesWithNoOperationsToApply) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
// Make the second replSetGetRBID command fail. Allow all other requests to be scheduled.
@@ -2155,7 +2154,7 @@ TEST_F(
};
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto oplogEntry = makeOplogEntry(1);
auto net = getNet();
@@ -2192,18 +2191,18 @@ TEST_F(
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
TEST_F(
- DataReplicatorTest,
- DataReplicatorPassesThroughRollbackCheckerCallbackErrorAfterCloningFinishesWithNoOperationsToApply) {
- auto dr = &getDR();
+ InitialSyncerTest,
+ InitialSyncerPassesThroughRollbackCheckerCallbackErrorAfterCloningFinishesWithNoOperationsToApply) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto oplogEntry = makeOplogEntry(1);
auto net = getNet();
@@ -2247,16 +2246,16 @@ TEST_F(
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
-TEST_F(DataReplicatorTest, DataReplicatorCancelsLastRollbackCheckerOnShutdown) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, InitialSyncerCancelsLastRollbackCheckerOnShutdown) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto oplogEntry = makeOplogEntry(1);
auto net = getNet();
@@ -2298,19 +2297,19 @@ TEST_F(DataReplicatorTest, DataReplicatorCancelsLastRollbackCheckerOnShutdown) {
net->runReadyNetworkOperations();
}
- ASSERT_OK(dr->shutdown());
+ ASSERT_OK(initialSyncer->shutdown());
executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
-TEST_F(DataReplicatorTest, DataReplicatorCancelsLastRollbackCheckerOnOplogFetcherCallbackError) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, InitialSyncerCancelsLastRollbackCheckerOnOplogFetcherCallbackError) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto oplogEntry = makeOplogEntry(1);
auto net = getNet();
@@ -2358,17 +2357,17 @@ TEST_F(DataReplicatorTest, DataReplicatorCancelsLastRollbackCheckerOnOplogFetche
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
-TEST_F(DataReplicatorTest,
- DataReplicatorReturnsUnrecoverableRollbackErrorIfSyncSourceRolledBackAfterCloningData) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerReturnsUnrecoverableRollbackErrorIfSyncSourceRolledBackAfterCloningData) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto oplogEntry = makeOplogEntry(1);
auto net = getNet();
@@ -2407,16 +2406,16 @@ TEST_F(DataReplicatorTest,
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, _lastApplied);
}
-TEST_F(DataReplicatorTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfterCloning) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfterCloning) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
ASSERT_TRUE(_storageInterface->getInitialSyncFlag(opCtx.get()));
@@ -2493,18 +2492,18 @@ TEST_F(DataReplicatorTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfte
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(OplogEntry(oplogEntry).getOpTime(), unittest::assertGet(_lastApplied).opTime);
ASSERT_EQUALS(oplogEntry["h"].Long(), unittest::assertGet(_lastApplied).value);
ASSERT_FALSE(_storageInterface->getInitialSyncFlag(opCtx.get()));
}
-TEST_F(DataReplicatorTest, DataReplicatorPassesThroughGetNextApplierBatchScheduleError) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchScheduleError) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
ASSERT_TRUE(_storageInterface->getInitialSyncFlag(opCtx.get()));
@@ -2535,7 +2534,7 @@ TEST_F(DataReplicatorTest, DataReplicatorPassesThroughGetNextApplierBatchSchedul
net->runReadyNetworkOperations();
// Before processing scheduled last oplog entry fetcher response, set flag in
- // TaskExecutorMock so that DataReplicator will fail to schedule
+ // TaskExecutorMock so that InitialSyncer will fail to schedule
// _getNextApplierBatchCallback().
_executorProxy->shouldFailScheduleWork = true;
@@ -2549,16 +2548,16 @@ TEST_F(DataReplicatorTest, DataReplicatorPassesThroughGetNextApplierBatchSchedul
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
-TEST_F(DataReplicatorTest, DataReplicatorPassesThroughSecondGetNextApplierBatchScheduleError) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, InitialSyncerPassesThroughSecondGetNextApplierBatchScheduleError) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
ASSERT_TRUE(_storageInterface->getInitialSyncFlag(opCtx.get()));
@@ -2589,7 +2588,7 @@ TEST_F(DataReplicatorTest, DataReplicatorPassesThroughSecondGetNextApplierBatchS
net->runReadyNetworkOperations();
// Before processing scheduled last oplog entry fetcher response, set flag in
- // TaskExecutorMock so that DataReplicator will fail to schedule second
+ // TaskExecutorMock so that InitialSyncer will fail to schedule second
// _getNextApplierBatchCallback() at (now + options.getApplierBatchCallbackRetryWait).
_executorProxy->shouldFailScheduleWorkAt = true;
@@ -2603,16 +2602,16 @@ TEST_F(DataReplicatorTest, DataReplicatorPassesThroughSecondGetNextApplierBatchS
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
-TEST_F(DataReplicatorTest, DataReplicatorCancelsGetNextApplierBatchOnShutdown) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, InitialSyncerCancelsGetNextApplierBatchOnShutdown) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
ASSERT_TRUE(_storageInterface->getInitialSyncFlag(opCtx.get()));
@@ -2650,19 +2649,19 @@ TEST_F(DataReplicatorTest, DataReplicatorCancelsGetNextApplierBatchOnShutdown) {
// rescheduling itself at new->now() + _options.getApplierBatchCallbackRetryWait.
}
- ASSERT_OK(dr->shutdown());
+ ASSERT_OK(initialSyncer->shutdown());
executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
-TEST_F(DataReplicatorTest, DataReplicatorPassesThroughGetNextApplierBatchInLockError) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchInLockError) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
ASSERT_TRUE(_storageInterface->getInitialSyncFlag(opCtx.get()));
@@ -2715,18 +2714,18 @@ TEST_F(DataReplicatorTest, DataReplicatorPassesThroughGetNextApplierBatchInLockE
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::BadValue, _lastApplied);
}
TEST_F(
- DataReplicatorTest,
- DataReplicatorReturnsEmptyBatchFromGetNextApplierBatchInLockIfRsSyncApplyStopFailPointIsEnabled) {
- auto dr = &getDR();
+ InitialSyncerTest,
+ InitialSyncerReturnsEmptyBatchFromGetNextApplierBatchInLockIfRsSyncApplyStopFailPointIsEnabled) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
ASSERT_TRUE(_storageInterface->getInitialSyncFlag(opCtx.get()));
@@ -2778,7 +2777,7 @@ TEST_F(
// Second last oplog entry fetcher.
processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
- // Since the 'rsSyncApplyStop' fail point is enabled, DataReplicator will get an empty
+ // Since the 'rsSyncApplyStop' fail point is enabled, InitialSyncer will get an empty
// batch of operations from _getNextApplierBatch_inlock() even though the oplog buffer
// is not empty.
}
@@ -2788,20 +2787,20 @@ TEST_F(
// with CallbackCanceled.
// Otherwise, shutdown() will cancel both the OplogFetcher and the scheduled
// _getNextApplierBatchCallback() task. The final initial sync status will be CallbackCanceled.
- ASSERT_OK(dr->shutdown());
+ ASSERT_OK(initialSyncer->shutdown());
executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
-TEST_F(DataReplicatorTest,
- DataReplicatorReturnsNoSuchKeyIfApplierBatchContainsAnOplogEntryWithoutHash) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerReturnsNoSuchKeyIfApplierBatchContainsAnOplogEntryWithoutHash) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
ASSERT_TRUE(_storageInterface->getInitialSyncFlag(opCtx.get()));
@@ -2850,16 +2849,16 @@ TEST_F(DataReplicatorTest,
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::NoSuchKey, _lastApplied);
}
-TEST_F(DataReplicatorTest, DataReplicatorPassesThroughMultiApplierScheduleError) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, InitialSyncerPassesThroughMultiApplierScheduleError) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
ASSERT_TRUE(_storageInterface->getInitialSyncFlag(opCtx.get()));
@@ -2918,12 +2917,12 @@ TEST_F(DataReplicatorTest, DataReplicatorPassesThroughMultiApplierScheduleError)
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
-TEST_F(DataReplicatorTest, DataReplicatorPassesThroughMultiApplierCallbackError) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, InitialSyncerPassesThroughMultiApplierCallbackError) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
getExternalState()->multiApplyFn =
@@ -2931,7 +2930,7 @@ TEST_F(DataReplicatorTest, DataReplicatorPassesThroughMultiApplierCallbackError)
return Status(ErrorCodes::OperationFailed, "multiApply failed");
};
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
int baseRollbackId = 1;
@@ -2973,16 +2972,16 @@ TEST_F(DataReplicatorTest, DataReplicatorPassesThroughMultiApplierCallbackError)
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
-TEST_F(DataReplicatorTest, DataReplicatorCancelsGetNextApplierBatchCallbackOnOplogFetcherError) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, InitialSyncerCancelsGetNextApplierBatchCallbackOnOplogFetcherError) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
int baseRollbackId = 1;
@@ -3023,17 +3022,17 @@ TEST_F(DataReplicatorTest, DataReplicatorCancelsGetNextApplierBatchCallbackOnOpl
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
-TEST_F(DataReplicatorTest,
- DataReplicatorReturnsLastAppliedOnReachingStopTimestampAfterApplyingOneBatch) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerReturnsLastAppliedOnReachingStopTimestampAfterApplyingOneBatch) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto lastOp = makeOplogEntry(2);
@@ -3084,20 +3083,20 @@ TEST_F(DataReplicatorTest,
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(OplogEntry(lastOp).getOpTime(), unittest::assertGet(_lastApplied).opTime);
ASSERT_EQUALS(lastOp["h"].Long(), unittest::assertGet(_lastApplied).value);
}
-TEST_F(DataReplicatorTest,
- DataReplicatorReturnsLastAppliedOnReachingStopTimestampAfterApplyingMultipleBatches) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerReturnsLastAppliedOnReachingStopTimestampAfterApplyingMultipleBatches) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- // To make DataReplicator apply multiple batches, we make the third and last operation a command
+ // To make InitialSyncer apply multiple batches, we make the third and last operation a command
// so that it will go into a separate batch from the second operation. First operation is the
// last fetched entry before data cloning and is not applied.
auto lastOp = makeOplogEntry(3, "c");
@@ -3181,20 +3180,20 @@ TEST_F(DataReplicatorTest,
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(OplogEntry(lastOp).getOpTime(), unittest::assertGet(_lastApplied).opTime);
ASSERT_EQUALS(lastOp["h"].Long(), unittest::assertGet(_lastApplied).value);
}
TEST_F(
- DataReplicatorTest,
- DataReplicatorSchedulesLastOplogEntryFetcherToGetNewStopTimestampIfMissingDocumentsHaveBeenFetchedDuringMultiInitialSyncApply) {
- auto dr = &getDR();
+ InitialSyncerTest,
+ InitialSyncerSchedulesLastOplogEntryFetcherToGetNewStopTimestampIfMissingDocumentsHaveBeenFetchedDuringMultiInitialSyncApply) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
// Override DataReplicatorExternalState::_multiInitialSyncApply() so that it will also fetch a
// missing document.
- // This forces DataReplicator to evaluate its end timestamp for applying operations after each
+ // This forces InitialSyncer to evaluate its end timestamp for applying operations after each
// batch.
getExternalState()->multiApplyFn = [](OperationContext*,
const MultiApplier::Operations& ops,
@@ -3214,7 +3213,7 @@ TEST_F(
};
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
// Use command for third and last operation to ensure we have two batches to apply.
auto lastOp = makeOplogEntry(3, "c");
@@ -3252,7 +3251,7 @@ TEST_F(
net->blackHole(noi);
// Second last oplog entry fetcher.
- // Send oplog entry with timestamp 2. DataReplicator will update this end timestamp after
+ // Send oplog entry with timestamp 2. InitialSyncer will update this end timestamp after
// applying the first batch.
processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
@@ -3271,20 +3270,20 @@ TEST_F(
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(OplogEntry(lastOp).getOpTime(), unittest::assertGet(_lastApplied).opTime);
ASSERT_EQUALS(lastOp["h"].Long(), unittest::assertGet(_lastApplied).value);
ASSERT_TRUE(fetchCountIncremented);
- auto progress = dr->getInitialSyncProgress();
+ auto progress = initialSyncer->getInitialSyncProgress();
log() << "Progress after failed initial sync attempt: " << progress;
ASSERT_EQUALS(1, progress.getIntField("fetchedMissingDocs")) << progress;
}
-TEST_F(DataReplicatorTest,
- DataReplicatorReturnsInvalidSyncSourceWhenFailInitialSyncWithBadHostFailpointIsEnabled) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest,
+ InitialSyncerReturnsInvalidSyncSourceWhenFailInitialSyncWithBadHostFailpointIsEnabled) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
// This fail point makes chooseSyncSourceCallback fail with an InvalidSyncSource error.
@@ -3293,18 +3292,18 @@ TEST_F(DataReplicatorTest,
ON_BLOCK_EXIT([failPoint]() { failPoint->setMode(FailPoint::off); });
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, _lastApplied);
}
-TEST_F(DataReplicatorTest, OplogOutOfOrderOnOplogFetchFinish) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, OplogOutOfOrderOnOplogFetchFinish) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(dr->startup(opCtx.get(), maxAttempts));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
int baseRollbackId = 1;
@@ -3344,21 +3343,21 @@ TEST_F(DataReplicatorTest, OplogOutOfOrderOnOplogFetchFinish) {
net->runReadyNetworkOperations();
}
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, _lastApplied);
}
-TEST_F(DataReplicatorTest, GetInitialSyncProgressReturnsCorrectProgress) {
- auto dr = &getDR();
+TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
+ auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 27017));
- ASSERT_OK(dr->startup(opCtx.get(), 2U));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), 2U));
auto net = getNet();
int baseRollbackId = 1;
- // Play first 2 responses to ensure data replicator has started the oplog fetcher.
+ // Play first 2 responses to ensure initial syncer has started the oplog fetcher.
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
@@ -3372,7 +3371,7 @@ TEST_F(DataReplicatorTest, GetInitialSyncProgressReturnsCorrectProgress) {
log() << "Done playing first failed response";
- auto progress = dr->getInitialSyncProgress();
+ auto progress = initialSyncer->getInitialSyncProgress();
log() << "Progress after first failed response: " << progress;
ASSERT_EQUALS(progress.nFields(), 8) << progress;
ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 0) << progress;
@@ -3407,7 +3406,7 @@ TEST_F(DataReplicatorTest, GetInitialSyncProgressReturnsCorrectProgress) {
log() << "Done playing failed responses";
// Play the first 2 responses of the successful round of responses to ensure that the
- // data replicator starts the oplog fetcher.
+ // initial syncer starts the oplog fetcher.
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
@@ -3425,7 +3424,7 @@ TEST_F(DataReplicatorTest, GetInitialSyncProgressReturnsCorrectProgress) {
log() << "Done playing first successful response";
- progress = dr->getInitialSyncProgress();
+ progress = initialSyncer->getInitialSyncProgress();
log() << "Progress after failure: " << progress;
ASSERT_EQUALS(progress.nFields(), 8) << progress;
ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
@@ -3513,13 +3512,13 @@ TEST_F(DataReplicatorTest, GetInitialSyncProgressReturnsCorrectProgress) {
}
// Second last oplog entry fetcher.
- // Send oplog entry with timestamp 2. DataReplicator will update this end timestamp after
+ // Send oplog entry with timestamp 2. InitialSyncer will update this end timestamp after
// applying the first batch.
processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(7)});
}
log() << "Done playing all but last successful response";
- progress = dr->getInitialSyncProgress();
+ progress = initialSyncer->getInitialSyncProgress();
log() << "Progress after all but last successful response: " << progress;
ASSERT_EQUALS(progress.nFields(), 9) << progress;
ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
@@ -3576,11 +3575,11 @@ TEST_F(DataReplicatorTest, GetInitialSyncProgressReturnsCorrectProgress) {
}
log() << "waiting for initial sync to verify it completed OK";
- dr->join();
+ initialSyncer->join();
ASSERT_EQUALS(OplogEntry(makeOplogEntry(7)).getOpTime(),
unittest::assertGet(_lastApplied).opTime);
- progress = dr->getInitialSyncProgress();
+ progress = initialSyncer->getInitialSyncProgress();
log() << "Progress at end: " << progress;
ASSERT_EQUALS(progress.nFields(), 11) << progress;
ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
diff --git a/src/mongo/db/repl/oplog_buffer.h b/src/mongo/db/repl/oplog_buffer.h
index f177808a991..ae48ec2531d 100644
--- a/src/mongo/db/repl/oplog_buffer.h
+++ b/src/mongo/db/repl/oplog_buffer.h
@@ -44,7 +44,7 @@ namespace repl {
/**
* Interface for temporary container of oplog entries (in BSON format) from sync source by
- * OplogFetcher that will be read by applier in the DataReplicator.
+ * OplogFetcher that will be read by applier in the InitialSyncer.
*
* Implementations are only required to support one pusher and one popper.
*/
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 05f07b8aada..e66ff985bb9 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -85,7 +85,7 @@ public:
/**
* Starts an initial sync, and calls "finished" when done,
- * for replica set member -- legacy impl not in DataReplicator.
+ * for replica set member.
*
* NOTE: Use either this (and below function) or the Master/Slave version, but not both.
*/
@@ -97,7 +97,7 @@ public:
virtual bool isInitialSyncFlagSet(OperationContext* opCtx) = 0;
/**
- * Starts steady state sync for replica set member -- legacy impl not in DataReplicator.
+ * Starts steady state sync for replica set member.
*
* NOTE: Use either this or the Master/Slave version, but not both.
*/
@@ -343,7 +343,7 @@ public:
OperationContext* opCtx) const = 0;
/**
- * Returns true if the user specified to use the data replicator for initial sync.
+ * Returns true if the user specified to use the new version for initial sync.
*/
virtual bool shouldUseDataReplicatorInitialSync() const = 0;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 3600a08ea1a..18a25b08f26 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -283,9 +283,9 @@ ReplicationCoordinator::Mode getReplicationModeFromSettings(const ReplSettings&
return ReplicationCoordinator::modeNone;
}
-DataReplicatorOptions createDataReplicatorOptions(
+InitialSyncerOptions createInitialSyncerOptions(
ReplicationCoordinator* replCoord, ReplicationCoordinatorExternalState* externalState) {
- DataReplicatorOptions options;
+ InitialSyncerOptions options;
options.getMyLastOptime = [replCoord]() { return replCoord->getMyLastAppliedOpTime(); };
options.setMyLastOptime = [replCoord, externalState](const OpTime& opTime) {
replCoord->setMyLastAppliedOpTime(opTime);
@@ -556,19 +556,19 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
}
void ReplicationCoordinatorImpl::_stopDataReplication(OperationContext* opCtx) {
- std::shared_ptr<DataReplicator> drCopy;
+ std::shared_ptr<InitialSyncer> initialSyncerCopy;
{
LockGuard lk(_mutex);
- _dr.swap(drCopy);
+ _initialSyncer.swap(initialSyncerCopy);
}
- if (drCopy) {
+ if (initialSyncerCopy) {
LOG(1)
- << "ReplicationCoordinatorImpl::_stopDataReplication calling DataReplicator::shutdown.";
- const auto status = drCopy->shutdown();
+ << "ReplicationCoordinatorImpl::_stopDataReplication calling InitialSyncer::shutdown.";
+ const auto status = initialSyncerCopy->shutdown();
if (!status.isOK()) {
- warning() << "DataReplicator shutdown failed: " << status;
+ warning() << "InitialSyncer shutdown failed: " << status;
}
- drCopy.reset();
+ initialSyncerCopy.reset();
// Do not return here, fall through.
}
LOG(1) << "ReplicationCoordinatorImpl::_stopDataReplication calling "
@@ -634,22 +634,22 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx,
_externalState->startSteadyStateReplication(opCtx.get(), this);
};
- std::shared_ptr<DataReplicator> drCopy;
+ std::shared_ptr<InitialSyncer> initialSyncerCopy;
try {
{
- // Must take the lock to set _dr, but not call it.
+ // Must take the lock to set _initialSyncer, but not call it.
stdx::lock_guard<stdx::mutex> lock(_mutex);
- drCopy = std::make_shared<DataReplicator>(
- createDataReplicatorOptions(this, _externalState.get()),
+ initialSyncerCopy = std::make_shared<InitialSyncer>(
+ createInitialSyncerOptions(this, _externalState.get()),
stdx::make_unique<DataReplicatorExternalStateInitialSync>(this,
_externalState.get()),
_storage,
onCompletion);
- _dr = drCopy;
+ _initialSyncer = initialSyncerCopy;
}
- // DataReplicator::startup() must be called outside lock because it uses features (eg.
+ // InitialSyncer::startup() must be called outside lock because it uses features (eg.
// setting the initial sync flag) which depend on the ReplicationCoordinatorImpl.
- uassertStatusOK(drCopy->startup(opCtx, numInitialSyncAttempts.load()));
+ uassertStatusOK(initialSyncerCopy->startup(opCtx, numInitialSyncAttempts.load()));
} catch (...) {
auto status = exceptionToStatus();
log() << "Initial Sync failed to start: " << status;
@@ -725,7 +725,7 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) {
log() << "shutting down replication subsystems";
// Used to shut down outside of the lock.
- std::shared_ptr<DataReplicator> drCopy;
+ std::shared_ptr<InitialSyncer> initialSyncerCopy;
{
stdx::unique_lock<stdx::mutex> lk(_mutex);
fassert(28533, !_inShutdown);
@@ -746,7 +746,7 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) {
_replicationWaiterList.signalAndRemoveAll_inlock();
_opTimeWaiterList.signalAndRemoveAll_inlock();
_currentCommittedSnapshotCond.notify_all();
- _dr.swap(drCopy);
+ _initialSyncer.swap(initialSyncerCopy);
}
{
@@ -755,14 +755,14 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) {
}
// joining the replication executor is blocking so it must be run outside of the mutex
- if (drCopy) {
- LOG(1) << "ReplicationCoordinatorImpl::shutdown calling DataReplicator::shutdown.";
- const auto status = drCopy->shutdown();
+ if (initialSyncerCopy) {
+ LOG(1) << "ReplicationCoordinatorImpl::shutdown calling InitialSyncer::shutdown.";
+ const auto status = initialSyncerCopy->shutdown();
if (!status.isOK()) {
- warning() << "DataReplicator shutdown failed: " << status;
+ warning() << "InitialSyncer shutdown failed: " << status;
}
- drCopy->join();
- drCopy.reset();
+ initialSyncerCopy->join();
+ initialSyncerCopy.reset();
}
_externalState->shutdown(opCtx);
_replExecutor.shutdown();
@@ -2140,8 +2140,8 @@ Status ReplicationCoordinatorImpl::processReplSetGetStatus(
BSONObj initialSyncProgress;
if (responseStyle == ReplSetGetStatusResponseStyle::kInitialSync) {
LockGuard lk(_mutex);
- if (_dr) {
- initialSyncProgress = _dr->getInitialSyncProgress();
+ if (_initialSyncer) {
+ initialSyncProgress = _initialSyncer->getInitialSyncProgress();
}
}
@@ -2307,7 +2307,7 @@ Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* opCt
auto opTime = _getMyLastAppliedOpTime_inlock();
_topCoord->prepareSyncFromResponse(target, opTime, resultObj, &result);
// If we are in the middle of an initial sync, do a resync.
- doResync = result.isOK() && _dr && _dr->isActive();
+ doResync = result.isOK() && _initialSyncer && _initialSyncer->isActive();
}
if (doResync) {
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 5003435147e..2b48b993922 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -35,7 +35,7 @@
#include "mongo/base/status.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/concurrency/d_concurrency.h"
-#include "mongo/db/repl/data_replicator.h"
+#include "mongo/db/repl/initial_syncer.h"
#include "mongo/db/repl/member_state.h"
#include "mongo/db/repl/old_update_position_args.h"
#include "mongo/db/repl/optime.h"
@@ -1361,10 +1361,11 @@ private:
// _lastCommittedOpTime cannot be set to an earlier OpTime.
OpTime _firstOpTimeOfMyTerm; // (M)
- // Storage interface used by data replicator.
+ // Storage interface used by initial syncer.
StorageInterface* _storage; // (PS)
- // Data Replicator used to replicate data
- std::shared_ptr<DataReplicator> _dr; // (I) pointer set under mutex, copied by callers.
+ // InitialSyncer used for initial sync.
+ std::shared_ptr<InitialSyncer>
+ _initialSyncer; // (I) pointer set under mutex, copied by callers.
// Hands out the next snapshot name.
AtomicUInt64 _snapshotNameGenerator; // (S)
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index 24463e23091..32d03e31902 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -48,8 +48,8 @@
#include "mongo/db/dbhelpers.h"
#include "mongo/db/op_observer.h"
#include "mongo/db/repl/bgsync.h"
-#include "mongo/db/repl/data_replicator.h"
#include "mongo/db/repl/initial_sync.h"
+#include "mongo/db/repl/initial_syncer.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplogreader.h"
#include "mongo/db/repl/repl_client_info.h"
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 06c86f5823f..c42b51905b2 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -56,7 +56,7 @@
#include "mongo/db/prefetch.h"
#include "mongo/db/query/query_knobs.h"
#include "mongo/db/repl/bgsync.h"
-#include "mongo/db/repl/data_replicator.h"
+#include "mongo/db/repl/initial_syncer.h"
#include "mongo/db/repl/multiapplier.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplogreader.h"