diff options
author | Benety Goh <benety@mongodb.com> | 2015-06-23 16:17:02 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2015-06-23 20:59:27 -0400 |
commit | cb23019011883f3c5f0ce0876248e80f05de4581 (patch) | |
tree | 2e71009592889df0c24c0c587f285b3dff865de1 | |
parent | f9311e512c9150280d26f0840cbe94a31c9b5a19 (diff) | |
download | mongo-cb23019011883f3c5f0ce0876248e80f05de4581.tar.gz |
SERVER-18036 removed ReplicationCoordinator dependency from DataReplicator
-rw-r--r-- | src/mongo/db/repl/SConscript | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.cpp | 72 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.h | 69 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_test.cpp | 197 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator.h | 24 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/repl/reporter.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/reporter.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_selector.h | 72 |
9 files changed, 303 insertions, 155 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 768bf9654c7..a081ce1ffc2 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -316,7 +316,6 @@ env.Library('repl_coordinator_interface', LIBDEPS=[ '$BUILD_DIR/mongo/util/net/hostandport', 'optime', - 'reporter', ]) env.Library('repl_coordinator_global', @@ -589,7 +588,7 @@ env.Library( 'applier', 'collection_cloner', 'database_cloner', - 'repl_coordinator_interface', + 'reporter', '$BUILD_DIR/mongo/client/fetcher', ], ) @@ -602,9 +601,7 @@ env.CppUnitTest( LIBDEPS=[ 'base_cloner_test_fixture', 'data_replicator', - 'replica_set_messages', 'replication_executor_test_fixture', - 'replmocks', '$BUILD_DIR/mongo/unittest/concurrency', ], ) diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index 1eb7c9a0ae3..d932383cf63 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -42,7 +42,10 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/repl/collection_cloner.h" #include "mongo/db/repl/database_cloner.h" +#include "mongo/db/repl/member_state.h" #include "mongo/db/repl/optime.h" +#include "mongo/db/repl/reporter.h" +#include "mongo/db/repl/sync_source_selector.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/thread.h" #include "mongo/util/assert_util.h" @@ -502,33 +505,24 @@ void DatabasesCloner::_failed() { } // Data Replicator -DataReplicator::DataReplicator(DataReplicatorOptions opts, - ReplicationExecutor* exec, - ReplicationCoordinator* replCoord) - : DataReplicator( - opts, - exec, - replCoord, - // TODO: replace this with a method in the replication coordinator. - [replCoord](const Timestamp& ts) { replCoord->setMyLastOptime(OpTime(ts, 0)); }) {} - DataReplicator::DataReplicator(DataReplicatorOptions opts, ReplicationExecutor* exec) - : DataReplicator(opts, exec, nullptr, [](const Timestamp& ts) {}) {} - -DataReplicator::DataReplicator(DataReplicatorOptions opts, - ReplicationExecutor* exec, - ReplicationCoordinator* replCoord, - OnBatchCompleteFn batchCompletedFn) : _opts(opts), _exec(exec), - _replCoord(replCoord), _state(DataReplicatorState::Uninitialized), _fetcherPaused(false), _reporterPaused(false), _applierActive(false), _applierPaused(false), - _batchCompletedFn(batchCompletedFn), - _oplogBuffer(kOplogBufferSize, &getSize) {} + _oplogBuffer(kOplogBufferSize, &getSize) { + uassert(ErrorCodes::BadValue, "invalid applier function", _opts.applierFn); + uassert(ErrorCodes::BadValue, + "invalid replication progress manager", + _opts.replicationProgressManager); + uassert(ErrorCodes::BadValue, "invalid getMyLastOptime function", _opts.getMyLastOptime); + uassert(ErrorCodes::BadValue, "invalid setMyLastOptime function", _opts.setMyLastOptime); + uassert(ErrorCodes::BadValue, "invalid setFollowerMode function", _opts.setFollowerMode); + uassert(ErrorCodes::BadValue, "invalid sync source selector", _opts.syncSourceSelector); +} DataReplicator::~DataReplicator() { DESTRUCTOR_GUARD(_cancelAllHandles_inlock(); _waitOnAll_inlock();); @@ -568,6 +562,11 @@ Timestamp DataReplicator::getLastTimestampFetched() const { return _lastTimestampFetched; } +size_t DataReplicator::getOplogBufferCount() const { + // Oplog buffer is internally synchronized. + return _oplogBuffer.count(); +} + std::string DataReplicator::getDiagnosticString() const { LockGuard lk(_mutex); str::stream out; @@ -716,7 +715,9 @@ TimestampStatus DataReplicator::initialSync() { Event initialSyncFinishEvent; if (attemptErrorStatus.isOK() && _syncSource.empty()) { attemptErrorStatus = _ensureGoodSyncSource_inlock(); - } else if (attemptErrorStatus.isOK()) { + } + + if (attemptErrorStatus.isOK()) { StatusWith<Event> status = _exec->makeEvent(); if (!status.isOK()) { attemptErrorStatus = status.getStatus(); @@ -954,7 +955,7 @@ void DataReplicator::_doNextActions_Rollback_inlock() { void DataReplicator::_doNextActions_Steady_inlock() { // Check sync source is still good. if (_syncSource.empty()) { - _syncSource = _replCoord->chooseNewSyncSource(); + _syncSource = _opts.syncSourceSelector->chooseNewSyncSource(); } if (_syncSource.empty()) { // No sync source, reschedule check @@ -982,7 +983,7 @@ void DataReplicator::_doNextActions_Steady_inlock() { if (!_reporterPaused && (!_reporter || !_reporter->getStatus().isOK())) { // TODO get reporter in good shape - _reporter.reset(new Reporter(_exec, _replCoord, _syncSource)); + _reporter.reset(new Reporter(_exec, _opts.replicationProgressManager, _syncSource)); } } @@ -1014,11 +1015,9 @@ void DataReplicator::_onApplyBatchFinish(const CallbackArgs& cbData, _lastTimestampApplied = ts.getValue(); lk.unlock(); - if (_batchCompletedFn) { - _batchCompletedFn(ts.getValue()); - } - // TODO: move the reporter to the replication coordinator and set _batchCompletedFn to a - // function in the replCoord. + _opts.setMyLastOptime(OpTime(ts.getValue(), 0)); + + // TODO: move the reporter to the replication coordinator. if (_reporter) { _reporter->trigger(); } @@ -1155,13 +1154,9 @@ Status DataReplicator::_scheduleFetch() { Status DataReplicator::_ensureGoodSyncSource_inlock() { if (_syncSource.empty()) { - if (_replCoord) { - _syncSource = _replCoord->chooseNewSyncSource(); - if (!_syncSource.empty()) { - return Status::OK(); - } - } else { - _syncSource = _opts.syncSource; // set this back to the options source + _syncSource = _opts.syncSourceSelector->chooseNewSyncSource(); + if (!_syncSource.empty()) { + return Status::OK(); } return Status{ErrorCodes::InvalidSyncSource, "No valid sync source."}; @@ -1178,8 +1173,7 @@ Status DataReplicator::_scheduleFetch_inlock() { } } - const auto startOptime = - _replCoord ? _replCoord->getMyLastOptime().getTimestamp() : _opts.startOptime; + const auto startOptime = _opts.getMyLastOptime().getTimestamp(); const auto remoteOplogNS = _opts.remoteOplogNS; // TODO: add query options await_data, oplog_replay @@ -1290,7 +1284,7 @@ void DataReplicator::_onOplogFetchFinish(const StatusWith<Fetcher::QueryResponse // possible rollback _rollbackCommonOptime = findCommonPoint(_syncSource, _lastTimestampApplied); if (_rollbackCommonOptime.isNull()) { - auto s = _replCoord->setFollowerMode(MemberState::RS_RECOVERING); + auto s = _opts.setFollowerMode(MemberState::RS_RECOVERING); if (!s) { error() << "Failed to transition to RECOVERING when " "we couldn't find oplog start position (" @@ -1299,7 +1293,7 @@ void DataReplicator::_onOplogFetchFinish(const StatusWith<Fetcher::QueryResponse } Date_t until{_exec->now() + _opts.blacklistSyncSourcePenaltyForOplogStartMissing}; - _replCoord->blacklistSyncSource(_syncSource, until); + _opts.syncSourceSelector->blacklistSyncSource(_syncSource, until); } else { // TODO: cleanup state/restart -- set _lastApplied, and other stuff } @@ -1311,7 +1305,7 @@ void DataReplicator::_onOplogFetchFinish(const StatusWith<Fetcher::QueryResponse default: Date_t until{_exec->now() + _opts.blacklistSyncSourcePenaltyForNetworkConnectionError}; - _replCoord->blacklistSyncSource(_syncSource, until); + _opts.syncSourceSelector->blacklistSyncSource(_syncSource, until); } LockGuard lk(_mutex); _syncSource = HostAndPort(); diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h index 3bfce3e9a93..212f5e69dec 100644 --- a/src/mongo/db/repl/data_replicator.h +++ b/src/mongo/db/repl/data_replicator.h @@ -40,9 +40,9 @@ #include "mongo/db/repl/applier.h" #include "mongo/db/repl/collection_cloner.h" #include "mongo/db/repl/database_cloner.h" -#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_executor.h" -#include "mongo/db/repl/reporter.h" +#include "mongo/db/repl/sync_source_selector.h" #include "mongo/stdx/thread.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/queue.h" @@ -69,6 +69,10 @@ using UniqueLock = stdx::unique_lock<stdx::mutex>; class OplogFetcher; struct InitialSyncState; +struct MemberState; +class ReplicationProgressManager; +class Reporter; +class SyncSourceSelector; /** State for decision tree */ enum class DataReplicatorState { @@ -84,6 +88,15 @@ std::string toString(DataReplicatorState s); enum class DataReplicatorScope { ReplicateAll, ReplicateDB, ReplicateCollection }; struct DataReplicatorOptions { + /** Function to return optime of last operation applied on this node */ + using GetMyLastOptimeFn = stdx::function<OpTime()>; + + /** Function to update optime of last operation applied on this node */ + using SetMyLastOptimeFn = stdx::function<void(const OpTime&)>; + + /** Function to sets this node into a specific follower mode. */ + using SetFollowerModeFn = stdx::function<bool(const MemberState&)>; + // Error and retry values Milliseconds syncSourceRetryWait{1000}; Milliseconds initialSyncRetryWait{1000}; @@ -91,7 +104,6 @@ struct DataReplicatorOptions { Minutes blacklistSyncSourcePenaltyForOplogStartMissing{10}; // Replication settings - Timestamp startOptime; NamespaceString localOplogNS = NamespaceString("local.oplog.rs"); NamespaceString remoteOplogNS = NamespaceString("local.oplog.rs"); @@ -99,18 +111,18 @@ struct DataReplicatorOptions { DataReplicatorScope scope = DataReplicatorScope::ReplicateAll; std::string scopeNS; BSONObj filterCriteria; - HostAndPort syncSource; // for use without replCoord -- maybe some kind of rsMonitor/interface - // TODO: replace with real applier function - Applier::ApplyOperationFn applierFn = - [](OperationContext*, const BSONObj&) -> Status { return Status::OK(); }; + Applier::ApplyOperationFn applierFn; + ReplicationProgressManager* replicationProgressManager = nullptr; + GetMyLastOptimeFn getMyLastOptime; + SetMyLastOptimeFn setMyLastOptime; + SetFollowerModeFn setFollowerMode; + SyncSourceSelector* syncSourceSelector = nullptr; std::string toString() const { return str::stream() << "DataReplicatorOptions -- " << " localOplogNs: " << localOplogNS.toString() - << " remoteOplogNS: " << remoteOplogNS.toString() - << " syncSource: " << syncSource.toString() - << " startOptime: " << startOptime.toString(); + << " remoteOplogNS: " << remoteOplogNS.toString(); } }; @@ -123,25 +135,8 @@ struct DataReplicatorOptions { */ class DataReplicator { public: - /** Function to call when a batch is applied. */ - using OnBatchCompleteFn = stdx::function<void(const Timestamp&)>; - - DataReplicator(DataReplicatorOptions opts, - ReplicationExecutor* exec, - ReplicationCoordinator* replCoord); - /** - * Used by non-replication coordinator processes, like sharding. - */ DataReplicator(DataReplicatorOptions opts, ReplicationExecutor* exec); - /** - * Used for testing. - */ - DataReplicator(DataReplicatorOptions opts, - ReplicationExecutor* exec, - ReplicationCoordinator* replCoord, - OnBatchCompleteFn batchCompletedFn); - virtual ~DataReplicator(); Status start(); @@ -178,14 +173,17 @@ public: DataReplicatorState getState() const; Timestamp getLastTimestampFetched() const; + + /** + * Number of operations in the oplog buffer. + */ + size_t getOplogBufferCount() const; + std::string getDiagnosticString() const; // For testing only void _resetState_inlock(Timestamp lastAppliedOptime); - void __setSourceForTesting(HostAndPort src) { - _syncSource = src; - } void _setInitialSyncStorageInterface(CollectionCloner::StorageInterface* si); private: @@ -241,7 +239,6 @@ private: // Set during construction const DataReplicatorOptions _opts; ReplicationExecutor* _exec; - ReplicationCoordinator* _replCoord; // // All member variables are labeled with one of the following codes indicating the @@ -257,7 +254,7 @@ private: // _mutex or be in a callback in _exec to read. // (I) Independently synchronized, see member variable comment. - // Protects member data of this ReplicationCoordinator. + // Protects member data of this DataReplicator. mutable stdx::mutex _mutex; // (S) DataReplicatorState _state; // (MX) @@ -274,11 +271,9 @@ private: Handle _reporterHandle; // (M) std::unique_ptr<Reporter> _reporter; // (M) - bool _applierActive; // (M) - bool _applierPaused; // (X) - std::unique_ptr<Applier> _applier; // (M) - OnBatchCompleteFn _batchCompletedFn; // (M) - + bool _applierActive; // (M) + bool _applierPaused; // (X) + std::unique_ptr<Applier> _applier; // (M) HostAndPort _syncSource; // (M) Timestamp _lastTimestampFetched; // (MX) diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index de8f2da9f12..31ef44bef4d 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -36,13 +36,11 @@ #include "mongo/db/json.h" #include "mongo/db/repl/base_cloner_test_fixture.h" #include "mongo/db/repl/data_replicator.h" -#include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/repl/replication_coordinator_impl.h" -#include "mongo/db/repl/replica_set_config.h" -#include "mongo/db/repl/replication_coordinator_mock.h" -#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/member_state.h" #include "mongo/db/repl/replication_executor_test_fixture.h" #include "mongo/db/repl/replication_executor.h" +#include "mongo/db/repl/reporter.h" +#include "mongo/db/repl/sync_source_selector.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/mutex.h" #include "mongo/util/fail_point_service.h" @@ -60,33 +58,41 @@ using LockGuard = stdx::lock_guard<stdx::mutex>; using UniqueLock = stdx::unique_lock<stdx::mutex>; using mutex = stdx::mutex; -ReplicaSetConfig assertMakeRSConfig(const BSONObj& configBson) { - ReplicaSetConfig config; - ASSERT_OK(config.initialize(configBson)); - ASSERT_OK(config.validate()); - return config; -} -const HostAndPort target("localhost", -1); - -class DataReplicatorTest : public ReplicationExecutorTest { +class DataReplicatorTest : public ReplicationExecutorTest, + public ReplicationProgressManager, + public SyncSourceSelector { public: DataReplicatorTest() {} void postExecutorThreadLaunch() override{}; + /** + * clear/reset state + */ void reset() { - // clear/reset state + _applierFn = [](OperationContext*, const BSONObj&) -> Status { return Status::OK(); }; + _setMyLastOptime = [this](const OpTime& opTime) { _myLastOpTime = opTime; }; + _myLastOpTime = OpTime(); + _memberState = MemberState::RS_UNKNOWN; + _syncSource = HostAndPort("localhost", -1); } - void createDataReplicator(DataReplicatorOptions opts) { - _dr.reset(new DataReplicator(opts, &(getExecutor()), _repl.get())); - _dr->__setSourceForTesting(target); + // ReplicationProgressManager + bool prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) override { + cmdBuilder->append("replSetUpdatePosition", 1); + return true; } - void createDataReplicator(DataReplicatorOptions opts, - DataReplicator::OnBatchCompleteFn batchCompletedFn) { - _dr.reset(new DataReplicator(opts, &(getExecutor()), _repl.get(), batchCompletedFn)); - _dr->__setSourceForTesting(target); + // SyncSourceSelector + void clearSyncSourceBlacklist() override {} + HostAndPort chooseNewSyncSource() override { + HostAndPort result = _syncSource; + _syncSource = HostAndPort(); + return result; + } + void blacklistSyncSource(const HostAndPort& host, Date_t until) {} + bool shouldChangeSyncSource(const HostAndPort& currentSource) { + return false; } void scheduleNetworkResponse(const BSONObj& obj) { @@ -129,34 +135,47 @@ public: DataReplicator& getDR() { return *_dr; } - ReplicationCoordinator& getRepl() { - return *_repl; - } protected: void setUp() override { ReplicationExecutorTest::setUp(); reset(); - _settings.replSet = "foo"; // We are a replica set :) - _repl.reset(new ReplicationCoordinatorMock(_settings)); launchExecutorThread(); DataReplicatorOptions options; options.initialSyncRetryWait = Milliseconds(0); - createDataReplicator(options); + options.applierFn = [this](OperationContext* txn, const BSONObj& operation) { + return _applierFn(txn, operation); + }; + options.replicationProgressManager = this; + options.getMyLastOptime = [this]() { return _myLastOpTime; }; + options.setMyLastOptime = [this](const OpTime& opTime) { _setMyLastOptime(opTime); }; + options.setFollowerMode = [this](const MemberState& state) { + _memberState = state; + return true; + }; + options.syncSourceSelector = this; + try { + _dr.reset(new DataReplicator(options, &(getExecutor()))); + } catch (...) { + ASSERT_OK(exceptionToStatus()); + } } void tearDown() override { ReplicationExecutorTest::tearDown(); _dr.reset(); - _repl.reset(); // Executor may still invoke callback before shutting down. } + Applier::ApplyOperationFn _applierFn; + DataReplicatorOptions::SetMyLastOptimeFn _setMyLastOptime; + OpTime _myLastOpTime; + MemberState _memberState; + HostAndPort _syncSource; + private: std::unique_ptr<DataReplicator> _dr; - std::unique_ptr<ReplicationCoordinator> _repl; - ReplSettings _settings; }; TEST_F(DataReplicatorTest, CreateDestroy) {} @@ -392,13 +411,12 @@ TEST_F(InitialSyncTest, Complete) { TEST_F(InitialSyncTest, MissingDocOnApplyCompletes) { DataReplicatorOptions opts; int applyCounter{0}; - opts.applierFn = [&](OperationContext* txn, const BSONObj& op) { + _applierFn = [&](OperationContext* txn, const BSONObj& op) { if (++applyCounter == 1) { return Status(ErrorCodes::NoMatchingDocument, "failed: missing doc."); } return Status::OK(); }; - createDataReplicator(opts); const std::vector<BSONObj> responses = { // get latest oplog ts @@ -460,11 +478,10 @@ TEST_F(InitialSyncTest, Failpoint) { << BSON("_id" << 3 << "host" << "node3:12345"))); - ReplicaSetConfig config = assertMakeRSConfig(configObj); Timestamp time1(100, 1); OpTime opTime1(time1, OpTime::kDefaultTerm); - getRepl().setMyLastOptime(opTime1); - ASSERT(getRepl().setFollowerMode(MemberState::RS_SECONDARY)); + _myLastOpTime = opTime1; + _memberState = MemberState::RS_SECONDARY; DataReplicator* dr = &(getDR()); InitialSyncBackgroundRunner isbr(dr); @@ -509,8 +526,7 @@ protected: auto noi = net->getNextReadyRequest(); scheduleNetworkResponse(noi, oplogFetcherResponse); net->runReadyNetworkOperations(); - ASSERT_EQUALS(MemberState(MemberState::RS_RECOVERING).toString(), - getRepl().getMemberState().toString()); + ASSERT_EQUALS(MemberState(MemberState::RS_RECOVERING).toString(), _memberState.toString()); } }; @@ -567,7 +583,7 @@ TEST_F(SteadyStateTest, RemoteOplogFirstOperationTimestampDoesNotMatch) { "firstBatch: [{ts:Timestamp(1,1)}]}}")); } -TEST_F(SteadyStateTest, ApplyOneOperation) { +TEST_F(SteadyStateTest, PauseDataReplicator) { auto operationToApply = BSON("op" << "a" << "ts" << Timestamp(Seconds(123), 0)); @@ -575,23 +591,103 @@ TEST_F(SteadyStateTest, ApplyOneOperation) { unittest::Barrier barrier(2U); Timestamp lastTimestampApplied; BSONObj operationApplied; - auto batchCompletedFn = [&](const Timestamp& ts) { + _applierFn = [&](OperationContext* txn, const BSONObj& op) { stdx::lock_guard<stdx::mutex> lock(mutex); - lastTimestampApplied = ts; + operationApplied = op; barrier.countDownAndWait(); + return Status::OK(); }; - DataReplicatorOptions opts; - opts.applierFn = [&](OperationContext* txn, const BSONObj& op) { + DataReplicatorOptions::SetMyLastOptimeFn oldSetMyLastOptime = _setMyLastOptime; + _setMyLastOptime = [&](const OpTime& opTime) { + oldSetMyLastOptime(opTime); + stdx::lock_guard<stdx::mutex> lock(mutex); + lastTimestampApplied = opTime.getTimestamp(); + barrier.countDownAndWait(); + }; + + auto& dr = getDR(); + _myLastOpTime = OpTime(operationToApply["ts"].timestamp(), OpTime::kDefaultTerm); + _memberState = MemberState::RS_SECONDARY; + + auto net = getNet(); + net->enterNetwork(); + + ASSERT_OK(dr.start()); + + ASSERT_TRUE(net->hasReadyRequests()); + { + auto networkRequest = net->getNextReadyRequest(); + auto commandResponse = BSON( + "ok" << 1 << "cursor" << BSON("id" << 0LL << "ns" + << "local.oplog.rs" + << "firstBatch" << BSON_ARRAY(operationToApply))); + scheduleNetworkResponse(networkRequest, commandResponse); + } + + dr.pause(); + + ASSERT_EQUALS(0U, dr.getOplogBufferCount()); + + // Data replication will process the fetcher response but will not schedule the applier. + net->runReadyNetworkOperations(); + ASSERT_EQUALS(operationToApply["ts"].timestamp(), dr.getLastTimestampFetched()); + + // Schedule a bogus work item to ensure that the operation applier function + // is not scheduled. + auto& exec = getExecutor(); + exec.scheduleWork( + [&barrier](const executor::TaskExecutor::CallbackArgs&) { barrier.countDownAndWait(); }); + + + // Wake up executor thread and wait for bogus work callback to be invoked. + net->exitNetwork(); + barrier.countDownAndWait(); + + // Oplog buffer should contain fetched operations since applier is not scheduled. + ASSERT_EQUALS(1U, dr.getOplogBufferCount()); + + dr.resume(); + + // Wait for applier function. + barrier.countDownAndWait(); + // Run scheduleWork() work item scheduled in DataReplicator::_onApplyBatchFinish(). + net->exitNetwork(); + + // Wait for batch completion callback. + barrier.countDownAndWait(); + + ASSERT_EQUALS(MemberState(MemberState::RS_SECONDARY).toString(), _memberState.toString()); + { + stdx::lock_guard<stdx::mutex> lock(mutex); + ASSERT_EQUALS(operationToApply, operationApplied); + ASSERT_EQUALS(operationToApply["ts"].timestamp(), lastTimestampApplied); + } +} + +TEST_F(SteadyStateTest, ApplyOneOperation) { + auto operationToApply = BSON("op" + << "a" + << "ts" << Timestamp(Seconds(123), 0)); + stdx::mutex mutex; + unittest::Barrier barrier(2U); + Timestamp lastTimestampApplied; + BSONObj operationApplied; + _applierFn = [&](OperationContext* txn, const BSONObj& op) { stdx::lock_guard<stdx::mutex> lock(mutex); operationApplied = op; barrier.countDownAndWait(); return Status::OK(); }; - createDataReplicator(opts, batchCompletedFn); + DataReplicatorOptions::SetMyLastOptimeFn oldSetMyLastOptime = _setMyLastOptime; + _setMyLastOptime = [&](const OpTime& opTime) { + oldSetMyLastOptime(opTime); + stdx::lock_guard<stdx::mutex> lock(mutex); + lastTimestampApplied = opTime.getTimestamp(); + barrier.countDownAndWait(); + }; - auto& repl = getRepl(); - repl.setMyLastOptime(OpTime(operationToApply["ts"].timestamp(), 0)); - ASSERT_TRUE(repl.setFollowerMode(MemberState::RS_SECONDARY)); + _myLastOpTime = OpTime(operationToApply["ts"].timestamp(), OpTime::kDefaultTerm); + _memberState = MemberState::RS_SECONDARY; auto net = getNet(); net->enterNetwork(); @@ -608,7 +704,11 @@ TEST_F(SteadyStateTest, ApplyOneOperation) { << "firstBatch" << BSON_ARRAY(operationToApply))); scheduleNetworkResponse(networkRequest, commandResponse); } + ASSERT_EQUALS(0U, dr.getOplogBufferCount()); + + // Oplog buffer should be empty because contents are transferred to applier. net->runReadyNetworkOperations(); + ASSERT_EQUALS(0U, dr.getOplogBufferCount()); // Wait for applier function. barrier.countDownAndWait(); @@ -619,8 +719,7 @@ TEST_F(SteadyStateTest, ApplyOneOperation) { // Wait for batch completion callback. barrier.countDownAndWait(); - ASSERT_EQUALS(MemberState(MemberState::RS_SECONDARY).toString(), - repl.getMemberState().toString()); + ASSERT_EQUALS(MemberState(MemberState::RS_SECONDARY).toString(), _memberState.toString()); { stdx::lock_guard<stdx::mutex> lock(mutex); ASSERT_EQUALS(operationToApply, operationApplied); diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 9bc7179bb90..84b2d6a56c4 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -35,6 +35,7 @@ #include "mongo/db/repl/member_state.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/reporter.h" +#include "mongo/db/repl/sync_source_selector.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/time_support.h" @@ -84,7 +85,7 @@ extern const char* replAllDead; * with the rest of the system. The public methods on ReplicationCoordinator are the public * API that the replication subsystem presents to the rest of the codebase. */ -class ReplicationCoordinator : public ReplicationProgressManager { +class ReplicationCoordinator : public ReplicationProgressManager, public SyncSourceSelector { MONGO_DISALLOW_COPYING(ReplicationCoordinator); public: @@ -162,11 +163,6 @@ public: virtual Seconds getSlaveDelaySecs() const = 0; /** - * Clears the list of sync sources we have blacklisted. - */ - virtual void clearSyncSourceBlacklist() = 0; - - /** * Blocks the calling thread for up to writeConcern.wTimeout millis, or until "opTime" has * been replicated to at least a set of nodes that satisfies the writeConcern, whichever * comes first. A writeConcern.wTimeout of 0 indicates no timeout (block forever) and a @@ -563,28 +559,12 @@ public: virtual Status checkReplEnabledForCommand(BSONObjBuilder* result) = 0; /** - * Chooses a viable sync source, or, if none available, returns empty HostAndPort. - */ - virtual HostAndPort chooseNewSyncSource() = 0; - - /** - * Blacklists choosing 'host' as a sync source until time 'until'. - */ - virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) = 0; - - /** * Loads the optime from the last op in the oplog into the coordinator's lastOpApplied * value. */ virtual void resetLastOpTimeFromOplog(OperationContext* txn) = 0; /** - * Determines if a new sync source should be considered. - * currentSource: the current sync source - */ - virtual bool shouldChangeSyncSource(const HostAndPort& currentSource) = 0; - - /** * Returns the OpTime of the latest replica set-committed op known to this server. * Committed means a majority of the voting nodes of the config are known to have the * operation in their oplogs. This implies such ops will never be rolled back. diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 049de8be7dc..2492b6b8671 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -149,6 +149,19 @@ ReplicationCoordinator::Mode getReplicationModeFromSettings(const ReplSettings& } return ReplicationCoordinator::modeNone; } + +DataReplicatorOptions createDataReplicatorOptions(ReplicationCoordinator* replCoord) { + DataReplicatorOptions options; + options.applierFn = [](OperationContext*, const BSONObj&) -> Status { return Status::OK(); }; + options.replicationProgressManager = replCoord; + options.getMyLastOptime = [replCoord]() { return replCoord->getMyLastOptime(); }; + options.setMyLastOptime = + [replCoord](const OpTime& opTime) { replCoord->setMyLastOptime(opTime); }; + options.setFollowerMode = + [replCoord](const MemberState& newState) { return replCoord->setFollowerMode(newState); }; + options.syncSourceSelector = replCoord; + return options; +} } // namespace ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( @@ -174,7 +187,7 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( _sleptLastElection(false), _canAcceptNonLocalWrites(!(settings.usingReplSets() || settings.slave)), _canServeNonLocalReads(0U), - _dr(DataReplicatorOptions(), &_replExecutor, this) { + _dr(createDataReplicatorOptions(this), &_replExecutor) { if (!isReplEnabled()) { return; } diff --git a/src/mongo/db/repl/reporter.cpp b/src/mongo/db/repl/reporter.cpp index b05bc2dbdd9..5a09ba48480 100644 --- a/src/mongo/db/repl/reporter.cpp +++ b/src/mongo/db/repl/reporter.cpp @@ -38,8 +38,6 @@ namespace mongo { namespace repl { -ReplicationProgressManager::~ReplicationProgressManager() {} - Reporter::Reporter(ReplicationExecutor* executor, ReplicationProgressManager* replicationProgressManager, const HostAndPort& target) diff --git a/src/mongo/db/repl/reporter.h b/src/mongo/db/repl/reporter.h index c7c502e8f5f..395f47d81c4 100644 --- a/src/mongo/db/repl/reporter.h +++ b/src/mongo/db/repl/reporter.h @@ -38,7 +38,7 @@ namespace repl { class ReplicationProgressManager { public: virtual bool prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) = 0; - virtual ~ReplicationProgressManager(); + virtual ~ReplicationProgressManager() = default; }; class Reporter { diff --git a/src/mongo/db/repl/sync_source_selector.h b/src/mongo/db/repl/sync_source_selector.h new file mode 100644 index 00000000000..846c06c24bf --- /dev/null +++ b/src/mongo/db/repl/sync_source_selector.h @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/disallow_copying.h" +#include "mongo/util/net/hostandport.h" +#include "mongo/util/time_support.h" + +namespace mongo { +namespace repl { + + +/** + * Manage list of viable and blocked sync sources that we can replicate from. + */ +class SyncSourceSelector { + MONGO_DISALLOW_COPYING(SyncSourceSelector); + +public: + SyncSourceSelector() = default; + virtual ~SyncSourceSelector() = default; + + /** + * Clears the list of sync sources we have blacklisted. + */ + virtual void clearSyncSourceBlacklist() = 0; + + /** + * Chooses a viable sync source, or, if none available, returns empty HostAndPort. + */ + virtual HostAndPort chooseNewSyncSource() = 0; + + /** + * Blacklists choosing 'host' as a sync source until time 'until'. + */ + virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) = 0; + + /** + * Determines if a new sync source should be considered. + * currentSource: the current sync source + */ + virtual bool shouldChangeSyncSource(const HostAndPort& currentSource) = 0; +}; + +} // namespace repl +} // namespace mongo |