From 974ed0ec1799916af2ae12da1d17ac5fc920d966 Mon Sep 17 00:00:00 2001 From: Benety Goh Date: Fri, 20 May 2016 11:45:27 -0400 Subject: SERVER-23308 integrated repl::multiApply into data replicator --- src/mongo/db/repl/SConscript | 2 ++ src/mongo/db/repl/bgsync.cpp | 33 +++++++++++++-------- src/mongo/db/repl/bgsync.h | 19 +++++++----- src/mongo/db/repl/data_replicator.cpp | 25 ++++++++++++++-- src/mongo/db/repl/data_replicator.h | 2 -- src/mongo/db/repl/data_replicator_external_state.h | 34 ++++++++++++++++++++++ .../repl/data_replicator_external_state_impl.cpp | 28 ++++++++++++++++-- .../db/repl/data_replicator_external_state_impl.h | 19 +++++++++++- .../repl/data_replicator_external_state_mock.cpp | 17 +++++++++++ .../db/repl/data_replicator_external_state_mock.h | 15 ++++++++++ src/mongo/db/repl/data_replicator_test.cpp | 24 ++++++--------- .../repl/replication_coordinator_external_state.h | 21 +++++++++++++ ...replication_coordinator_external_state_impl.cpp | 29 ++++++++++++++++-- .../replication_coordinator_external_state_impl.h | 6 ++++ ...replication_coordinator_external_state_mock.cpp | 10 +++++++ .../replication_coordinator_external_state_mock.h | 6 ++++ src/mongo/db/repl/replication_coordinator_impl.cpp | 6 +--- src/mongo/db/repl/sync_tail.h | 2 +- 18 files changed, 247 insertions(+), 51 deletions(-) (limited to 'src/mongo/db') diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 5544f897842..d83a3609938 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -881,7 +881,9 @@ env.Library( 'data_replicator_external_state_mock.cpp', ], LIBDEPS=[ + 'oplog_entry', 'optime', + '$BUILD_DIR/mongo/util/net/hostandport', ], ) diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index dfe7b287654..71c8d28dbcd 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -80,8 +80,10 @@ const Milliseconds kOplogSocketTimeout(30000); */ class DataReplicatorExternalStateBackgroundSync : public DataReplicatorExternalStateImpl { public: - DataReplicatorExternalStateBackgroundSync(ReplicationCoordinator* replicationCoordinator, - BackgroundSync* bgsync); + DataReplicatorExternalStateBackgroundSync( + ReplicationCoordinator* replicationCoordinator, + ReplicationCoordinatorExternalState* replicationCoordinatorExternalState, + BackgroundSync* bgsync); bool shouldStopFetching(const HostAndPort& source, const OpTime& sourceOpTime, bool sourceHasSyncSource) override; @@ -91,8 +93,11 @@ private: }; DataReplicatorExternalStateBackgroundSync::DataReplicatorExternalStateBackgroundSync( - ReplicationCoordinator* replicationCoordinator, BackgroundSync* bgsync) - : DataReplicatorExternalStateImpl(replicationCoordinator), _bgsync(bgsync) {} + ReplicationCoordinator* replicationCoordinator, + ReplicationCoordinatorExternalState* replicationCoordinatorExternalState, + BackgroundSync* bgsync) + : DataReplicatorExternalStateImpl(replicationCoordinator, replicationCoordinatorExternalState), + _bgsync(bgsync) {} bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching(const HostAndPort& source, const OpTime& sourceOpTime, @@ -157,8 +162,6 @@ BackgroundSync::BackgroundSync() _threadPoolTaskExecutor(makeThreadPool(), executor::makeNetworkInterface("NetworkInterfaceASIO-BGSync")), _replCoord(getGlobalReplicationCoordinator()), - _dataReplicatorExternalState( - stdx::make_unique(_replCoord, this)), _syncSourceResolver(_replCoord), _lastOpTimeFetched(Timestamp(std::numeric_limits::max(), 0), std::numeric_limits::max()) {} @@ -184,7 +187,8 @@ void BackgroundSync::shutdown() { } } -void BackgroundSync::producerThread() { +void BackgroundSync::producerThread( + ReplicationCoordinatorExternalState* replicationCoordinatorExternalState) { Client::initThread("rsBackgroundSync"); AuthorizationSession::get(cc())->grantInternalAuthorization(); @@ -196,7 +200,7 @@ void BackgroundSync::producerThread() { while (!inShutdown()) { try { - _producerThread(); + _producerThread(replicationCoordinatorExternalState); } catch (const DBException& e) { std::string msg(str::stream() << "sync producer problem: " << e.toString()); error() << msg; @@ -222,7 +226,8 @@ void BackgroundSync::_signalNoNewDataForApplier() { } } -void BackgroundSync::_producerThread() { +void BackgroundSync::_producerThread( + ReplicationCoordinatorExternalState* replicationCoordinatorExternalState) { const MemberState state = _replCoord->getMemberState(); // Stop when the state changes to primary. if (_replCoord->isWaitingForApplierToDrain() || state.primary()) { @@ -256,10 +261,12 @@ void BackgroundSync::_producerThread() { start(&txn); } - _produce(&txn); + _produce(&txn, replicationCoordinatorExternalState); } -void BackgroundSync::_produce(OperationContext* txn) { +void BackgroundSync::_produce( + OperationContext* txn, + ReplicationCoordinatorExternalState* replicationCoordinatorExternalState) { // this oplog reader does not do a handshake because we don't want the server it's syncing // from to track how far it has synced { @@ -339,6 +346,8 @@ void BackgroundSync::_produce(OperationContext* txn) { // "lastFetched" not used. Already set in _enqueueDocuments. Status fetcherReturnStatus = Status::OK(); + DataReplicatorExternalStateBackgroundSync dataReplicatorExternalState( + _replCoord, replicationCoordinatorExternalState, this); OplogFetcher* oplogFetcher; try { auto config = _replCoord->getConfig(); @@ -354,7 +363,7 @@ void BackgroundSync::_produce(OperationContext* txn) { source, NamespaceString(rsOplogName), config, - _dataReplicatorExternalState.get(), + &dataReplicatorExternalState, stdx::bind(&BackgroundSync::_enqueueDocuments, this, stdx::placeholders::_1, diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index f9291c91b44..40d44ae8f93 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -52,6 +52,7 @@ namespace repl { class Member; class ReplicationCoordinator; +class ReplicationCoordinatorExternalState; // This interface exists to facilitate easier testing; // the test infrastructure implements these functions with stubs. @@ -102,8 +103,13 @@ public: virtual ~BackgroundSync() {} - // starts the producer thread - void producerThread(); + /** + * Starts the producer thread which runs until shutdown. Upon resolving the current sync source + * the producer thread uses the OplogFetcher (which requires the replication coordinator + * external state at construction) to fetch oplog entries from the source's oplog via a long + * running find query. + */ + void producerThread(ReplicationCoordinatorExternalState* replicationCoordinatorExternalState); // starts the sync target notifying thread void notifierThread(); @@ -151,8 +157,9 @@ private: BackgroundSync operator=(const BackgroundSync& s); // Production thread - void _producerThread(); - void _produce(OperationContext* txn); + void _producerThread(ReplicationCoordinatorExternalState* replicationCoordinatorExternalState); + void _produce(OperationContext* txn, + ReplicationCoordinatorExternalState* replicationCoordinatorExternalState); /** * Signals to the applier that we have no new data, @@ -211,10 +218,6 @@ private: // A pointer to the replication coordinator running the show. ReplicationCoordinator* _replCoord; - // Data replicator external state required by the oplog fetcher. - // Owned by us. - std::unique_ptr _dataReplicatorExternalState; - // Used to determine sync source. // TODO(dannenberg) move into DataReplicator. SyncSourceResolver _syncSourceResolver; diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index c72427f3d40..41e15b19daf 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -428,7 +428,6 @@ DataReplicator::DataReplicator( _applierActive(false), _applierPaused(false), _oplogBuffer(kOplogBufferSize, &getSize) { - uassert(ErrorCodes::BadValue, "invalid applier function", _opts.applierFn); uassert(ErrorCodes::BadValue, "invalid rollback function", _opts.rollbackFn); uassert(ErrorCodes::BadValue, "invalid replSetUpdatePosition command object creation function", @@ -1147,7 +1146,6 @@ Status DataReplicator::_scheduleApplyBatch_inlock() { return status.getStatus(); } } - invariant(_opts.applierFn); invariant(!(_applier && _applier->isActive())); return _scheduleApplyBatch_inlock(ops); } @@ -1155,6 +1153,27 @@ Status DataReplicator::_scheduleApplyBatch_inlock() { } Status DataReplicator::_scheduleApplyBatch_inlock(const Operations& ops) { + MultiApplier::ApplyOperationFn applierFn; + if (_state == DataReplicatorState::Steady) { + applierFn = stdx::bind(&DataReplicatorExternalState::_multiSyncApply, + _dataReplicatorExternalState.get(), + stdx::placeholders::_1); + } else { + invariant(_state == DataReplicatorState::InitialSync); + // "_syncSource" has to be copied to stdx::bind result. + HostAndPort source = _syncSource; + applierFn = stdx::bind(&DataReplicatorExternalState::_multiInitialSyncApply, + _dataReplicatorExternalState.get(), + stdx::placeholders::_1, + source); + } + + auto multiApplyFn = stdx::bind(&DataReplicatorExternalState::_multiApply, + _dataReplicatorExternalState.get(), + stdx::placeholders::_1, + stdx::placeholders::_2, + stdx::placeholders::_3); + auto lambda = [this](const TimestampStatus& ts, const Operations& theOps) { CBHStatus status = _exec->scheduleWork(stdx::bind(&DataReplicator::_onApplyBatchFinish, this, @@ -1172,7 +1191,7 @@ Status DataReplicator::_scheduleApplyBatch_inlock(const Operations& ops) { _exec->wait(status.getValue()); }; - _applier.reset(new MultiApplier(_exec, ops, _opts.applierFn, _opts.multiApplyFn, lambda)); + _applier.reset(new MultiApplier(_exec, ops, applierFn, multiApplyFn, lambda)); return _applier->start(); } diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h index 441cbc10aa3..e1c491ebd4d 100644 --- a/src/mongo/db/repl/data_replicator.h +++ b/src/mongo/db/repl/data_replicator.h @@ -136,8 +136,6 @@ struct DataReplicatorOptions { std::string scopeNS; BSONObj filterCriteria; - MultiApplier::ApplyOperationFn applierFn; - MultiApplier::MultiApplyFn multiApplyFn; RollbackFn rollbackFn; Reporter::PrepareReplSetUpdatePositionCommandFn prepareReplSetUpdatePositionCommandFn; GetMyLastOptimeFn getMyLastOptime; diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h index 983290a2148..d19f46f9711 100644 --- a/src/mongo/db/repl/data_replicator_external_state.h +++ b/src/mongo/db/repl/data_replicator_external_state.h @@ -29,6 +29,7 @@ #pragma once #include "mongo/base/disallow_copying.h" +#include "mongo/db/repl/multiapplier.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/optime_with.h" #include "mongo/rpc/metadata/repl_set_metadata.h" @@ -38,6 +39,8 @@ namespace mongo { namespace repl { +class DataReplicator; + /** * Holds current term and last committed optime necessary to populate find/getMore command requests. */ @@ -76,6 +79,37 @@ public: virtual bool shouldStopFetching(const HostAndPort& source, const OpTime& sourceOpTime, bool sourceHasSyncSource) = 0; + +private: + /** + * Applies the operations described in the oplog entries contained in "ops" using the + * "applyOperation" function. + * + * Used exclusively by the DataReplicator to construct a MultiApplier. + */ + virtual StatusWith _multiApply(OperationContext* txn, + const MultiApplier::Operations& ops, + MultiApplier::ApplyOperationFn applyOperation) = 0; + + /** + * Used by _multiApply() to write operations to database during steady state replication. + * + * Used exclusively by the DataReplicator to construct a MultiApplier. + */ + virtual void _multiSyncApply(const MultiApplier::Operations& ops) = 0; + + /** + * Used by _multiApply() to write operations to database during initial sync. + * Fetches missing documents from "source". + * + * Used exclusively by the DataReplicator to construct a MultiApplier. + */ + virtual void _multiInitialSyncApply(const MultiApplier::Operations& ops, + const HostAndPort& source) = 0; + + // Provides DataReplicator with access to _multiApply, _multiSyncApply and + // _multiInitialSyncApply. + friend class DataReplicator; }; } // namespace repl diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp index 828a2aa51a5..558f3faf700 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp @@ -33,14 +33,17 @@ #include "mongo/db/repl/data_replicator_external_state_impl.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/replication_coordinator_external_state.h" #include "mongo/util/log.h" namespace mongo { namespace repl { DataReplicatorExternalStateImpl::DataReplicatorExternalStateImpl( - ReplicationCoordinator* replicationCoordinator) - : _replicationCoordinator(replicationCoordinator) {} + ReplicationCoordinator* replicationCoordinator, + ReplicationCoordinatorExternalState* replicationCoordinatorExternalState) + : _replicationCoordinator(replicationCoordinator), + _replicationCoordinatorExternalState(replicationCoordinatorExternalState) {} OpTimeWithTerm DataReplicatorExternalStateImpl::getCurrentTermAndLastCommittedOpTime() { if (!_replicationCoordinator->isV1ElectionProtocol()) { @@ -70,9 +73,30 @@ bool DataReplicatorExternalStateImpl::shouldStopFetching(const HostAndPort& sour return false; } +StatusWith DataReplicatorExternalStateImpl::_multiApply( + OperationContext* txn, + const MultiApplier::Operations& ops, + MultiApplier::ApplyOperationFn applyOperation) { + return _replicationCoordinatorExternalState->multiApply(txn, ops, applyOperation); +} + +void DataReplicatorExternalStateImpl::_multiSyncApply(const MultiApplier::Operations& ops) { + _replicationCoordinatorExternalState->multiSyncApply(ops); +} + +void DataReplicatorExternalStateImpl::_multiInitialSyncApply(const MultiApplier::Operations& ops, + const HostAndPort& source) { + _replicationCoordinatorExternalState->multiInitialSyncApply(ops, source); +} + ReplicationCoordinator* DataReplicatorExternalStateImpl::getReplicationCoordinator() const { return _replicationCoordinator; } +ReplicationCoordinatorExternalState* +DataReplicatorExternalStateImpl::getReplicationCoordinatorExternalState() const { + return _replicationCoordinatorExternalState; +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h index 8fc84ff218c..25a09e1d7db 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.h +++ b/src/mongo/db/repl/data_replicator_external_state_impl.h @@ -34,6 +34,7 @@ namespace mongo { namespace repl { class ReplicationCoordinator; +class ReplicationCoordinatorExternalState; /** * Data replicator external state implementation using a replication coordinator. @@ -41,7 +42,9 @@ class ReplicationCoordinator; class DataReplicatorExternalStateImpl : public DataReplicatorExternalState { public: - DataReplicatorExternalStateImpl(ReplicationCoordinator* replicationCoordinator); + DataReplicatorExternalStateImpl( + ReplicationCoordinator* replicationCoordinator, + ReplicationCoordinatorExternalState* replicationCoordinatorExternalState); OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override; @@ -51,12 +54,26 @@ public: const OpTime& sourceOpTime, bool sourceHasSyncSource) override; +private: + StatusWith _multiApply(OperationContext* txn, + const MultiApplier::Operations& ops, + MultiApplier::ApplyOperationFn applyOperation) override; + + void _multiSyncApply(const MultiApplier::Operations& ops) override; + + void _multiInitialSyncApply(const MultiApplier::Operations& ops, + const HostAndPort& source) override; + protected: ReplicationCoordinator* getReplicationCoordinator() const; + ReplicationCoordinatorExternalState* getReplicationCoordinatorExternalState() const; private: // Not owned by us. ReplicationCoordinator* _replicationCoordinator; + + // Not owned by us. + ReplicationCoordinatorExternalState* _replicationCoordinatorExternalState; }; diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp index 501d9ae70c3..83a6e3fd83c 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp @@ -33,6 +33,11 @@ namespace mongo { namespace repl { +DataReplicatorExternalStateMock::DataReplicatorExternalStateMock() + : multiApplyFn([](OperationContext*, + const MultiApplier::Operations& ops, + MultiApplier::ApplyOperationFn) { return ops.back().getOpTime(); }) {} + OpTimeWithTerm DataReplicatorExternalStateMock::getCurrentTermAndLastCommittedOpTime() { return {currentTerm, lastCommittedOpTime}; } @@ -50,5 +55,17 @@ bool DataReplicatorExternalStateMock::shouldStopFetching(const HostAndPort& sour return shouldStopFetchingResult; } +StatusWith DataReplicatorExternalStateMock::_multiApply( + OperationContext* txn, + const MultiApplier::Operations& ops, + MultiApplier::ApplyOperationFn applyOperation) { + return multiApplyFn(txn, ops, applyOperation); +} + +void DataReplicatorExternalStateMock::_multiSyncApply(const MultiApplier::Operations& ops) {} + +void DataReplicatorExternalStateMock::_multiInitialSyncApply(const MultiApplier::Operations& ops, + const HostAndPort& source) {} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h index ef78c691157..4705fb57bd3 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.h +++ b/src/mongo/db/repl/data_replicator_external_state_mock.h @@ -41,6 +41,8 @@ class ReplicationCoordinator; class DataReplicatorExternalStateMock : public DataReplicatorExternalState { public: + DataReplicatorExternalStateMock(); + OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override; void processMetadata(const rpc::ReplSetMetadata& metadata) override; @@ -63,6 +65,19 @@ public: // Returned by shouldStopFetching. bool shouldStopFetchingResult = false; + + // Override to change multiApply behavior. + MultiApplier::MultiApplyFn multiApplyFn; + +private: + StatusWith _multiApply(OperationContext* txn, + const MultiApplier::Operations& ops, + MultiApplier::ApplyOperationFn applyOperation) override; + + void _multiSyncApply(const MultiApplier::Operations& ops) override; + + void _multiInitialSyncApply(const MultiApplier::Operations& ops, + const HostAndPort& source) override; }; diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index 8e6352fe218..939989d8aff 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -106,11 +106,6 @@ public: * clear/reset state */ void reset() { - _applierFn = [](const MultiApplier::Operations&) {}; - _multiApplyFn = [](OperationContext*, - const MultiApplier::Operations& ops, - MultiApplier::ApplyOperationFn) - -> StatusWith { return ops.back().getOpTime(); }; _rollbackFn = [](OperationContext*, const OpTime&, const HostAndPort&) -> Status { return Status::OK(); }; _setMyLastOptime = [this](const OpTime& opTime) { _myLastOpTime = opTime; }; @@ -181,6 +176,10 @@ public: return *_dr; } + DataReplicatorExternalStateMock* getExternalState() { + return _externalState; + } + protected: void setUp() override { ReplicationExecutorTest::setUp(); @@ -194,11 +193,6 @@ protected: DataReplicatorOptions options; options.initialSyncRetryWait = Milliseconds(0); - options.applierFn = [this](const MultiApplier::Operations& ops) { return _applierFn(ops); }; - options.multiApplyFn = - [this](OperationContext* txn, - const MultiApplier::Operations& ops, - MultiApplier::ApplyOperationFn func) { return _multiApplyFn(txn, ops, func); }; options.rollbackFn = [this](OperationContext* txn, const OpTime& lastOpTimeWritten, const HostAndPort& syncSource) -> Status { @@ -231,6 +225,7 @@ protected: auto dataReplicatorExternalState = stdx::make_unique(); dataReplicatorExternalState->currentTerm = 1LL; dataReplicatorExternalState->lastCommittedOpTime = _myLastOpTime; + _externalState = dataReplicatorExternalState.get(); try { _dr.reset(new DataReplicator( @@ -246,8 +241,6 @@ protected: // Executor may still invoke callback before shutting down. } - MultiApplier::ApplyOperationFn _applierFn; - MultiApplier::MultiApplyFn _multiApplyFn; DataReplicatorOptions::RollbackFn _rollbackFn; DataReplicatorOptions::SetMyLastOptimeFn _setMyLastOptime; OpTime _myLastOpTime; @@ -255,6 +248,7 @@ protected: std::unique_ptr _syncSourceSelector; private: + DataReplicatorExternalStateMock* _externalState; std::unique_ptr _dr; }; @@ -521,7 +515,7 @@ TEST_F(InitialSyncTest, Complete) { TEST_F(InitialSyncTest, MissingDocOnMultiApplyCompletes) { DataReplicatorOptions opts; int applyCounter{0}; - _multiApplyFn = + getExternalState()->multiApplyFn = [&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn) -> StatusWith { if (++applyCounter == 1) { @@ -944,7 +938,7 @@ TEST_F(SteadyStateTest, PauseDataReplicator) { unittest::Barrier barrier(2U); Timestamp lastTimestampApplied; BSONObj operationApplied; - _multiApplyFn = + getExternalState()->multiApplyFn = [&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn) -> StatusWith { stdx::lock_guard lock(mutex); @@ -1036,7 +1030,7 @@ TEST_F(SteadyStateTest, ApplyOneOperation) { unittest::Barrier barrier(2U); Timestamp lastTimestampApplied; BSONObj operationApplied; - _multiApplyFn = + getExternalState()->multiApplyFn = [&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn) -> StatusWith { stdx::lock_guard lock(mutex); diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 7be2d475eda..4a58018474a 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -33,6 +33,7 @@ #include "mongo/base/disallow_copying.h" #include "mongo/bson/timestamp.h" #include "mongo/db/repl/member_state.h" +#include "mongo/db/repl/multiapplier.h" #include "mongo/db/repl/optime.h" #include "mongo/stdx/functional.h" #include "mongo/util/time_support.h" @@ -268,6 +269,26 @@ public: * Returns true if the current storage engine supports read committed. */ virtual bool isReadCommittedSupportedByStorageEngine(OperationContext* txn) const = 0; + + /** + * Applies the operations described in the oplog entries contained in "ops" using the + * "applyOperation" function. + */ + virtual StatusWith multiApply(OperationContext* txn, + const MultiApplier::Operations& ops, + MultiApplier::ApplyOperationFn applyOperation) = 0; + + /** + * Used by multiApply() to writes operations to database during steady state replication. + */ + virtual void multiSyncApply(const MultiApplier::Operations& ops) = 0; + + /** + * Used by multiApply() to writes operations to database during initial sync. + * Fetches missing documents from "source". + */ + virtual void multiInitialSyncApply(const MultiApplier::Operations& ops, + const HostAndPort& source) = 0; }; } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 152016b2ddc..e5042d8f24b 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -57,6 +57,7 @@ #include "mongo/db/repl/rs_initialsync.h" #include "mongo/db/repl/snapshot_thread.h" #include "mongo/db/repl/storage_interface.h" +#include "mongo/db/repl/sync_tail.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" #include "mongo/db/s/sharding_state.h" @@ -107,7 +108,7 @@ void ReplicationCoordinatorExternalStateImpl::startInitialSync(OnInitialSyncFini invariant(!(bgsync == nullptr && !inShutdownStrict())); // bgsync can be null @shutdown. invariant(!_producerThread); // The producer thread should not be init'd before this. _producerThread.reset( - new stdx::thread(stdx::bind(&BackgroundSync::producerThread, bgsync))); + new stdx::thread(stdx::bind(&BackgroundSync::producerThread, bgsync, this))); // Do initial sync. syncDoInitialSync(); finished(); @@ -119,7 +120,7 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication() { log() << "Starting replication fetcher thread"; BackgroundSync* bgsync = BackgroundSync::get(); _producerThread.reset( - new stdx::thread(stdx::bind(&BackgroundSync::producerThread, bgsync))); + new stdx::thread(stdx::bind(&BackgroundSync::producerThread, bgsync, this))); } log() << "Starting replication applier threads"; invariant(!_applierThread); @@ -501,6 +502,30 @@ bool ReplicationCoordinatorExternalStateImpl::isReadCommittedSupportedByStorageE return storageEngine->getSnapshotManager(); } +StatusWith ReplicationCoordinatorExternalStateImpl::multiApply( + OperationContext* txn, + const MultiApplier::Operations& ops, + MultiApplier::ApplyOperationFn applyOperation) { + return repl::multiApply(txn, ops, applyOperation); +} + +void ReplicationCoordinatorExternalStateImpl::multiSyncApply(const MultiApplier::Operations& ops) { + // SyncTail* argument is not used by repl::multiSyncApply(). + repl::multiSyncApply(ops, nullptr); +} + +void ReplicationCoordinatorExternalStateImpl::multiInitialSyncApply( + const MultiApplier::Operations& ops, const HostAndPort& source) { + // repl::multiInitialSyncApply uses SyncTail::shouldRetry() (and implicitly getMissingDoc()) + // to fetch missing documents during initial sync. Therefore, it is fine to construct SyncTail + // with invalid BackgroundSync and MultiSyncApplyFunc arguments because we will not be accessing + // any SyncTail functionality that require these constructor parameters. + SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc()); + syncTail.setHostname(source.toString()); + repl::multiInitialSyncApply(ops, &syncTail); +} + + JournalListener::Token ReplicationCoordinatorExternalStateImpl::getToken() { return repl::getGlobalReplicationCoordinator()->getMyLastAppliedOpTime(); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index e08cf1fb75e..e8b8aaa7207 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -87,6 +87,12 @@ public: virtual void notifyOplogMetadataWaiters(); virtual double getElectionTimeoutOffsetLimitFraction() const; virtual bool isReadCommittedSupportedByStorageEngine(OperationContext* txn) const; + virtual StatusWith multiApply(OperationContext* txn, + const MultiApplier::Operations& ops, + MultiApplier::ApplyOperationFn applyOperation) override; + virtual void multiSyncApply(const MultiApplier::Operations& ops) override; + virtual void multiInitialSyncApply(const MultiApplier::Operations& ops, + const HostAndPort& source) override; std::string getNextOpContextThreadName(); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index 0396bb37b5f..2d7736def87 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -236,6 +236,16 @@ bool ReplicationCoordinatorExternalStateMock::isReadCommittedSupportedByStorageE return _isReadCommittedSupported; } +StatusWith ReplicationCoordinatorExternalStateMock::multiApply( + OperationContext*, const MultiApplier::Operations&, MultiApplier::ApplyOperationFn) { + return {ErrorCodes::InternalError, "Method not implemented"}; +} + +void ReplicationCoordinatorExternalStateMock::multiSyncApply(const MultiApplier::Operations& ops) {} + +void ReplicationCoordinatorExternalStateMock::multiInitialSyncApply( + const MultiApplier::Operations& ops, const HostAndPort& source) {} + void ReplicationCoordinatorExternalStateMock::setIsReadCommittedEnabled(bool val) { _isReadCommittedSupported = val; } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 77d883df480..049f39745c8 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -88,6 +88,12 @@ public: virtual void notifyOplogMetadataWaiters(); virtual double getElectionTimeoutOffsetLimitFraction() const; virtual bool isReadCommittedSupportedByStorageEngine(OperationContext* txn) const; + virtual StatusWith multiApply(OperationContext* txn, + const MultiApplier::Operations& ops, + MultiApplier::ApplyOperationFn applyOperation) override; + virtual void multiSyncApply(const MultiApplier::Operations& ops) override; + virtual void multiInitialSyncApply(const MultiApplier::Operations& ops, + const HostAndPort& source) override; /** * Adds "host" to the list of hosts that this mock will match when responding to "isSelf" diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 14eba0564ed..3b5319b8cc7 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -203,10 +203,6 @@ ReplicationCoordinator::Mode getReplicationModeFromSettings(const ReplSettings& DataReplicatorOptions createDataReplicatorOptions(ReplicationCoordinator* replCoord) { DataReplicatorOptions options; - options.applierFn = [](const MultiApplier::Operations&) {}; - options.multiApplyFn = - [](OperationContext*, const MultiApplier::Operations&, MultiApplier::ApplyOperationFn) - -> OpTime { return OpTime(); }; options.rollbackFn = [](OperationContext*, const OpTime&, const HostAndPort&) -> Status { return Status::OK(); }; options.prepareReplSetUpdatePositionCommandFn = @@ -256,7 +252,7 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( _canAcceptNonLocalWrites(!(settings.usingReplSets() || settings.isSlave())), _canServeNonLocalReads(0U), _dr(createDataReplicatorOptions(this), - stdx::make_unique(this), + stdx::make_unique(this, externalState), &_replExecutor), _isDurableStorageEngine(isDurableStorageEngineFn ? *isDurableStorageEngineFn : []() -> bool { return getGlobalServiceContext()->getGlobalStorageEngine()->isDurable(); diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index d2758fc478c..21459b723a6 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -180,7 +180,7 @@ private: }; /** - * Applies the opeartions described in the oplog entries contained in "ops" using the + * Applies the operations described in the oplog entries contained in "ops" using the * "applyOperation" function. * * Returns ErrorCode::InterruptedAtShutdown if the node enters shutdown while applying ops, -- cgit v1.2.1