summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2015-06-23 16:17:02 -0400
committerBenety Goh <benety@mongodb.com>2015-06-23 20:59:27 -0400
commitcb23019011883f3c5f0ce0876248e80f05de4581 (patch)
tree2e71009592889df0c24c0c587f285b3dff865de1
parentf9311e512c9150280d26f0840cbe94a31c9b5a19 (diff)
downloadmongo-cb23019011883f3c5f0ce0876248e80f05de4581.tar.gz
SERVER-18036 removed ReplicationCoordinator dependency from DataReplicator
-rw-r--r--src/mongo/db/repl/SConscript5
-rw-r--r--src/mongo/db/repl/data_replicator.cpp72
-rw-r--r--src/mongo/db/repl/data_replicator.h69
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp197
-rw-r--r--src/mongo/db/repl/replication_coordinator.h24
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp15
-rw-r--r--src/mongo/db/repl/reporter.cpp2
-rw-r--r--src/mongo/db/repl/reporter.h2
-rw-r--r--src/mongo/db/repl/sync_source_selector.h72
9 files changed, 303 insertions, 155 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 768bf9654c7..a081ce1ffc2 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -316,7 +316,6 @@ env.Library('repl_coordinator_interface',
LIBDEPS=[
'$BUILD_DIR/mongo/util/net/hostandport',
'optime',
- 'reporter',
])
env.Library('repl_coordinator_global',
@@ -589,7 +588,7 @@ env.Library(
'applier',
'collection_cloner',
'database_cloner',
- 'repl_coordinator_interface',
+ 'reporter',
'$BUILD_DIR/mongo/client/fetcher',
],
)
@@ -602,9 +601,7 @@ env.CppUnitTest(
LIBDEPS=[
'base_cloner_test_fixture',
'data_replicator',
- 'replica_set_messages',
'replication_executor_test_fixture',
- 'replmocks',
'$BUILD_DIR/mongo/unittest/concurrency',
],
)
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 1eb7c9a0ae3..d932383cf63 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -42,7 +42,10 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/collection_cloner.h"
#include "mongo/db/repl/database_cloner.h"
+#include "mongo/db/repl/member_state.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/reporter.h"
+#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/assert_util.h"
@@ -502,33 +505,24 @@ void DatabasesCloner::_failed() {
}
// Data Replicator
-DataReplicator::DataReplicator(DataReplicatorOptions opts,
- ReplicationExecutor* exec,
- ReplicationCoordinator* replCoord)
- : DataReplicator(
- opts,
- exec,
- replCoord,
- // TODO: replace this with a method in the replication coordinator.
- [replCoord](const Timestamp& ts) { replCoord->setMyLastOptime(OpTime(ts, 0)); }) {}
-
DataReplicator::DataReplicator(DataReplicatorOptions opts, ReplicationExecutor* exec)
- : DataReplicator(opts, exec, nullptr, [](const Timestamp& ts) {}) {}
-
-DataReplicator::DataReplicator(DataReplicatorOptions opts,
- ReplicationExecutor* exec,
- ReplicationCoordinator* replCoord,
- OnBatchCompleteFn batchCompletedFn)
: _opts(opts),
_exec(exec),
- _replCoord(replCoord),
_state(DataReplicatorState::Uninitialized),
_fetcherPaused(false),
_reporterPaused(false),
_applierActive(false),
_applierPaused(false),
- _batchCompletedFn(batchCompletedFn),
- _oplogBuffer(kOplogBufferSize, &getSize) {}
+ _oplogBuffer(kOplogBufferSize, &getSize) {
+ uassert(ErrorCodes::BadValue, "invalid applier function", _opts.applierFn);
+ uassert(ErrorCodes::BadValue,
+ "invalid replication progress manager",
+ _opts.replicationProgressManager);
+ uassert(ErrorCodes::BadValue, "invalid getMyLastOptime function", _opts.getMyLastOptime);
+ uassert(ErrorCodes::BadValue, "invalid setMyLastOptime function", _opts.setMyLastOptime);
+ uassert(ErrorCodes::BadValue, "invalid setFollowerMode function", _opts.setFollowerMode);
+ uassert(ErrorCodes::BadValue, "invalid sync source selector", _opts.syncSourceSelector);
+}
DataReplicator::~DataReplicator() {
DESTRUCTOR_GUARD(_cancelAllHandles_inlock(); _waitOnAll_inlock(););
@@ -568,6 +562,11 @@ Timestamp DataReplicator::getLastTimestampFetched() const {
return _lastTimestampFetched;
}
+size_t DataReplicator::getOplogBufferCount() const {
+ // Oplog buffer is internally synchronized.
+ return _oplogBuffer.count();
+}
+
std::string DataReplicator::getDiagnosticString() const {
LockGuard lk(_mutex);
str::stream out;
@@ -716,7 +715,9 @@ TimestampStatus DataReplicator::initialSync() {
Event initialSyncFinishEvent;
if (attemptErrorStatus.isOK() && _syncSource.empty()) {
attemptErrorStatus = _ensureGoodSyncSource_inlock();
- } else if (attemptErrorStatus.isOK()) {
+ }
+
+ if (attemptErrorStatus.isOK()) {
StatusWith<Event> status = _exec->makeEvent();
if (!status.isOK()) {
attemptErrorStatus = status.getStatus();
@@ -954,7 +955,7 @@ void DataReplicator::_doNextActions_Rollback_inlock() {
void DataReplicator::_doNextActions_Steady_inlock() {
// Check sync source is still good.
if (_syncSource.empty()) {
- _syncSource = _replCoord->chooseNewSyncSource();
+ _syncSource = _opts.syncSourceSelector->chooseNewSyncSource();
}
if (_syncSource.empty()) {
// No sync source, reschedule check
@@ -982,7 +983,7 @@ void DataReplicator::_doNextActions_Steady_inlock() {
if (!_reporterPaused && (!_reporter || !_reporter->getStatus().isOK())) {
// TODO get reporter in good shape
- _reporter.reset(new Reporter(_exec, _replCoord, _syncSource));
+ _reporter.reset(new Reporter(_exec, _opts.replicationProgressManager, _syncSource));
}
}
@@ -1014,11 +1015,9 @@ void DataReplicator::_onApplyBatchFinish(const CallbackArgs& cbData,
_lastTimestampApplied = ts.getValue();
lk.unlock();
- if (_batchCompletedFn) {
- _batchCompletedFn(ts.getValue());
- }
- // TODO: move the reporter to the replication coordinator and set _batchCompletedFn to a
- // function in the replCoord.
+ _opts.setMyLastOptime(OpTime(ts.getValue(), 0));
+
+ // TODO: move the reporter to the replication coordinator.
if (_reporter) {
_reporter->trigger();
}
@@ -1155,13 +1154,9 @@ Status DataReplicator::_scheduleFetch() {
Status DataReplicator::_ensureGoodSyncSource_inlock() {
if (_syncSource.empty()) {
- if (_replCoord) {
- _syncSource = _replCoord->chooseNewSyncSource();
- if (!_syncSource.empty()) {
- return Status::OK();
- }
- } else {
- _syncSource = _opts.syncSource; // set this back to the options source
+ _syncSource = _opts.syncSourceSelector->chooseNewSyncSource();
+ if (!_syncSource.empty()) {
+ return Status::OK();
}
return Status{ErrorCodes::InvalidSyncSource, "No valid sync source."};
@@ -1178,8 +1173,7 @@ Status DataReplicator::_scheduleFetch_inlock() {
}
}
- const auto startOptime =
- _replCoord ? _replCoord->getMyLastOptime().getTimestamp() : _opts.startOptime;
+ const auto startOptime = _opts.getMyLastOptime().getTimestamp();
const auto remoteOplogNS = _opts.remoteOplogNS;
// TODO: add query options await_data, oplog_replay
@@ -1290,7 +1284,7 @@ void DataReplicator::_onOplogFetchFinish(const StatusWith<Fetcher::QueryResponse
// possible rollback
_rollbackCommonOptime = findCommonPoint(_syncSource, _lastTimestampApplied);
if (_rollbackCommonOptime.isNull()) {
- auto s = _replCoord->setFollowerMode(MemberState::RS_RECOVERING);
+ auto s = _opts.setFollowerMode(MemberState::RS_RECOVERING);
if (!s) {
error() << "Failed to transition to RECOVERING when "
"we couldn't find oplog start position ("
@@ -1299,7 +1293,7 @@ void DataReplicator::_onOplogFetchFinish(const StatusWith<Fetcher::QueryResponse
}
Date_t until{_exec->now() +
_opts.blacklistSyncSourcePenaltyForOplogStartMissing};
- _replCoord->blacklistSyncSource(_syncSource, until);
+ _opts.syncSourceSelector->blacklistSyncSource(_syncSource, until);
} else {
// TODO: cleanup state/restart -- set _lastApplied, and other stuff
}
@@ -1311,7 +1305,7 @@ void DataReplicator::_onOplogFetchFinish(const StatusWith<Fetcher::QueryResponse
default:
Date_t until{_exec->now() +
_opts.blacklistSyncSourcePenaltyForNetworkConnectionError};
- _replCoord->blacklistSyncSource(_syncSource, until);
+ _opts.syncSourceSelector->blacklistSyncSource(_syncSource, until);
}
LockGuard lk(_mutex);
_syncSource = HostAndPort();
diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h
index 3bfce3e9a93..212f5e69dec 100644
--- a/src/mongo/db/repl/data_replicator.h
+++ b/src/mongo/db/repl/data_replicator.h
@@ -40,9 +40,9 @@
#include "mongo/db/repl/applier.h"
#include "mongo/db/repl/collection_cloner.h"
#include "mongo/db/repl/database_cloner.h"
-#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_executor.h"
-#include "mongo/db/repl/reporter.h"
+#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/queue.h"
@@ -69,6 +69,10 @@ using UniqueLock = stdx::unique_lock<stdx::mutex>;
class OplogFetcher;
struct InitialSyncState;
+struct MemberState;
+class ReplicationProgressManager;
+class Reporter;
+class SyncSourceSelector;
/** State for decision tree */
enum class DataReplicatorState {
@@ -84,6 +88,15 @@ std::string toString(DataReplicatorState s);
enum class DataReplicatorScope { ReplicateAll, ReplicateDB, ReplicateCollection };
struct DataReplicatorOptions {
+ /** Function to return optime of last operation applied on this node */
+ using GetMyLastOptimeFn = stdx::function<OpTime()>;
+
+ /** Function to update optime of last operation applied on this node */
+ using SetMyLastOptimeFn = stdx::function<void(const OpTime&)>;
+
+ /** Function to sets this node into a specific follower mode. */
+ using SetFollowerModeFn = stdx::function<bool(const MemberState&)>;
+
// Error and retry values
Milliseconds syncSourceRetryWait{1000};
Milliseconds initialSyncRetryWait{1000};
@@ -91,7 +104,6 @@ struct DataReplicatorOptions {
Minutes blacklistSyncSourcePenaltyForOplogStartMissing{10};
// Replication settings
- Timestamp startOptime;
NamespaceString localOplogNS = NamespaceString("local.oplog.rs");
NamespaceString remoteOplogNS = NamespaceString("local.oplog.rs");
@@ -99,18 +111,18 @@ struct DataReplicatorOptions {
DataReplicatorScope scope = DataReplicatorScope::ReplicateAll;
std::string scopeNS;
BSONObj filterCriteria;
- HostAndPort syncSource; // for use without replCoord -- maybe some kind of rsMonitor/interface
- // TODO: replace with real applier function
- Applier::ApplyOperationFn applierFn =
- [](OperationContext*, const BSONObj&) -> Status { return Status::OK(); };
+ Applier::ApplyOperationFn applierFn;
+ ReplicationProgressManager* replicationProgressManager = nullptr;
+ GetMyLastOptimeFn getMyLastOptime;
+ SetMyLastOptimeFn setMyLastOptime;
+ SetFollowerModeFn setFollowerMode;
+ SyncSourceSelector* syncSourceSelector = nullptr;
std::string toString() const {
return str::stream() << "DataReplicatorOptions -- "
<< " localOplogNs: " << localOplogNS.toString()
- << " remoteOplogNS: " << remoteOplogNS.toString()
- << " syncSource: " << syncSource.toString()
- << " startOptime: " << startOptime.toString();
+ << " remoteOplogNS: " << remoteOplogNS.toString();
}
};
@@ -123,25 +135,8 @@ struct DataReplicatorOptions {
*/
class DataReplicator {
public:
- /** Function to call when a batch is applied. */
- using OnBatchCompleteFn = stdx::function<void(const Timestamp&)>;
-
- DataReplicator(DataReplicatorOptions opts,
- ReplicationExecutor* exec,
- ReplicationCoordinator* replCoord);
- /**
- * Used by non-replication coordinator processes, like sharding.
- */
DataReplicator(DataReplicatorOptions opts, ReplicationExecutor* exec);
- /**
- * Used for testing.
- */
- DataReplicator(DataReplicatorOptions opts,
- ReplicationExecutor* exec,
- ReplicationCoordinator* replCoord,
- OnBatchCompleteFn batchCompletedFn);
-
virtual ~DataReplicator();
Status start();
@@ -178,14 +173,17 @@ public:
DataReplicatorState getState() const;
Timestamp getLastTimestampFetched() const;
+
+ /**
+ * Number of operations in the oplog buffer.
+ */
+ size_t getOplogBufferCount() const;
+
std::string getDiagnosticString() const;
// For testing only
void _resetState_inlock(Timestamp lastAppliedOptime);
- void __setSourceForTesting(HostAndPort src) {
- _syncSource = src;
- }
void _setInitialSyncStorageInterface(CollectionCloner::StorageInterface* si);
private:
@@ -241,7 +239,6 @@ private:
// Set during construction
const DataReplicatorOptions _opts;
ReplicationExecutor* _exec;
- ReplicationCoordinator* _replCoord;
//
// All member variables are labeled with one of the following codes indicating the
@@ -257,7 +254,7 @@ private:
// _mutex or be in a callback in _exec to read.
// (I) Independently synchronized, see member variable comment.
- // Protects member data of this ReplicationCoordinator.
+ // Protects member data of this DataReplicator.
mutable stdx::mutex _mutex; // (S)
DataReplicatorState _state; // (MX)
@@ -274,11 +271,9 @@ private:
Handle _reporterHandle; // (M)
std::unique_ptr<Reporter> _reporter; // (M)
- bool _applierActive; // (M)
- bool _applierPaused; // (X)
- std::unique_ptr<Applier> _applier; // (M)
- OnBatchCompleteFn _batchCompletedFn; // (M)
-
+ bool _applierActive; // (M)
+ bool _applierPaused; // (X)
+ std::unique_ptr<Applier> _applier; // (M)
HostAndPort _syncSource; // (M)
Timestamp _lastTimestampFetched; // (MX)
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index de8f2da9f12..31ef44bef4d 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -36,13 +36,11 @@
#include "mongo/db/json.h"
#include "mongo/db/repl/base_cloner_test_fixture.h"
#include "mongo/db/repl/data_replicator.h"
-#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/repl/replication_coordinator_impl.h"
-#include "mongo/db/repl/replica_set_config.h"
-#include "mongo/db/repl/replication_coordinator_mock.h"
-#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/member_state.h"
#include "mongo/db/repl/replication_executor_test_fixture.h"
#include "mongo/db/repl/replication_executor.h"
+#include "mongo/db/repl/reporter.h"
+#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/fail_point_service.h"
@@ -60,33 +58,41 @@ using LockGuard = stdx::lock_guard<stdx::mutex>;
using UniqueLock = stdx::unique_lock<stdx::mutex>;
using mutex = stdx::mutex;
-ReplicaSetConfig assertMakeRSConfig(const BSONObj& configBson) {
- ReplicaSetConfig config;
- ASSERT_OK(config.initialize(configBson));
- ASSERT_OK(config.validate());
- return config;
-}
-const HostAndPort target("localhost", -1);
-
-class DataReplicatorTest : public ReplicationExecutorTest {
+class DataReplicatorTest : public ReplicationExecutorTest,
+ public ReplicationProgressManager,
+ public SyncSourceSelector {
public:
DataReplicatorTest() {}
void postExecutorThreadLaunch() override{};
+ /**
+ * clear/reset state
+ */
void reset() {
- // clear/reset state
+ _applierFn = [](OperationContext*, const BSONObj&) -> Status { return Status::OK(); };
+ _setMyLastOptime = [this](const OpTime& opTime) { _myLastOpTime = opTime; };
+ _myLastOpTime = OpTime();
+ _memberState = MemberState::RS_UNKNOWN;
+ _syncSource = HostAndPort("localhost", -1);
}
- void createDataReplicator(DataReplicatorOptions opts) {
- _dr.reset(new DataReplicator(opts, &(getExecutor()), _repl.get()));
- _dr->__setSourceForTesting(target);
+ // ReplicationProgressManager
+ bool prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) override {
+ cmdBuilder->append("replSetUpdatePosition", 1);
+ return true;
}
- void createDataReplicator(DataReplicatorOptions opts,
- DataReplicator::OnBatchCompleteFn batchCompletedFn) {
- _dr.reset(new DataReplicator(opts, &(getExecutor()), _repl.get(), batchCompletedFn));
- _dr->__setSourceForTesting(target);
+ // SyncSourceSelector
+ void clearSyncSourceBlacklist() override {}
+ HostAndPort chooseNewSyncSource() override {
+ HostAndPort result = _syncSource;
+ _syncSource = HostAndPort();
+ return result;
+ }
+ void blacklistSyncSource(const HostAndPort& host, Date_t until) {}
+ bool shouldChangeSyncSource(const HostAndPort& currentSource) {
+ return false;
}
void scheduleNetworkResponse(const BSONObj& obj) {
@@ -129,34 +135,47 @@ public:
DataReplicator& getDR() {
return *_dr;
}
- ReplicationCoordinator& getRepl() {
- return *_repl;
- }
protected:
void setUp() override {
ReplicationExecutorTest::setUp();
reset();
- _settings.replSet = "foo"; // We are a replica set :)
- _repl.reset(new ReplicationCoordinatorMock(_settings));
launchExecutorThread();
DataReplicatorOptions options;
options.initialSyncRetryWait = Milliseconds(0);
- createDataReplicator(options);
+ options.applierFn = [this](OperationContext* txn, const BSONObj& operation) {
+ return _applierFn(txn, operation);
+ };
+ options.replicationProgressManager = this;
+ options.getMyLastOptime = [this]() { return _myLastOpTime; };
+ options.setMyLastOptime = [this](const OpTime& opTime) { _setMyLastOptime(opTime); };
+ options.setFollowerMode = [this](const MemberState& state) {
+ _memberState = state;
+ return true;
+ };
+ options.syncSourceSelector = this;
+ try {
+ _dr.reset(new DataReplicator(options, &(getExecutor())));
+ } catch (...) {
+ ASSERT_OK(exceptionToStatus());
+ }
}
void tearDown() override {
ReplicationExecutorTest::tearDown();
_dr.reset();
- _repl.reset();
// Executor may still invoke callback before shutting down.
}
+ Applier::ApplyOperationFn _applierFn;
+ DataReplicatorOptions::SetMyLastOptimeFn _setMyLastOptime;
+ OpTime _myLastOpTime;
+ MemberState _memberState;
+ HostAndPort _syncSource;
+
private:
std::unique_ptr<DataReplicator> _dr;
- std::unique_ptr<ReplicationCoordinator> _repl;
- ReplSettings _settings;
};
TEST_F(DataReplicatorTest, CreateDestroy) {}
@@ -392,13 +411,12 @@ TEST_F(InitialSyncTest, Complete) {
TEST_F(InitialSyncTest, MissingDocOnApplyCompletes) {
DataReplicatorOptions opts;
int applyCounter{0};
- opts.applierFn = [&](OperationContext* txn, const BSONObj& op) {
+ _applierFn = [&](OperationContext* txn, const BSONObj& op) {
if (++applyCounter == 1) {
return Status(ErrorCodes::NoMatchingDocument, "failed: missing doc.");
}
return Status::OK();
};
- createDataReplicator(opts);
const std::vector<BSONObj> responses = {
// get latest oplog ts
@@ -460,11 +478,10 @@ TEST_F(InitialSyncTest, Failpoint) {
<< BSON("_id" << 3 << "host"
<< "node3:12345")));
- ReplicaSetConfig config = assertMakeRSConfig(configObj);
Timestamp time1(100, 1);
OpTime opTime1(time1, OpTime::kDefaultTerm);
- getRepl().setMyLastOptime(opTime1);
- ASSERT(getRepl().setFollowerMode(MemberState::RS_SECONDARY));
+ _myLastOpTime = opTime1;
+ _memberState = MemberState::RS_SECONDARY;
DataReplicator* dr = &(getDR());
InitialSyncBackgroundRunner isbr(dr);
@@ -509,8 +526,7 @@ protected:
auto noi = net->getNextReadyRequest();
scheduleNetworkResponse(noi, oplogFetcherResponse);
net->runReadyNetworkOperations();
- ASSERT_EQUALS(MemberState(MemberState::RS_RECOVERING).toString(),
- getRepl().getMemberState().toString());
+ ASSERT_EQUALS(MemberState(MemberState::RS_RECOVERING).toString(), _memberState.toString());
}
};
@@ -567,7 +583,7 @@ TEST_F(SteadyStateTest, RemoteOplogFirstOperationTimestampDoesNotMatch) {
"firstBatch: [{ts:Timestamp(1,1)}]}}"));
}
-TEST_F(SteadyStateTest, ApplyOneOperation) {
+TEST_F(SteadyStateTest, PauseDataReplicator) {
auto operationToApply = BSON("op"
<< "a"
<< "ts" << Timestamp(Seconds(123), 0));
@@ -575,23 +591,103 @@ TEST_F(SteadyStateTest, ApplyOneOperation) {
unittest::Barrier barrier(2U);
Timestamp lastTimestampApplied;
BSONObj operationApplied;
- auto batchCompletedFn = [&](const Timestamp& ts) {
+ _applierFn = [&](OperationContext* txn, const BSONObj& op) {
stdx::lock_guard<stdx::mutex> lock(mutex);
- lastTimestampApplied = ts;
+ operationApplied = op;
barrier.countDownAndWait();
+ return Status::OK();
};
- DataReplicatorOptions opts;
- opts.applierFn = [&](OperationContext* txn, const BSONObj& op) {
+ DataReplicatorOptions::SetMyLastOptimeFn oldSetMyLastOptime = _setMyLastOptime;
+ _setMyLastOptime = [&](const OpTime& opTime) {
+ oldSetMyLastOptime(opTime);
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ lastTimestampApplied = opTime.getTimestamp();
+ barrier.countDownAndWait();
+ };
+
+ auto& dr = getDR();
+ _myLastOpTime = OpTime(operationToApply["ts"].timestamp(), OpTime::kDefaultTerm);
+ _memberState = MemberState::RS_SECONDARY;
+
+ auto net = getNet();
+ net->enterNetwork();
+
+ ASSERT_OK(dr.start());
+
+ ASSERT_TRUE(net->hasReadyRequests());
+ {
+ auto networkRequest = net->getNextReadyRequest();
+ auto commandResponse = BSON(
+ "ok" << 1 << "cursor" << BSON("id" << 0LL << "ns"
+ << "local.oplog.rs"
+ << "firstBatch" << BSON_ARRAY(operationToApply)));
+ scheduleNetworkResponse(networkRequest, commandResponse);
+ }
+
+ dr.pause();
+
+ ASSERT_EQUALS(0U, dr.getOplogBufferCount());
+
+ // Data replication will process the fetcher response but will not schedule the applier.
+ net->runReadyNetworkOperations();
+ ASSERT_EQUALS(operationToApply["ts"].timestamp(), dr.getLastTimestampFetched());
+
+ // Schedule a bogus work item to ensure that the operation applier function
+ // is not scheduled.
+ auto& exec = getExecutor();
+ exec.scheduleWork(
+ [&barrier](const executor::TaskExecutor::CallbackArgs&) { barrier.countDownAndWait(); });
+
+
+ // Wake up executor thread and wait for bogus work callback to be invoked.
+ net->exitNetwork();
+ barrier.countDownAndWait();
+
+ // Oplog buffer should contain fetched operations since applier is not scheduled.
+ ASSERT_EQUALS(1U, dr.getOplogBufferCount());
+
+ dr.resume();
+
+ // Wait for applier function.
+ barrier.countDownAndWait();
+ // Run scheduleWork() work item scheduled in DataReplicator::_onApplyBatchFinish().
+ net->exitNetwork();
+
+ // Wait for batch completion callback.
+ barrier.countDownAndWait();
+
+ ASSERT_EQUALS(MemberState(MemberState::RS_SECONDARY).toString(), _memberState.toString());
+ {
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ ASSERT_EQUALS(operationToApply, operationApplied);
+ ASSERT_EQUALS(operationToApply["ts"].timestamp(), lastTimestampApplied);
+ }
+}
+
+TEST_F(SteadyStateTest, ApplyOneOperation) {
+ auto operationToApply = BSON("op"
+ << "a"
+ << "ts" << Timestamp(Seconds(123), 0));
+ stdx::mutex mutex;
+ unittest::Barrier barrier(2U);
+ Timestamp lastTimestampApplied;
+ BSONObj operationApplied;
+ _applierFn = [&](OperationContext* txn, const BSONObj& op) {
stdx::lock_guard<stdx::mutex> lock(mutex);
operationApplied = op;
barrier.countDownAndWait();
return Status::OK();
};
- createDataReplicator(opts, batchCompletedFn);
+ DataReplicatorOptions::SetMyLastOptimeFn oldSetMyLastOptime = _setMyLastOptime;
+ _setMyLastOptime = [&](const OpTime& opTime) {
+ oldSetMyLastOptime(opTime);
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ lastTimestampApplied = opTime.getTimestamp();
+ barrier.countDownAndWait();
+ };
- auto& repl = getRepl();
- repl.setMyLastOptime(OpTime(operationToApply["ts"].timestamp(), 0));
- ASSERT_TRUE(repl.setFollowerMode(MemberState::RS_SECONDARY));
+ _myLastOpTime = OpTime(operationToApply["ts"].timestamp(), OpTime::kDefaultTerm);
+ _memberState = MemberState::RS_SECONDARY;
auto net = getNet();
net->enterNetwork();
@@ -608,7 +704,11 @@ TEST_F(SteadyStateTest, ApplyOneOperation) {
<< "firstBatch" << BSON_ARRAY(operationToApply)));
scheduleNetworkResponse(networkRequest, commandResponse);
}
+ ASSERT_EQUALS(0U, dr.getOplogBufferCount());
+
+ // Oplog buffer should be empty because contents are transferred to applier.
net->runReadyNetworkOperations();
+ ASSERT_EQUALS(0U, dr.getOplogBufferCount());
// Wait for applier function.
barrier.countDownAndWait();
@@ -619,8 +719,7 @@ TEST_F(SteadyStateTest, ApplyOneOperation) {
// Wait for batch completion callback.
barrier.countDownAndWait();
- ASSERT_EQUALS(MemberState(MemberState::RS_SECONDARY).toString(),
- repl.getMemberState().toString());
+ ASSERT_EQUALS(MemberState(MemberState::RS_SECONDARY).toString(), _memberState.toString());
{
stdx::lock_guard<stdx::mutex> lock(mutex);
ASSERT_EQUALS(operationToApply, operationApplied);
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 9bc7179bb90..84b2d6a56c4 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -35,6 +35,7 @@
#include "mongo/db/repl/member_state.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/reporter.h"
+#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
@@ -84,7 +85,7 @@ extern const char* replAllDead;
* with the rest of the system. The public methods on ReplicationCoordinator are the public
* API that the replication subsystem presents to the rest of the codebase.
*/
-class ReplicationCoordinator : public ReplicationProgressManager {
+class ReplicationCoordinator : public ReplicationProgressManager, public SyncSourceSelector {
MONGO_DISALLOW_COPYING(ReplicationCoordinator);
public:
@@ -162,11 +163,6 @@ public:
virtual Seconds getSlaveDelaySecs() const = 0;
/**
- * Clears the list of sync sources we have blacklisted.
- */
- virtual void clearSyncSourceBlacklist() = 0;
-
- /**
* Blocks the calling thread for up to writeConcern.wTimeout millis, or until "opTime" has
* been replicated to at least a set of nodes that satisfies the writeConcern, whichever
* comes first. A writeConcern.wTimeout of 0 indicates no timeout (block forever) and a
@@ -563,28 +559,12 @@ public:
virtual Status checkReplEnabledForCommand(BSONObjBuilder* result) = 0;
/**
- * Chooses a viable sync source, or, if none available, returns empty HostAndPort.
- */
- virtual HostAndPort chooseNewSyncSource() = 0;
-
- /**
- * Blacklists choosing 'host' as a sync source until time 'until'.
- */
- virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) = 0;
-
- /**
* Loads the optime from the last op in the oplog into the coordinator's lastOpApplied
* value.
*/
virtual void resetLastOpTimeFromOplog(OperationContext* txn) = 0;
/**
- * Determines if a new sync source should be considered.
- * currentSource: the current sync source
- */
- virtual bool shouldChangeSyncSource(const HostAndPort& currentSource) = 0;
-
- /**
* Returns the OpTime of the latest replica set-committed op known to this server.
* Committed means a majority of the voting nodes of the config are known to have the
* operation in their oplogs. This implies such ops will never be rolled back.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 049de8be7dc..2492b6b8671 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -149,6 +149,19 @@ ReplicationCoordinator::Mode getReplicationModeFromSettings(const ReplSettings&
}
return ReplicationCoordinator::modeNone;
}
+
+DataReplicatorOptions createDataReplicatorOptions(ReplicationCoordinator* replCoord) {
+ DataReplicatorOptions options;
+ options.applierFn = [](OperationContext*, const BSONObj&) -> Status { return Status::OK(); };
+ options.replicationProgressManager = replCoord;
+ options.getMyLastOptime = [replCoord]() { return replCoord->getMyLastOptime(); };
+ options.setMyLastOptime =
+ [replCoord](const OpTime& opTime) { replCoord->setMyLastOptime(opTime); };
+ options.setFollowerMode =
+ [replCoord](const MemberState& newState) { return replCoord->setFollowerMode(newState); };
+ options.syncSourceSelector = replCoord;
+ return options;
+}
} // namespace
ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
@@ -174,7 +187,7 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
_sleptLastElection(false),
_canAcceptNonLocalWrites(!(settings.usingReplSets() || settings.slave)),
_canServeNonLocalReads(0U),
- _dr(DataReplicatorOptions(), &_replExecutor, this) {
+ _dr(createDataReplicatorOptions(this), &_replExecutor) {
if (!isReplEnabled()) {
return;
}
diff --git a/src/mongo/db/repl/reporter.cpp b/src/mongo/db/repl/reporter.cpp
index b05bc2dbdd9..5a09ba48480 100644
--- a/src/mongo/db/repl/reporter.cpp
+++ b/src/mongo/db/repl/reporter.cpp
@@ -38,8 +38,6 @@
namespace mongo {
namespace repl {
-ReplicationProgressManager::~ReplicationProgressManager() {}
-
Reporter::Reporter(ReplicationExecutor* executor,
ReplicationProgressManager* replicationProgressManager,
const HostAndPort& target)
diff --git a/src/mongo/db/repl/reporter.h b/src/mongo/db/repl/reporter.h
index c7c502e8f5f..395f47d81c4 100644
--- a/src/mongo/db/repl/reporter.h
+++ b/src/mongo/db/repl/reporter.h
@@ -38,7 +38,7 @@ namespace repl {
class ReplicationProgressManager {
public:
virtual bool prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) = 0;
- virtual ~ReplicationProgressManager();
+ virtual ~ReplicationProgressManager() = default;
};
class Reporter {
diff --git a/src/mongo/db/repl/sync_source_selector.h b/src/mongo/db/repl/sync_source_selector.h
new file mode 100644
index 00000000000..846c06c24bf
--- /dev/null
+++ b/src/mongo/db/repl/sync_source_selector.h
@@ -0,0 +1,72 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/util/net/hostandport.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+namespace repl {
+
+
+/**
+ * Manage list of viable and blocked sync sources that we can replicate from.
+ */
+class SyncSourceSelector {
+ MONGO_DISALLOW_COPYING(SyncSourceSelector);
+
+public:
+ SyncSourceSelector() = default;
+ virtual ~SyncSourceSelector() = default;
+
+ /**
+ * Clears the list of sync sources we have blacklisted.
+ */
+ virtual void clearSyncSourceBlacklist() = 0;
+
+ /**
+ * Chooses a viable sync source, or, if none available, returns empty HostAndPort.
+ */
+ virtual HostAndPort chooseNewSyncSource() = 0;
+
+ /**
+ * Blacklists choosing 'host' as a sync source until time 'until'.
+ */
+ virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) = 0;
+
+ /**
+ * Determines if a new sync source should be considered.
+ * currentSource: the current sync source
+ */
+ virtual bool shouldChangeSyncSource(const HostAndPort& currentSource) = 0;
+};
+
+} // namespace repl
+} // namespace mongo