summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-05-20 11:45:27 -0400
committerBenety Goh <benety@mongodb.com>2016-05-24 18:56:07 -0400
commit974ed0ec1799916af2ae12da1d17ac5fc920d966 (patch)
treeb60912c976fc1b9bb2b845b48964cfc2c5d485dd /src/mongo/db
parent8644fe41cc27f1052d602ce5980a95eb551833a9 (diff)
downloadmongo-974ed0ec1799916af2ae12da1d17ac5fc920d966.tar.gz
SERVER-23308 integrated repl::multiApply into data replicator
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/SConscript2
-rw-r--r--src/mongo/db/repl/bgsync.cpp33
-rw-r--r--src/mongo/db/repl/bgsync.h19
-rw-r--r--src/mongo/db/repl/data_replicator.cpp25
-rw-r--r--src/mongo/db/repl/data_replicator.h2
-rw-r--r--src/mongo/db/repl/data_replicator_external_state.h34
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.cpp28
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.h19
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.cpp17
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.h15
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp24
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state.h21
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp29
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h6
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.cpp10
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.h6
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp6
-rw-r--r--src/mongo/db/repl/sync_tail.h2
18 files changed, 247 insertions, 51 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 5544f897842..d83a3609938 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -881,7 +881,9 @@ env.Library(
'data_replicator_external_state_mock.cpp',
],
LIBDEPS=[
+ 'oplog_entry',
'optime',
+ '$BUILD_DIR/mongo/util/net/hostandport',
],
)
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index dfe7b287654..71c8d28dbcd 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -80,8 +80,10 @@ const Milliseconds kOplogSocketTimeout(30000);
*/
class DataReplicatorExternalStateBackgroundSync : public DataReplicatorExternalStateImpl {
public:
- DataReplicatorExternalStateBackgroundSync(ReplicationCoordinator* replicationCoordinator,
- BackgroundSync* bgsync);
+ DataReplicatorExternalStateBackgroundSync(
+ ReplicationCoordinator* replicationCoordinator,
+ ReplicationCoordinatorExternalState* replicationCoordinatorExternalState,
+ BackgroundSync* bgsync);
bool shouldStopFetching(const HostAndPort& source,
const OpTime& sourceOpTime,
bool sourceHasSyncSource) override;
@@ -91,8 +93,11 @@ private:
};
DataReplicatorExternalStateBackgroundSync::DataReplicatorExternalStateBackgroundSync(
- ReplicationCoordinator* replicationCoordinator, BackgroundSync* bgsync)
- : DataReplicatorExternalStateImpl(replicationCoordinator), _bgsync(bgsync) {}
+ ReplicationCoordinator* replicationCoordinator,
+ ReplicationCoordinatorExternalState* replicationCoordinatorExternalState,
+ BackgroundSync* bgsync)
+ : DataReplicatorExternalStateImpl(replicationCoordinator, replicationCoordinatorExternalState),
+ _bgsync(bgsync) {}
bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching(const HostAndPort& source,
const OpTime& sourceOpTime,
@@ -157,8 +162,6 @@ BackgroundSync::BackgroundSync()
_threadPoolTaskExecutor(makeThreadPool(),
executor::makeNetworkInterface("NetworkInterfaceASIO-BGSync")),
_replCoord(getGlobalReplicationCoordinator()),
- _dataReplicatorExternalState(
- stdx::make_unique<DataReplicatorExternalStateBackgroundSync>(_replCoord, this)),
_syncSourceResolver(_replCoord),
_lastOpTimeFetched(Timestamp(std::numeric_limits<int>::max(), 0),
std::numeric_limits<long long>::max()) {}
@@ -184,7 +187,8 @@ void BackgroundSync::shutdown() {
}
}
-void BackgroundSync::producerThread() {
+void BackgroundSync::producerThread(
+ ReplicationCoordinatorExternalState* replicationCoordinatorExternalState) {
Client::initThread("rsBackgroundSync");
AuthorizationSession::get(cc())->grantInternalAuthorization();
@@ -196,7 +200,7 @@ void BackgroundSync::producerThread() {
while (!inShutdown()) {
try {
- _producerThread();
+ _producerThread(replicationCoordinatorExternalState);
} catch (const DBException& e) {
std::string msg(str::stream() << "sync producer problem: " << e.toString());
error() << msg;
@@ -222,7 +226,8 @@ void BackgroundSync::_signalNoNewDataForApplier() {
}
}
-void BackgroundSync::_producerThread() {
+void BackgroundSync::_producerThread(
+ ReplicationCoordinatorExternalState* replicationCoordinatorExternalState) {
const MemberState state = _replCoord->getMemberState();
// Stop when the state changes to primary.
if (_replCoord->isWaitingForApplierToDrain() || state.primary()) {
@@ -256,10 +261,12 @@ void BackgroundSync::_producerThread() {
start(&txn);
}
- _produce(&txn);
+ _produce(&txn, replicationCoordinatorExternalState);
}
-void BackgroundSync::_produce(OperationContext* txn) {
+void BackgroundSync::_produce(
+ OperationContext* txn,
+ ReplicationCoordinatorExternalState* replicationCoordinatorExternalState) {
// this oplog reader does not do a handshake because we don't want the server it's syncing
// from to track how far it has synced
{
@@ -339,6 +346,8 @@ void BackgroundSync::_produce(OperationContext* txn) {
// "lastFetched" not used. Already set in _enqueueDocuments.
Status fetcherReturnStatus = Status::OK();
+ DataReplicatorExternalStateBackgroundSync dataReplicatorExternalState(
+ _replCoord, replicationCoordinatorExternalState, this);
OplogFetcher* oplogFetcher;
try {
auto config = _replCoord->getConfig();
@@ -354,7 +363,7 @@ void BackgroundSync::_produce(OperationContext* txn) {
source,
NamespaceString(rsOplogName),
config,
- _dataReplicatorExternalState.get(),
+ &dataReplicatorExternalState,
stdx::bind(&BackgroundSync::_enqueueDocuments,
this,
stdx::placeholders::_1,
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index f9291c91b44..40d44ae8f93 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -52,6 +52,7 @@ namespace repl {
class Member;
class ReplicationCoordinator;
+class ReplicationCoordinatorExternalState;
// This interface exists to facilitate easier testing;
// the test infrastructure implements these functions with stubs.
@@ -102,8 +103,13 @@ public:
virtual ~BackgroundSync() {}
- // starts the producer thread
- void producerThread();
+ /**
+ * Starts the producer thread which runs until shutdown. Upon resolving the current sync source
+ * the producer thread uses the OplogFetcher (which requires the replication coordinator
+ * external state at construction) to fetch oplog entries from the source's oplog via a long
+ * running find query.
+ */
+ void producerThread(ReplicationCoordinatorExternalState* replicationCoordinatorExternalState);
// starts the sync target notifying thread
void notifierThread();
@@ -151,8 +157,9 @@ private:
BackgroundSync operator=(const BackgroundSync& s);
// Production thread
- void _producerThread();
- void _produce(OperationContext* txn);
+ void _producerThread(ReplicationCoordinatorExternalState* replicationCoordinatorExternalState);
+ void _produce(OperationContext* txn,
+ ReplicationCoordinatorExternalState* replicationCoordinatorExternalState);
/**
* Signals to the applier that we have no new data,
@@ -211,10 +218,6 @@ private:
// A pointer to the replication coordinator running the show.
ReplicationCoordinator* _replCoord;
- // Data replicator external state required by the oplog fetcher.
- // Owned by us.
- std::unique_ptr<DataReplicatorExternalState> _dataReplicatorExternalState;
-
// Used to determine sync source.
// TODO(dannenberg) move into DataReplicator.
SyncSourceResolver _syncSourceResolver;
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index c72427f3d40..41e15b19daf 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -428,7 +428,6 @@ DataReplicator::DataReplicator(
_applierActive(false),
_applierPaused(false),
_oplogBuffer(kOplogBufferSize, &getSize) {
- uassert(ErrorCodes::BadValue, "invalid applier function", _opts.applierFn);
uassert(ErrorCodes::BadValue, "invalid rollback function", _opts.rollbackFn);
uassert(ErrorCodes::BadValue,
"invalid replSetUpdatePosition command object creation function",
@@ -1147,7 +1146,6 @@ Status DataReplicator::_scheduleApplyBatch_inlock() {
return status.getStatus();
}
}
- invariant(_opts.applierFn);
invariant(!(_applier && _applier->isActive()));
return _scheduleApplyBatch_inlock(ops);
}
@@ -1155,6 +1153,27 @@ Status DataReplicator::_scheduleApplyBatch_inlock() {
}
Status DataReplicator::_scheduleApplyBatch_inlock(const Operations& ops) {
+ MultiApplier::ApplyOperationFn applierFn;
+ if (_state == DataReplicatorState::Steady) {
+ applierFn = stdx::bind(&DataReplicatorExternalState::_multiSyncApply,
+ _dataReplicatorExternalState.get(),
+ stdx::placeholders::_1);
+ } else {
+ invariant(_state == DataReplicatorState::InitialSync);
+ // "_syncSource" has to be copied to stdx::bind result.
+ HostAndPort source = _syncSource;
+ applierFn = stdx::bind(&DataReplicatorExternalState::_multiInitialSyncApply,
+ _dataReplicatorExternalState.get(),
+ stdx::placeholders::_1,
+ source);
+ }
+
+ auto multiApplyFn = stdx::bind(&DataReplicatorExternalState::_multiApply,
+ _dataReplicatorExternalState.get(),
+ stdx::placeholders::_1,
+ stdx::placeholders::_2,
+ stdx::placeholders::_3);
+
auto lambda = [this](const TimestampStatus& ts, const Operations& theOps) {
CBHStatus status = _exec->scheduleWork(stdx::bind(&DataReplicator::_onApplyBatchFinish,
this,
@@ -1172,7 +1191,7 @@ Status DataReplicator::_scheduleApplyBatch_inlock(const Operations& ops) {
_exec->wait(status.getValue());
};
- _applier.reset(new MultiApplier(_exec, ops, _opts.applierFn, _opts.multiApplyFn, lambda));
+ _applier.reset(new MultiApplier(_exec, ops, applierFn, multiApplyFn, lambda));
return _applier->start();
}
diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h
index 441cbc10aa3..e1c491ebd4d 100644
--- a/src/mongo/db/repl/data_replicator.h
+++ b/src/mongo/db/repl/data_replicator.h
@@ -136,8 +136,6 @@ struct DataReplicatorOptions {
std::string scopeNS;
BSONObj filterCriteria;
- MultiApplier::ApplyOperationFn applierFn;
- MultiApplier::MultiApplyFn multiApplyFn;
RollbackFn rollbackFn;
Reporter::PrepareReplSetUpdatePositionCommandFn prepareReplSetUpdatePositionCommandFn;
GetMyLastOptimeFn getMyLastOptime;
diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h
index 983290a2148..d19f46f9711 100644
--- a/src/mongo/db/repl/data_replicator_external_state.h
+++ b/src/mongo/db/repl/data_replicator_external_state.h
@@ -29,6 +29,7 @@
#pragma once
#include "mongo/base/disallow_copying.h"
+#include "mongo/db/repl/multiapplier.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/optime_with.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
@@ -38,6 +39,8 @@
namespace mongo {
namespace repl {
+class DataReplicator;
+
/**
* Holds current term and last committed optime necessary to populate find/getMore command requests.
*/
@@ -76,6 +79,37 @@ public:
virtual bool shouldStopFetching(const HostAndPort& source,
const OpTime& sourceOpTime,
bool sourceHasSyncSource) = 0;
+
+private:
+ /**
+ * Applies the operations described in the oplog entries contained in "ops" using the
+ * "applyOperation" function.
+ *
+ * Used exclusively by the DataReplicator to construct a MultiApplier.
+ */
+ virtual StatusWith<OpTime> _multiApply(OperationContext* txn,
+ const MultiApplier::Operations& ops,
+ MultiApplier::ApplyOperationFn applyOperation) = 0;
+
+ /**
+ * Used by _multiApply() to write operations to database during steady state replication.
+ *
+ * Used exclusively by the DataReplicator to construct a MultiApplier.
+ */
+ virtual void _multiSyncApply(const MultiApplier::Operations& ops) = 0;
+
+ /**
+ * Used by _multiApply() to write operations to database during initial sync.
+ * Fetches missing documents from "source".
+ *
+ * Used exclusively by the DataReplicator to construct a MultiApplier.
+ */
+ virtual void _multiInitialSyncApply(const MultiApplier::Operations& ops,
+ const HostAndPort& source) = 0;
+
+ // Provides DataReplicator with access to _multiApply, _multiSyncApply and
+ // _multiInitialSyncApply.
+ friend class DataReplicator;
};
} // namespace repl
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
index 828a2aa51a5..558f3faf700 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
@@ -33,14 +33,17 @@
#include "mongo/db/repl/data_replicator_external_state_impl.h"
#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/replication_coordinator_external_state.h"
#include "mongo/util/log.h"
namespace mongo {
namespace repl {
DataReplicatorExternalStateImpl::DataReplicatorExternalStateImpl(
- ReplicationCoordinator* replicationCoordinator)
- : _replicationCoordinator(replicationCoordinator) {}
+ ReplicationCoordinator* replicationCoordinator,
+ ReplicationCoordinatorExternalState* replicationCoordinatorExternalState)
+ : _replicationCoordinator(replicationCoordinator),
+ _replicationCoordinatorExternalState(replicationCoordinatorExternalState) {}
OpTimeWithTerm DataReplicatorExternalStateImpl::getCurrentTermAndLastCommittedOpTime() {
if (!_replicationCoordinator->isV1ElectionProtocol()) {
@@ -70,9 +73,30 @@ bool DataReplicatorExternalStateImpl::shouldStopFetching(const HostAndPort& sour
return false;
}
+StatusWith<OpTime> DataReplicatorExternalStateImpl::_multiApply(
+ OperationContext* txn,
+ const MultiApplier::Operations& ops,
+ MultiApplier::ApplyOperationFn applyOperation) {
+ return _replicationCoordinatorExternalState->multiApply(txn, ops, applyOperation);
+}
+
+void DataReplicatorExternalStateImpl::_multiSyncApply(const MultiApplier::Operations& ops) {
+ _replicationCoordinatorExternalState->multiSyncApply(ops);
+}
+
+void DataReplicatorExternalStateImpl::_multiInitialSyncApply(const MultiApplier::Operations& ops,
+ const HostAndPort& source) {
+ _replicationCoordinatorExternalState->multiInitialSyncApply(ops, source);
+}
+
ReplicationCoordinator* DataReplicatorExternalStateImpl::getReplicationCoordinator() const {
return _replicationCoordinator;
}
+ReplicationCoordinatorExternalState*
+DataReplicatorExternalStateImpl::getReplicationCoordinatorExternalState() const {
+ return _replicationCoordinatorExternalState;
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h
index 8fc84ff218c..25a09e1d7db 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.h
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.h
@@ -34,6 +34,7 @@ namespace mongo {
namespace repl {
class ReplicationCoordinator;
+class ReplicationCoordinatorExternalState;
/**
* Data replicator external state implementation using a replication coordinator.
@@ -41,7 +42,9 @@ class ReplicationCoordinator;
class DataReplicatorExternalStateImpl : public DataReplicatorExternalState {
public:
- DataReplicatorExternalStateImpl(ReplicationCoordinator* replicationCoordinator);
+ DataReplicatorExternalStateImpl(
+ ReplicationCoordinator* replicationCoordinator,
+ ReplicationCoordinatorExternalState* replicationCoordinatorExternalState);
OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override;
@@ -51,12 +54,26 @@ public:
const OpTime& sourceOpTime,
bool sourceHasSyncSource) override;
+private:
+ StatusWith<OpTime> _multiApply(OperationContext* txn,
+ const MultiApplier::Operations& ops,
+ MultiApplier::ApplyOperationFn applyOperation) override;
+
+ void _multiSyncApply(const MultiApplier::Operations& ops) override;
+
+ void _multiInitialSyncApply(const MultiApplier::Operations& ops,
+ const HostAndPort& source) override;
+
protected:
ReplicationCoordinator* getReplicationCoordinator() const;
+ ReplicationCoordinatorExternalState* getReplicationCoordinatorExternalState() const;
private:
// Not owned by us.
ReplicationCoordinator* _replicationCoordinator;
+
+ // Not owned by us.
+ ReplicationCoordinatorExternalState* _replicationCoordinatorExternalState;
};
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
index 501d9ae70c3..83a6e3fd83c 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
@@ -33,6 +33,11 @@
namespace mongo {
namespace repl {
+DataReplicatorExternalStateMock::DataReplicatorExternalStateMock()
+ : multiApplyFn([](OperationContext*,
+ const MultiApplier::Operations& ops,
+ MultiApplier::ApplyOperationFn) { return ops.back().getOpTime(); }) {}
+
OpTimeWithTerm DataReplicatorExternalStateMock::getCurrentTermAndLastCommittedOpTime() {
return {currentTerm, lastCommittedOpTime};
}
@@ -50,5 +55,17 @@ bool DataReplicatorExternalStateMock::shouldStopFetching(const HostAndPort& sour
return shouldStopFetchingResult;
}
+StatusWith<OpTime> DataReplicatorExternalStateMock::_multiApply(
+ OperationContext* txn,
+ const MultiApplier::Operations& ops,
+ MultiApplier::ApplyOperationFn applyOperation) {
+ return multiApplyFn(txn, ops, applyOperation);
+}
+
+void DataReplicatorExternalStateMock::_multiSyncApply(const MultiApplier::Operations& ops) {}
+
+void DataReplicatorExternalStateMock::_multiInitialSyncApply(const MultiApplier::Operations& ops,
+ const HostAndPort& source) {}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h
index ef78c691157..4705fb57bd3 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.h
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.h
@@ -41,6 +41,8 @@ class ReplicationCoordinator;
class DataReplicatorExternalStateMock : public DataReplicatorExternalState {
public:
+ DataReplicatorExternalStateMock();
+
OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override;
void processMetadata(const rpc::ReplSetMetadata& metadata) override;
@@ -63,6 +65,19 @@ public:
// Returned by shouldStopFetching.
bool shouldStopFetchingResult = false;
+
+ // Override to change multiApply behavior.
+ MultiApplier::MultiApplyFn multiApplyFn;
+
+private:
+ StatusWith<OpTime> _multiApply(OperationContext* txn,
+ const MultiApplier::Operations& ops,
+ MultiApplier::ApplyOperationFn applyOperation) override;
+
+ void _multiSyncApply(const MultiApplier::Operations& ops) override;
+
+ void _multiInitialSyncApply(const MultiApplier::Operations& ops,
+ const HostAndPort& source) override;
};
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 8e6352fe218..939989d8aff 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -106,11 +106,6 @@ public:
* clear/reset state
*/
void reset() {
- _applierFn = [](const MultiApplier::Operations&) {};
- _multiApplyFn = [](OperationContext*,
- const MultiApplier::Operations& ops,
- MultiApplier::ApplyOperationFn)
- -> StatusWith<OpTime> { return ops.back().getOpTime(); };
_rollbackFn = [](OperationContext*, const OpTime&, const HostAndPort&)
-> Status { return Status::OK(); };
_setMyLastOptime = [this](const OpTime& opTime) { _myLastOpTime = opTime; };
@@ -181,6 +176,10 @@ public:
return *_dr;
}
+ DataReplicatorExternalStateMock* getExternalState() {
+ return _externalState;
+ }
+
protected:
void setUp() override {
ReplicationExecutorTest::setUp();
@@ -194,11 +193,6 @@ protected:
DataReplicatorOptions options;
options.initialSyncRetryWait = Milliseconds(0);
- options.applierFn = [this](const MultiApplier::Operations& ops) { return _applierFn(ops); };
- options.multiApplyFn =
- [this](OperationContext* txn,
- const MultiApplier::Operations& ops,
- MultiApplier::ApplyOperationFn func) { return _multiApplyFn(txn, ops, func); };
options.rollbackFn = [this](OperationContext* txn,
const OpTime& lastOpTimeWritten,
const HostAndPort& syncSource) -> Status {
@@ -231,6 +225,7 @@ protected:
auto dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>();
dataReplicatorExternalState->currentTerm = 1LL;
dataReplicatorExternalState->lastCommittedOpTime = _myLastOpTime;
+ _externalState = dataReplicatorExternalState.get();
try {
_dr.reset(new DataReplicator(
@@ -246,8 +241,6 @@ protected:
// Executor may still invoke callback before shutting down.
}
- MultiApplier::ApplyOperationFn _applierFn;
- MultiApplier::MultiApplyFn _multiApplyFn;
DataReplicatorOptions::RollbackFn _rollbackFn;
DataReplicatorOptions::SetMyLastOptimeFn _setMyLastOptime;
OpTime _myLastOpTime;
@@ -255,6 +248,7 @@ protected:
std::unique_ptr<SyncSourceSelector> _syncSourceSelector;
private:
+ DataReplicatorExternalStateMock* _externalState;
std::unique_ptr<DataReplicator> _dr;
};
@@ -521,7 +515,7 @@ TEST_F(InitialSyncTest, Complete) {
TEST_F(InitialSyncTest, MissingDocOnMultiApplyCompletes) {
DataReplicatorOptions opts;
int applyCounter{0};
- _multiApplyFn =
+ getExternalState()->multiApplyFn =
[&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn)
-> StatusWith<OpTime> {
if (++applyCounter == 1) {
@@ -944,7 +938,7 @@ TEST_F(SteadyStateTest, PauseDataReplicator) {
unittest::Barrier barrier(2U);
Timestamp lastTimestampApplied;
BSONObj operationApplied;
- _multiApplyFn =
+ getExternalState()->multiApplyFn =
[&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn)
-> StatusWith<OpTime> {
stdx::lock_guard<stdx::mutex> lock(mutex);
@@ -1036,7 +1030,7 @@ TEST_F(SteadyStateTest, ApplyOneOperation) {
unittest::Barrier barrier(2U);
Timestamp lastTimestampApplied;
BSONObj operationApplied;
- _multiApplyFn =
+ getExternalState()->multiApplyFn =
[&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn)
-> StatusWith<OpTime> {
stdx::lock_guard<stdx::mutex> lock(mutex);
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 7be2d475eda..4a58018474a 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -33,6 +33,7 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/repl/member_state.h"
+#include "mongo/db/repl/multiapplier.h"
#include "mongo/db/repl/optime.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/time_support.h"
@@ -268,6 +269,26 @@ public:
* Returns true if the current storage engine supports read committed.
*/
virtual bool isReadCommittedSupportedByStorageEngine(OperationContext* txn) const = 0;
+
+ /**
+ * Applies the operations described in the oplog entries contained in "ops" using the
+ * "applyOperation" function.
+ */
+ virtual StatusWith<OpTime> multiApply(OperationContext* txn,
+ const MultiApplier::Operations& ops,
+ MultiApplier::ApplyOperationFn applyOperation) = 0;
+
+ /**
+ * Used by multiApply() to writes operations to database during steady state replication.
+ */
+ virtual void multiSyncApply(const MultiApplier::Operations& ops) = 0;
+
+ /**
+ * Used by multiApply() to writes operations to database during initial sync.
+ * Fetches missing documents from "source".
+ */
+ virtual void multiInitialSyncApply(const MultiApplier::Operations& ops,
+ const HostAndPort& source) = 0;
};
} // namespace repl
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 152016b2ddc..e5042d8f24b 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -57,6 +57,7 @@
#include "mongo/db/repl/rs_initialsync.h"
#include "mongo/db/repl/snapshot_thread.h"
#include "mongo/db/repl/storage_interface.h"
+#include "mongo/db/repl/sync_tail.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/db/s/sharding_state.h"
@@ -107,7 +108,7 @@ void ReplicationCoordinatorExternalStateImpl::startInitialSync(OnInitialSyncFini
invariant(!(bgsync == nullptr && !inShutdownStrict())); // bgsync can be null @shutdown.
invariant(!_producerThread); // The producer thread should not be init'd before this.
_producerThread.reset(
- new stdx::thread(stdx::bind(&BackgroundSync::producerThread, bgsync)));
+ new stdx::thread(stdx::bind(&BackgroundSync::producerThread, bgsync, this)));
// Do initial sync.
syncDoInitialSync();
finished();
@@ -119,7 +120,7 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication() {
log() << "Starting replication fetcher thread";
BackgroundSync* bgsync = BackgroundSync::get();
_producerThread.reset(
- new stdx::thread(stdx::bind(&BackgroundSync::producerThread, bgsync)));
+ new stdx::thread(stdx::bind(&BackgroundSync::producerThread, bgsync, this)));
}
log() << "Starting replication applier threads";
invariant(!_applierThread);
@@ -501,6 +502,30 @@ bool ReplicationCoordinatorExternalStateImpl::isReadCommittedSupportedByStorageE
return storageEngine->getSnapshotManager();
}
+StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::multiApply(
+ OperationContext* txn,
+ const MultiApplier::Operations& ops,
+ MultiApplier::ApplyOperationFn applyOperation) {
+ return repl::multiApply(txn, ops, applyOperation);
+}
+
+void ReplicationCoordinatorExternalStateImpl::multiSyncApply(const MultiApplier::Operations& ops) {
+ // SyncTail* argument is not used by repl::multiSyncApply().
+ repl::multiSyncApply(ops, nullptr);
+}
+
+void ReplicationCoordinatorExternalStateImpl::multiInitialSyncApply(
+ const MultiApplier::Operations& ops, const HostAndPort& source) {
+ // repl::multiInitialSyncApply uses SyncTail::shouldRetry() (and implicitly getMissingDoc())
+ // to fetch missing documents during initial sync. Therefore, it is fine to construct SyncTail
+ // with invalid BackgroundSync and MultiSyncApplyFunc arguments because we will not be accessing
+ // any SyncTail functionality that require these constructor parameters.
+ SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc());
+ syncTail.setHostname(source.toString());
+ repl::multiInitialSyncApply(ops, &syncTail);
+}
+
+
JournalListener::Token ReplicationCoordinatorExternalStateImpl::getToken() {
return repl::getGlobalReplicationCoordinator()->getMyLastAppliedOpTime();
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index e08cf1fb75e..e8b8aaa7207 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -87,6 +87,12 @@ public:
virtual void notifyOplogMetadataWaiters();
virtual double getElectionTimeoutOffsetLimitFraction() const;
virtual bool isReadCommittedSupportedByStorageEngine(OperationContext* txn) const;
+ virtual StatusWith<OpTime> multiApply(OperationContext* txn,
+ const MultiApplier::Operations& ops,
+ MultiApplier::ApplyOperationFn applyOperation) override;
+ virtual void multiSyncApply(const MultiApplier::Operations& ops) override;
+ virtual void multiInitialSyncApply(const MultiApplier::Operations& ops,
+ const HostAndPort& source) override;
std::string getNextOpContextThreadName();
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index 0396bb37b5f..2d7736def87 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -236,6 +236,16 @@ bool ReplicationCoordinatorExternalStateMock::isReadCommittedSupportedByStorageE
return _isReadCommittedSupported;
}
+StatusWith<OpTime> ReplicationCoordinatorExternalStateMock::multiApply(
+ OperationContext*, const MultiApplier::Operations&, MultiApplier::ApplyOperationFn) {
+ return {ErrorCodes::InternalError, "Method not implemented"};
+}
+
+void ReplicationCoordinatorExternalStateMock::multiSyncApply(const MultiApplier::Operations& ops) {}
+
+void ReplicationCoordinatorExternalStateMock::multiInitialSyncApply(
+ const MultiApplier::Operations& ops, const HostAndPort& source) {}
+
void ReplicationCoordinatorExternalStateMock::setIsReadCommittedEnabled(bool val) {
_isReadCommittedSupported = val;
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index 77d883df480..049f39745c8 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -88,6 +88,12 @@ public:
virtual void notifyOplogMetadataWaiters();
virtual double getElectionTimeoutOffsetLimitFraction() const;
virtual bool isReadCommittedSupportedByStorageEngine(OperationContext* txn) const;
+ virtual StatusWith<OpTime> multiApply(OperationContext* txn,
+ const MultiApplier::Operations& ops,
+ MultiApplier::ApplyOperationFn applyOperation) override;
+ virtual void multiSyncApply(const MultiApplier::Operations& ops) override;
+ virtual void multiInitialSyncApply(const MultiApplier::Operations& ops,
+ const HostAndPort& source) override;
/**
* Adds "host" to the list of hosts that this mock will match when responding to "isSelf"
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 14eba0564ed..3b5319b8cc7 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -203,10 +203,6 @@ ReplicationCoordinator::Mode getReplicationModeFromSettings(const ReplSettings&
DataReplicatorOptions createDataReplicatorOptions(ReplicationCoordinator* replCoord) {
DataReplicatorOptions options;
- options.applierFn = [](const MultiApplier::Operations&) {};
- options.multiApplyFn =
- [](OperationContext*, const MultiApplier::Operations&, MultiApplier::ApplyOperationFn)
- -> OpTime { return OpTime(); };
options.rollbackFn =
[](OperationContext*, const OpTime&, const HostAndPort&) -> Status { return Status::OK(); };
options.prepareReplSetUpdatePositionCommandFn =
@@ -256,7 +252,7 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
_canAcceptNonLocalWrites(!(settings.usingReplSets() || settings.isSlave())),
_canServeNonLocalReads(0U),
_dr(createDataReplicatorOptions(this),
- stdx::make_unique<DataReplicatorExternalStateImpl>(this),
+ stdx::make_unique<DataReplicatorExternalStateImpl>(this, externalState),
&_replExecutor),
_isDurableStorageEngine(isDurableStorageEngineFn ? *isDurableStorageEngineFn : []() -> bool {
return getGlobalServiceContext()->getGlobalStorageEngine()->isDurable();
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index d2758fc478c..21459b723a6 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -180,7 +180,7 @@ private:
};
/**
- * Applies the opeartions described in the oplog entries contained in "ops" using the
+ * Applies the operations described in the oplog entries contained in "ops" using the
* "applyOperation" function.
*
* Returns ErrorCode::InterruptedAtShutdown if the node enters shutdown while applying ops,