summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorScott Hernandez <scotthernandez@gmail.com>2015-06-05 16:30:49 -0400
committerScott Hernandez <scotthernandez@gmail.com>2015-06-05 20:51:01 -0400
commit503731032f19d5fea414442bf891a3a8bfe76759 (patch)
tree6b9301f25844328b31c179b298822ecf8ac1aee8
parent99efb5fc4f771feb8ae81434b4e2d6f7445665e1 (diff)
downloadmongo-503731032f19d5fea414442bf891a3a8bfe76759.tar.gz
SERVER-18039: Add Initial Sync to DataReplicator
-rw-r--r--src/mongo/db/repl/SConscript53
-rw-r--r--src/mongo/db/repl/base_cloner_test_fixture.cpp10
-rw-r--r--src/mongo/db/repl/base_cloner_test_fixture.h28
-rw-r--r--src/mongo/db/repl/collection_cloner.h14
-rw-r--r--src/mongo/db/repl/data_replicator.cpp1231
-rw-r--r--src/mongo/db/repl/data_replicator.h147
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp367
-rw-r--r--src/mongo/db/repl/fetcher.cpp6
-rw-r--r--src/mongo/db/repl/fetcher.h2
-rw-r--r--src/mongo/db/repl/fetcher_test.cpp5
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp1
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp1
-rw-r--r--src/mongo/db/repl/replication_executor.cpp8
-rw-r--r--src/mongo/db/repl/replication_executor.h7
-rw-r--r--src/mongo/db/repl/replication_executor_test_fixture.cpp6
-rw-r--r--src/mongo/db/repl/replication_executor_test_fixture.h5
-rw-r--r--src/mongo/db/repl/reporter.cpp9
-rw-r--r--src/mongo/db/repl/reporter.h8
-rw-r--r--src/mongo/db/repl/reporter_test.cpp8
-rw-r--r--src/mongo/db/repl/rs_initialsync.cpp4
20 files changed, 1622 insertions, 298 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index d041ef49b72..82143ba4bd6 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -72,17 +72,6 @@ env.CppUnitTest(
)
env.Library(
- target='data_replicator',
- source=[
- 'data_replicator.cpp',
- ],
- LIBDEPS=[
- 'fetcher',
- 'repl_coordinator_interface',
- ],
-)
-
-env.Library(
target='oplog_interface_local',
source=[
'oplog_interface_local.cpp',
@@ -263,19 +252,6 @@ env.CppUnitTest('replica_set_config_checks_test',
'replmocks'
])
-env.CppUnitTest(
- target='data_replicator_test',
- source=[
- 'data_replicator_test.cpp',
- ],
- LIBDEPS=[
- 'data_replicator',
- 'replica_set_messages',
- 'replication_executor_test_fixture',
- 'repl_coordinator_test_fixture',
- ],
-)
-
env.CppUnitTest('scatter_gather_test',
'scatter_gather_test.cpp',
LIBDEPS=['repl_coordinator_impl',
@@ -611,6 +587,7 @@ env.Library(
'applier.cpp',
],
LIBDEPS=[
+ 'replication_executor',
],
)
@@ -625,6 +602,34 @@ env.CppUnitTest(
)
env.Library(
+ target='data_replicator',
+ source=[
+ 'data_replicator.cpp',
+ ],
+ LIBDEPS=[
+ 'applier',
+ 'collection_cloner',
+ 'database_cloner',
+ 'fetcher',
+ 'repl_coordinator_interface',
+ ],
+)
+
+env.CppUnitTest(
+ target='data_replicator_test',
+ source=[
+ 'data_replicator_test.cpp',
+ ],
+ LIBDEPS=[
+ 'base_cloner_test_fixture',
+ 'data_replicator',
+ 'replica_set_messages',
+ 'replication_executor_test_fixture',
+ 'repl_coordinator_test_fixture',
+ ],
+)
+
+env.Library(
target='roll_back_local_operations',
source=[
'roll_back_local_operations.cpp',
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp
index 33357e702e3..8576cf11dc4 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.cpp
+++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp
@@ -263,5 +263,15 @@ namespace repl {
return Status::OK();
}
+ Status ClonerStorageInterfaceMock::insertMissingDoc(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& doc) {
+ return Status::OK();
+ }
+
+ Status ClonerStorageInterfaceMock::dropUserDatabases(OperationContext* txn) {
+ return dropUserDatabasesFn ? dropUserDatabasesFn(txn) : Status::OK();
+ }
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.h b/src/mongo/db/repl/base_cloner_test_fixture.h
index dca30fe8ae7..22cd7037c60 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.h
+++ b/src/mongo/db/repl/base_cloner_test_fixture.h
@@ -143,6 +143,18 @@ namespace repl {
class ClonerStorageInterfaceMock : public CollectionCloner::StorageInterface {
public:
+ using InsertCollectionFn = stdx::function<Status (OperationContext*,
+ const NamespaceString&,
+ const std::vector<BSONObj>&)>;
+ using BeginCollectionFn = stdx::function<Status (OperationContext*,
+ const NamespaceString&,
+ const CollectionOptions&,
+ const std::vector<BSONObj>&)>;
+ using InsertMissingDocFn = stdx::function<Status (OperationContext*,
+ const NamespaceString&,
+ const BSONObj&)>;
+ using DropUserDatabases = stdx::function<Status (OperationContext*)>;
+
Status beginCollection(OperationContext* txn,
const NamespaceString& nss,
const CollectionOptions& options,
@@ -155,14 +167,16 @@ namespace repl {
Status commitCollection(OperationContext* txn,
const NamespaceString& nss) override;
- stdx::function<Status (OperationContext*,
- const NamespaceString&,
- const CollectionOptions&,
- const std::vector<BSONObj>&)> beginCollectionFn;
+ Status insertMissingDoc(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& doc) override;
+
+ Status dropUserDatabases(OperationContext* txn);
- stdx::function<Status (OperationContext*,
- const NamespaceString&,
- const std::vector<BSONObj>&)> insertDocumentsFn;
+ BeginCollectionFn beginCollectionFn;
+ InsertCollectionFn insertDocumentsFn;
+ InsertMissingDocFn insertMissingDocFn;
+ DropUserDatabases dropUserDatabasesFn;
};
} // namespace repl
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index f4f9d9867ec..5dccb2e21ba 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -244,6 +244,20 @@ namespace repl {
virtual Status commitCollection(OperationContext* txn,
const NamespaceString& nss) = 0;
+ /**
+ * Inserts missing document into a collection (not related to insertDocuments above),
+ * during initial sync retry logic
+ */
+ virtual Status insertMissingDoc(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& doc) = 0;
+
+ /**
+ * Inserts missing document into a collection (not related to insertDocuments above),
+ * during initial sync retry logic
+ */
+ virtual Status dropUserDatabases(OperationContext* txn) = 0;
+
};
} // namespace repl
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 288f218c19f..aa6b31ce9e8 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -34,136 +34,617 @@
#include <algorithm>
#include <boost/thread.hpp>
+#include <thread>
#include "mongo/base/status.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/collection_cloner.h"
+#include "mongo/db/repl/database_cloner.h"
+#include "mongo/db/repl/optime.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
+#include "mongo/util/queue.h"
#include "mongo/util/scopeguard.h"
+#include "mongo/util/stacktrace.h"
#include "mongo/util/time_support.h"
#include "mongo/util/timer.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
namespace repl {
-
- using CBHStatus = StatusWith<ReplicationExecutor::CallbackHandle>;
- using CallbackFn = ReplicationExecutor::CallbackFn;
- using Request = RemoteCommandRequest;
- using Response = RemoteCommandResponse;
- using CommandCallbackData = ReplicationExecutor::RemoteCommandCallbackData;
-// typedef void (*run_func)();
-
// Failpoint for initial sync
MONGO_FP_DECLARE(failInitialSyncWithBadHost);
- namespace {
- const Milliseconds NoSyncSourceRetryDelayMS{4000};
-
- std::string toString(DataReplicatiorState s) {
- switch (s) {
- case DataReplicatiorState::InitialSync:
- return "InitialSync";
- case DataReplicatiorState::Rollback:
- return "Rollback";
- case DataReplicatiorState::Steady:
- return "Steady Replication";
- case DataReplicatiorState::Uninitialized:
- return "Uninitialized";
- default:
- return "<invalid>";
+namespace {
+ // public so tests can remove the sleep.
+ const Milliseconds NoSyncSourceRetryDelayMS{4000};
+ const int InitialSyncRetrySleepSecs{1};
+
+ size_t getSize(const BSONObj& o) {
+ // SERVER-9808 Avoid Fortify complaint about implicit signed->unsigned conversion
+ return static_cast<size_t>(o.objsize());
+ }
+
+ std::string toString(DataReplicatorState s) {
+ switch (s) {
+ case DataReplicatorState::InitialSync:
+ return "InitialSync";
+ case DataReplicatorState::Rollback:
+ return "Rollback";
+ case DataReplicatorState::Steady:
+ return "Steady Replication";
+ case DataReplicatorState::Uninitialized:
+ return "Uninitialized";
+ default:
+ return "<invalid>";
+ }
+ }
+
+ Timestamp findCommonPoint(HostAndPort host, Timestamp start) {
+ // TODO: walk back in the oplog looking for a known/shared optime.
+ return Timestamp();
+ }
+
+ bool _didRollback(HostAndPort host) {
+ // TODO: rollback here, report if done.
+ return false;
+ }
+} // namespace
+
+ /**
+ * Follows the fetcher pattern for a find+getmore
+ */
+ class QueryFetcher {
+ MONGO_DISALLOW_COPYING(QueryFetcher);
+ public:
+ using CallbackFn = stdx::function<void (const BatchDataStatus&, NextAction*)>;
+
+ QueryFetcher(ReplicationExecutor* exec,
+ const HostAndPort& source,
+ const NamespaceString& nss,
+ const BSONObj& cmdBSON,
+ const QueryFetcher::CallbackFn& onBatchAvailable);
+ virtual ~QueryFetcher() = default;
+
+ bool isActive() const { return _fetcher.isActive(); }
+ Status schedule() { return _fetcher.schedule(); }
+ void cancel() { return _fetcher.cancel(); }
+ void wait() { if (_fetcher.isActive()) _fetcher.wait(); }
+ std::string toString() const;
+
+ protected:
+ void _onFetchCallback(const BatchDataStatus& fetchResult,
+ Fetcher::NextAction* nextAction,
+ BSONObjBuilder* getMoreBob);
+
+ virtual void _delegateCallback(const BatchDataStatus& fetchResult,
+ NextAction* nextAction) {
+ _work(fetchResult, nextAction);
+ };
+
+ ReplicationExecutor* _exec;
+ Fetcher _fetcher;
+ int _responses;
+ const QueryFetcher::CallbackFn _work;
+ };
+
+ /**
+ * Follows the fetcher pattern for a find+getmore on an oplog
+ * Returns additional errors if the start oplog entry cannot be found.
+ */
+ class OplogFetcher : public QueryFetcher {
+ MONGO_DISALLOW_COPYING(OplogFetcher);
+ public:
+ OplogFetcher(ReplicationExecutor* exec,
+ const Timestamp& startTS,
+ const HostAndPort& src,
+ const NamespaceString& nss,
+ const QueryFetcher::CallbackFn& work);
+
+ virtual ~OplogFetcher() = default;
+ std::string toString() const;
+
+ protected:
+
+ void _delegateCallback(const BatchDataStatus& fetchResult,
+ NextAction* nextAction);
+
+ const Timestamp _startTS;
+ };
+
+ // QueryFetcher
+ QueryFetcher::QueryFetcher(ReplicationExecutor* exec,
+ const HostAndPort& src,
+ const NamespaceString& nss,
+ const BSONObj& cmdBSON,
+ const CallbackFn& work)
+ : _exec(exec),
+ _fetcher(exec,
+ src,
+ nss.db().toString(),
+ cmdBSON,
+ stdx::bind(&QueryFetcher::_onFetchCallback,
+ this,
+ stdx::placeholders::_1,
+ stdx::placeholders::_2,
+ stdx::placeholders::_3)),
+ _responses(0),
+ _work(work) {
+ }
+
+ void QueryFetcher::_onFetchCallback(const BatchDataStatus& fetchResult,
+ Fetcher::NextAction* nextAction,
+ BSONObjBuilder* getMoreBob) {
+ ++_responses;
+
+ _delegateCallback(fetchResult, nextAction);
+ // The fetcher will continue to call with kGetMore until an error or the last batch.
+ if (fetchResult.isOK() && *nextAction == NextAction::kGetMore) {
+ const auto batchData(fetchResult.getValue());
+ invariant(getMoreBob);
+ getMoreBob->append("getMore", batchData.cursorId);
+ getMoreBob->append("collection", batchData.nss.coll());
+ }
+ }
+
+ std::string QueryFetcher::toString() const {
+ return str::stream() << "QueryFetcher -"
+ << " responses: " << _responses
+ << " fetcher: " << _fetcher.getDiagnosticString();
+ }
+
+ // OplogFetcher
+ OplogFetcher::OplogFetcher(ReplicationExecutor* exec,
+ const Timestamp& startTS,
+ const HostAndPort& src,
+ const NamespaceString& oplogNSS,
+ const QueryFetcher::CallbackFn& work)
+ // TODO: add query options await_data, oplog_replay
+ : QueryFetcher(exec,
+ src,
+ oplogNSS,
+ BSON("find" << oplogNSS.coll() <<
+ "query" << BSON("ts" << BSON("$gte" << startTS))),
+ work),
+ _startTS(startTS) {
+ }
+
+ std::string OplogFetcher::toString() const {
+ return str::stream() << "OplogReader -"
+ << " startTS: " << _startTS.toString()
+ << " responses: " << _responses
+ << " fetcher: " << _fetcher.getDiagnosticString();
+ }
+
+ void OplogFetcher::_delegateCallback(const BatchDataStatus& fetchResult,
+ Fetcher::NextAction* nextAction) {
+ invariant(_exec->isRunThread());
+ const bool checkStartTS = _responses == 0;
+
+ if (fetchResult.isOK()) {
+ Fetcher::Documents::const_iterator firstDoc = fetchResult.getValue().documents.begin();
+ auto hasDoc = firstDoc != fetchResult.getValue().documents.end();
+
+ if (checkStartTS &&
+ (!hasDoc || (hasDoc && (*firstDoc)["ts"].timestamp() != _startTS))) {
+ // Set next action to none.
+ *nextAction = Fetcher::NextAction::kNoAction;
+ _work(Status(ErrorCodes::OplogStartMissing,
+ str::stream() << "First returned " << (*firstDoc)["ts"]
+ << " is not where we wanted to start: "
+ << _startTS.toString()),
+ nextAction);
+ return;
+ }
+
+ if (hasDoc) {
+ _work(fetchResult, nextAction);
+ }
+ else {
}
}
+ else {
+ _work(fetchResult, nextAction);
+ }
+ };
+
+ class DatabasesCloner {
+ public:
+ DatabasesCloner(ReplicationExecutor* exec,
+ HostAndPort source,
+ stdx::function<void (const Status&)> finishFn)
+ : _status(ErrorCodes::NotYetInitialized, ""),
+ _exec(exec),
+ _source(source),
+ _active(false),
+ _clonersActive(0),
+ _finishFn(finishFn) {
+ if (!_finishFn) {
+ _status = Status(ErrorCodes::InvalidOptions, "finishFn is not callable.");
+ }
+ };
+
+ Status start();
+
+ bool isActive() {
+ return _active;
+ }
+
+ Status getStatus() {
+ return _status;
+ }
+
+ void cancel() {
+ if (!_active)
+ return;
+ _active = false;
+ // TODO: cancel all cloners
+ _setStatus(Status(ErrorCodes::CallbackCanceled, "Initial Sync Cancelled."));
+ }
+
+ void wait() {
+ // TODO: wait on all cloners
+ }
+
+ std::string toString() {
+ return str::stream() << "initial sync --" <<
+ " active:" << _active <<
+ " status:" << _status.toString() <<
+ " source:" << _source.toString() <<
+ " db cloners active:" << _clonersActive <<
+ " db count:" << _databaseCloners.size();
+ }
+
+
+ // For testing
+ void setStorageInterface(CollectionCloner::StorageInterface* si) {
+ _storage = si;
+ }
+
+ private:
+
+ /**
+ * Does the next action necessary for the initial sync process.
+ *
+ * NOTE: If (!_status.isOK() || !_isActive) then early return.
+ */
+ void _doNextActions();
+
+ /**
+ * Setting the status to not-OK will stop the process
+ */
+ void _setStatus(CBHStatus s) {
+ _setStatus(s.getStatus());
+ }
+
+ /**
+ * Setting the status to not-OK will stop the process
+ */
+ void _setStatus(Status s) {
+ // Only set the first time called, all subsequent failures are not recorded --only first
+ if (_status.code() != ErrorCodes::NotYetInitialized) {
+ _status = s;
+ }
+ }
+
+ /**
+ * Setting the status to not-OK will stop the process
+ */
+ void _setStatus(TimestampStatus s) {
+ _setStatus(s.getStatus());
+ }
+
+ void _failed();
+
+ /** Called each time a database clone is finished */
+ void _onEachDBCloneFinish(const Status& status, const std::string name);
+
+ // Callbacks
+
+ void _onListDatabaseFinish(const CommandCallbackData& cbd);
+
+
+ // Member variables
+ Status _status; // If it is not OK, we stop everything.
+ ReplicationExecutor* _exec; // executor to schedule things with
+ HostAndPort _source; // The source to use, until we get an error
+ bool _active; // false until we start
+ std::vector<std::shared_ptr<DatabaseCloner>> _databaseCloners; // database cloners by name
+ int _clonersActive;
+
+ const stdx::function<void (const Status&)> _finishFn;
+
+ CollectionCloner::StorageInterface* _storage;
+ };
+
+ /** State held during Initial Sync */
+ struct InitialSyncState {
+ InitialSyncState(DatabasesCloner cloner, Event event)
+ : dbsCloner(cloner), finishEvent(event), status(ErrorCodes::IllegalOperation, "") {};
+
+ DatabasesCloner dbsCloner; // Cloner for all databases included in initial sync.
+ Timestamp beginTimestamp; // Timestamp from the latest entry in oplog when started.
+ Timestamp stopTimestamp; // Referred to as minvalid, or the place we can transition states.
+ Event finishEvent; // event fired on completion, either successful or not.
+ Status status; // final status, only valid after the finishEvent fires.
+ size_t fetchedMissingDocs;
+ size_t appliedOps;
+
+ // Temporary fetch for things like fetching remote optime, or tail
+ std::unique_ptr<Fetcher> tmpFetcher;
+ TimestampStatus getLatestOplogTimestamp(ReplicationExecutor* exec,
+ HostAndPort source,
+ const NamespaceString& oplogNS);
+ void setStatus(const Status& s);
+ void setStatus(const CBHStatus& s);
+ void _setTimestampSatus(const BatchDataStatus& fetchResult,
+ Fetcher::NextAction* nextAction,
+ TimestampStatus* status) ;
+ };
+
+ // Initial Sync state
+ TimestampStatus InitialSyncState::getLatestOplogTimestamp(ReplicationExecutor* exec,
+ HostAndPort source,
+ const NamespaceString& oplogNS) {
+
+ BSONObj query = BSON("find" << oplogNS.coll() <<
+ "sort" << BSON ("$natural" << -1) <<
+ "limit" << 1);
+
+ TimestampStatus timestampStatus(ErrorCodes::BadValue, "");
+ Fetcher f(exec,
+ source,
+ oplogNS.db().toString(),
+ query,
+ stdx::bind(&InitialSyncState::_setTimestampSatus, this, stdx::placeholders::_1,
+ stdx::placeholders::_2, &timestampStatus));
+ Status s = f.schedule();
+ if (!s.isOK()) {
+ return TimestampStatus(s);
+ }
- /*
- Status callViaExecutor(ReplicationExecutor* exec, const CallbackFn& work) {
- CBHStatus cbh = exec->scheduleWork(work);
- if (!cbh.getStatus().isOK()) {
- return cbh.getStatus();
+ // wait for fetcher to get the oplog position.
+ f.wait();
+ return timestampStatus;
+ }
+
+ void InitialSyncState::_setTimestampSatus(const BatchDataStatus& fetchResult,
+ Fetcher::NextAction* nextAction,
+ TimestampStatus* status) {
+ if (!fetchResult.isOK()) {
+ *status = TimestampStatus(fetchResult.getStatus());
+ } else {
+ // TODO: Set _beginTimestamp from first doc "ts" field.
+ const auto docs = fetchResult.getValue().documents;
+ const auto hasDoc = docs.begin() != docs.end();
+ if (!hasDoc || !docs.begin()->hasField("ts")) {
+ *status = TimestampStatus(ErrorCodes::FailedToParse,
+ "Could not find an oplog entry with 'ts' field.");
+ } else {
+ *status = TimestampStatus(docs.begin()->getField("ts").timestamp());
}
+ }
+ }
- exec->wait(cbh.getValue());
+ void InitialSyncState::setStatus(const Status& s) {
+ status = s;
+ }
+ void InitialSyncState::setStatus(const CBHStatus& s) {
+ setStatus(s.getStatus());
+ }
- return Status::OK();
+ // Initial Sync
+ Status DatabasesCloner::start() {
+ _active = true;
+
+ if (!_status.isOK() && _status.code() != ErrorCodes::NotYetInitialized) {
+ return _status;
}
- */
- Timestamp findCommonPoint(HostAndPort host, Timestamp start) {
- // TODO: walk back in the oplog looking for a known/shared optime.
- return Timestamp();
+ _status = Status::OK();
+
+ log() << "starting cloning of all databases";
+ // Schedule listDatabase command which will kick off the database cloner per result db.
+ Request listDBsReq(_source, "admin", BSON("listDatabases" << true));
+ CBHStatus s = _exec->scheduleRemoteCommand(
+ listDBsReq,
+ stdx::bind(&DatabasesCloner::_onListDatabaseFinish,
+ this,
+ stdx::placeholders::_1));
+ if (!s.isOK()) {
+ _setStatus(s);
+ _failed();
}
- bool _didRollback(HostAndPort host) {
- // TODO: rollback here, report if done.
- return false;
+ _doNextActions();
+
+ return _status;
+ }
+
+ void DatabasesCloner::_onListDatabaseFinish(const CommandCallbackData& cbd) {
+ invariant(_exec->isRunThread());
+ const Status respStatus = cbd.response.getStatus();
+ if (!respStatus.isOK()) {
+ // TODO: retry internally?
+ _setStatus(respStatus);
+ _doNextActions();
+ return;
}
- } // namespace
- Status InitialSyncImpl::start() {
- // For testing, we may want to fail if we receive a getmore.
- if (MONGO_FAIL_POINT(failInitialSyncWithBadHost)) {
- _status = StatusWith<Timestamp>(ErrorCodes::InvalidSyncSource, "no sync source avail.");
+ const auto respBSON = cbd.response.getValue().data;
+
+ // There should not be any cloners yet
+ invariant(_databaseCloners.size() == 0);
+
+ const auto okElem = respBSON["ok"];
+ if (okElem.trueValue()) {
+ const auto dbsElem = respBSON["databases"].Obj();
+ BSONForEach(arrayElement, dbsElem) {
+ const BSONObj dbBSON = arrayElement.Obj();
+ const std::string name = dbBSON["name"].str();
+ ++_clonersActive;
+ std::shared_ptr<DatabaseCloner> dbCloner{nullptr};
+ try {
+ dbCloner.reset(new DatabaseCloner(
+ _exec,
+ _source,
+ name,
+ BSONObj(), // do not filter database out.
+ [](const BSONObj&) { return true; }, // clone all dbs.
+ _storage, // use storage provided.
+ [](const Status& status, const NamespaceString& srcNss) {
+ if (status.isOK()) {
+ log() << "collection clone finished: " << srcNss;
+ }
+ else {
+ log() << "collection clone for '"
+ << srcNss << "' failed due to "
+ << status.toString();
+ }
+ },
+ [=](const Status& status) {
+ _onEachDBCloneFinish(status, name);
+ }));
+ }
+ catch (...) {
+ // error creating, fails below.
+ }
+
+ Status s = dbCloner ? dbCloner->start() : Status(ErrorCodes::UnknownError, "Bad!");
+
+ if (!s.isOK()) {
+ std::string err = str::stream() << "could not create cloner for database: "
+ << name << " due to: " << s.toString();
+ _setStatus(Status(ErrorCodes::InitialSyncFailure, err));
+ error() << err;
+ break; // exit for_each loop
+ }
+
+ // add cloner to list.
+ _databaseCloners.push_back(dbCloner);
+ }
}
else {
- _status = Status::OK();
+ _setStatus(Status(ErrorCodes::InitialSyncFailure,
+ "failed to clone databases due to failed server response."));
}
- return _status.getStatus();
+
+ // Move on to the next steps in the process.
+ _doNextActions();
}
+ void DatabasesCloner::_onEachDBCloneFinish(const Status& status, const std::string name) {
+ auto clonersLeft = --_clonersActive;
+
+ if (status.isOK()) {
+ log() << "database clone finished: " << name;
+ }
+ else {
+ log() << "database clone failed due to "
+ << status.toString();
+ _setStatus(status);
+ }
+
+ if (clonersLeft == 0) {
+ _active = false;
+ // All cloners are done, trigger event.
+ log() << "all database clones finished, calling _finishFn";
+ _finishFn(_status);
+ }
+
+ _doNextActions();
+ }
+
+ void DatabasesCloner::_doNextActions() {
+ // If we are no longer active or we had an error, stop doing more
+ if (!(_active && _status.isOK())) {
+ if (!_status.isOK()) {
+ // trigger failed state
+ _failed();
+ }
+ return;
+ }
+ }
+
+ void DatabasesCloner::_failed() {
+ // TODO: cancel outstanding work, like any cloners active
+ invariant(_finishFn);
+ _finishFn(_status);
+ }
+
+ // Data Replicator
DataReplicator::DataReplicator(DataReplicatorOptions opts,
ReplicationExecutor* exec,
ReplicationCoordinator* replCoord)
: _opts(opts),
_exec(exec),
_replCoord(replCoord),
- _state(DataReplicatiorState::Uninitialized) {
-
+ _state(DataReplicatorState::Uninitialized),
+ _fetcherPaused(false),
+ _reporterPaused(false),
+ _applierActive(false),
+ _applierPaused(false),
+ _oplogBuffer(256*1024*1024, &getSize), // Limit buffer to 256MB
+ _doShutdown(false) {
+ // TODO: replace this with a method in the replication coordinator.
+ if (replCoord) {
+ _batchCompletedFn = [&] (const Timestamp& ts) {
+ OpTime ot(ts, 0);
+ _replCoord->setMyLastOptime(ot);
+ };
+ }
+ else {
+ _batchCompletedFn = [] (const Timestamp& ts) {
+ };
+ }
}
DataReplicator::DataReplicator(DataReplicatorOptions opts,
ReplicationExecutor* exec)
- : _opts(opts),
- _exec(exec),
- _state(DataReplicatiorState::Uninitialized) {
+ : DataReplicator(opts, exec, nullptr) {
}
-/*
- Status DataReplicator::_run(run_func what) {
- return callViaExecutor(_exec,
- stdx::bind(what,
- this,
- stdx::placeholders::_1));
+
+ DataReplicator::~DataReplicator() {
+ DESTRUCTOR_GUARD(
+ _cancelAllHandles_inlock();
+ _waitOnAll_inlock();
+ );
}
-*/
+
Status DataReplicator::start() {
- boost::unique_lock<boost::mutex> lk(_mutex);
- if (_state != DataReplicatiorState::Uninitialized) {
- // Error.
+ UniqueLock lk(_mutex);
+ if (_state != DataReplicatorState::Uninitialized) {
+ return Status(ErrorCodes::IllegalOperation,
+ str::stream() << "Already started in another state: "
+ << toString(_state));
}
- _state = DataReplicatiorState::Steady;
+
+ _state = DataReplicatorState::Steady;
if (_replCoord) {
- // TODO: Use chooseNewSyncSource? -this requires an active replCoord so not working in tests
- _fetcher.reset(new Fetcher(_exec,
- HostAndPort(), //_replCoord->chooseNewSyncSource(),
- _opts.remoteOplogNS.db().toString(),
- BSON("find" << _opts.remoteOplogNS),
- stdx::bind(&DataReplicator::_onFetchFinish,
- this,
- stdx::placeholders::_1,
- stdx::placeholders::_2)));
+ const auto lastOptime = _replCoord->getMyLastOptime();
+ _fetcher.reset(new OplogFetcher(_exec,
+ lastOptime.getTimestamp(),
+ HostAndPort(), //TODO _replCoord->chooseNewSyncSource(),
+ _opts.remoteOplogNS,
+ stdx::bind(&DataReplicator::_onOplogFetchFinish,
+ this,
+ stdx::placeholders::_1,
+ stdx::placeholders::_2)));
}
else {
- _fetcher.reset(new Fetcher(_exec,
- _opts.syncSource,
- _opts.remoteOplogNS.db().toString(),
- BSON("find" << _opts.remoteOplogNS),
- stdx::bind(&DataReplicator::_onFetchFinish,
- this,
- stdx::placeholders::_1,
- stdx::placeholders::_2)));
+ // TODO: add query options await_data, oplog_replay
+ _fetcher.reset(new OplogFetcher(_exec,
+ _opts.startOptime,
+ _opts.syncSource,
+ _opts.remoteOplogNS,
+ stdx::bind(&DataReplicator::_onOplogFetchFinish,
+ this,
+ stdx::placeholders::_1,
+ stdx::placeholders::_2)));
}
@@ -177,14 +658,39 @@ namespace repl {
}
Status DataReplicator::pause() {
- // TODO
+ _pauseApplier();
return Status::OK();
}
+ std::string DataReplicator::getDiagnosticString() const {
+ str::stream out;
+ out << "DataReplicator -"
+ << " opts: " << _opts.toString()
+ << " oplogFetcher: " << _fetcher->toString()
+ << " opsBuffered: " << _oplogBuffer.size()
+ << " state: " << toString(_state);
+ switch (_state) {
+ case DataReplicatorState::InitialSync:
+ out << " opsAppied: " << _initialSyncState->appliedOps
+ << " status: " << _initialSyncState->status.toString();
+ break;
+ case DataReplicatorState::Steady:
+ // TODO: add more here
+ break;
+ case DataReplicatorState::Rollback:
+ // TODO: add more here
+ break;
+ default:
+ break;
+ }
+
+ return out;
+ }
+
Status DataReplicator::resume(bool wait) {
- StatusWith<Handle> handle = _exec->scheduleWork(stdx::bind(&DataReplicator::_resumeFinish,
- this,
- stdx::placeholders::_1));
+ CBHStatus handle = _exec->scheduleWork(stdx::bind(&DataReplicator::_resumeFinish,
+ this,
+ stdx::placeholders::_1));
const Status status = handle.getStatus();
if (wait && status.isOK()) {
_exec->wait(handle.getValue());
@@ -193,12 +699,19 @@ namespace repl {
}
void DataReplicator::_resumeFinish(CallbackData cbData) {
+ UniqueLock lk(_mutex);
_fetcherPaused = _applierPaused = false;
+ lk.unlock();
+
_doNextActions();
}
void DataReplicator::_pauseApplier() {
+ LockGuard lk(_mutex);
+ if (_applier)
+ _applier->wait();
_applierPaused = true;
+ _applier.reset();
}
Timestamp DataReplicator::_applyUntil(Timestamp untilTimestamp) {
@@ -211,21 +724,23 @@ namespace repl {
return _applyUntil(untilTimestamp);
}
- StatusWith<Timestamp> DataReplicator::flushAndPause() {
+ TimestampStatus DataReplicator::flushAndPause() {
//_run(&_pauseApplier);
- boost::unique_lock<boost::mutex> lk(_mutex);
+ UniqueLock lk(_mutex);
if (_applierActive) {
+ _applierPaused = true;
lk.unlock();
- _exec->wait(_applierHandle);
+ _applier->wait();
lk.lock();
}
- return StatusWith<Timestamp>(_lastOptimeApplied);
+ return TimestampStatus(_lastTimestampApplied);
}
- void DataReplicator::_resetState(Timestamp lastAppliedOptime) {
- boost::lock_guard<boost::mutex> lk(_mutex);
- _lastOptimeApplied = _lastOptimeFetched = lastAppliedOptime;
- }
+ void DataReplicator::_resetState_inlock(Timestamp lastAppliedOptime) {
+ invariant(!_anyActiveHandles_inlock());
+ _lastTimestampApplied = _lastTimestampFetched = lastAppliedOptime;
+ _oplogBuffer.clear();
+ }
void DataReplicator::slavesHaveProgressed() {
if (_reporter) {
@@ -233,53 +748,134 @@ namespace repl {
}
}
- StatusWith<Timestamp> DataReplicator::resync() {
+ void DataReplicator::_setInitialSyncStorageInterface(CollectionCloner::StorageInterface* si) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _storage = si;
+ if (_initialSyncState) {
+ _initialSyncState->dbsCloner.setStorageInterface(_storage);
+ }
+ }
+
+ TimestampStatus DataReplicator::resync() {
_shutdown();
// Drop databases and do initialSync();
- // TODO drop database
- StatusWith<Timestamp> status = initialSync();
+ CBHStatus cbh = _exec->scheduleDBWork([&](const CallbackData& cbData) {
+ _storage->dropUserDatabases(cbData.txn);
+ });
+
+ if (!cbh.isOK()) {
+ return TimestampStatus(cbh.getStatus());
+ }
+
+ _exec->wait(cbh.getValue());
+
+ TimestampStatus status = initialSync();
if (status.isOK()) {
- _resetState(status.getValue());
+ _resetState_inlock(status.getValue());
}
return status;
}
- StatusWith<Timestamp> DataReplicator::initialSync() {
- boost::lock_guard<boost::mutex> lk(_mutex);
- if (_state != DataReplicatiorState::Uninitialized) {
- if (_state == DataReplicatiorState::InitialSync)
- return StatusWith<Timestamp>(
- ErrorCodes::InvalidRoleModification,
- (str::stream() << "Already doing initial sync;try resync"));
+ TimestampStatus DataReplicator::initialSync() {
+ Timer t;
+ UniqueLock lk(_mutex);
+ if (_state != DataReplicatorState::Uninitialized) {
+ if (_state == DataReplicatorState::InitialSync)
+ return TimestampStatus(ErrorCodes::InvalidRoleModification,
+ (str::stream() << "Already doing initial sync;try resync"));
else {
- return StatusWith<Timestamp>(
- ErrorCodes::AlreadyInitialized,
- (str::stream() << "Cannot do initial sync in "
- << toString(_state)));
+ return TimestampStatus(ErrorCodes::AlreadyInitialized,
+ (str::stream() << "Cannot do initial sync in "
+ << toString(_state) << " state."));
}
}
- _state = DataReplicatiorState::InitialSync;
+ _state = DataReplicatorState::InitialSync;
+
+ // The reporter is paused for the duration of the initial sync, so cancel just in case.
+ if (_reporter) {
+ _reporter->cancel();
+ }
+ _reporterPaused = true;
+ _applierPaused = true;
+
+ // TODO: set minvalid doc initial sync state.
+
const int maxFailedAttempts = 10;
int failedAttempts = 0;
+ Status attemptErrorStatus(Status::OK());
while (failedAttempts < maxFailedAttempts) {
- _initialSync.reset(new InitialSyncImpl(_exec));
- _initialSync->start();
- _initialSync->wait();
- Status s = _initialSync->getStatus().getStatus();
+            // For testing, we may want to fail by simulating an invalid sync source.
+ if (MONGO_FAIL_POINT(failInitialSyncWithBadHost)) {
+ attemptErrorStatus = Status(ErrorCodes::InvalidSyncSource, "no sync source avail.");
+ }
- if (s.isOK()) {
- // we are done :)
- break;
+ Event initialSyncFinishEvent;
+ if (attemptErrorStatus.isOK() && _syncSource.empty()) {
+ _syncSource = _replCoord->chooseNewSyncSource();
+ }
+ else if(attemptErrorStatus.isOK()) {
+ StatusWith<Event> status = _exec->makeEvent();
+ if (!status.isOK()) {
+ attemptErrorStatus = status.getStatus();
+ } else {
+ initialSyncFinishEvent = status.getValue();
+ }
+ }
+
+ if (attemptErrorStatus.isOK()) {
+ invariant(initialSyncFinishEvent.isValid());
+ _initialSyncState.reset(new InitialSyncState(
+ DatabasesCloner(_exec,
+ _syncSource,
+ stdx::bind(&DataReplicator::_onDataClonerFinish,
+ this,
+ stdx::placeholders::_1)),
+ initialSyncFinishEvent));
+
+ _initialSyncState->dbsCloner.setStorageInterface(_storage);
+ const NamespaceString ns(_opts.remoteOplogNS);
+ TimestampStatus tsStatus = _initialSyncState->getLatestOplogTimestamp(
+ _exec,
+ _syncSource,
+ ns);
+ attemptErrorStatus = tsStatus.getStatus();
+ if (attemptErrorStatus.isOK()) {
+ _initialSyncState->beginTimestamp = tsStatus.getValue();
+ _fetcher.reset(new OplogFetcher(_exec,
+ _initialSyncState->beginTimestamp,
+ _syncSource,
+ _opts.remoteOplogNS,
+ stdx::bind(&DataReplicator::_onOplogFetchFinish,
+ this,
+ stdx::placeholders::_1,
+ stdx::placeholders::_2)));
+ _scheduleFetch_inlock();
+ lk.unlock();
+ _initialSyncState->dbsCloner.start(); // When the cloner is done applier starts.
+ invariant(_initialSyncState->finishEvent.isValid());
+ _exec->waitForEvent(_initialSyncState->finishEvent);
+ attemptErrorStatus = _initialSyncState->status;
+
+ // Re-lock DataReplicator Internals
+ lk.lock();
+ }
+ }
+
+ if (attemptErrorStatus.isOK()) {
+ break; // success
}
++failedAttempts;
error() << "Initial sync attempt failed -- attempts left: "
<< (maxFailedAttempts - failedAttempts) << " cause: "
- << s;
- // TODO: uncomment
- //sleepsecs(5);
+ << attemptErrorStatus;
+
+ // Sleep for retry time
+ lk.unlock();
+ sleepsecs(InitialSyncRetrySleepSecs);
+ lk.lock();
// No need to print a stack
if (failedAttempts >= maxFailedAttempts) {
@@ -289,19 +885,105 @@ namespace repl {
return Status(ErrorCodes::InitialSyncFailure, err);
}
}
- return _initialSync->getStatus();
+
+ // Success, cleanup
+ // TODO: re-enable, fetcher is blocking on wait (in _waitOnAll)
+ _cancelAllHandles_inlock();
+ _waitOnAll_inlock();
+
+ _reporterPaused = false;
+ _fetcherPaused = false;
+ _fetcher.reset(nullptr);
+ _tmpFetcher.reset(nullptr);
+ _applierPaused = false;
+ _applier.reset(nullptr);
+ _applierActive = false;
+ _initialSyncState.reset(nullptr);
+ _oplogBuffer.clear();
+ _resetState_inlock(_lastTimestampApplied);
+
+ log() << "Initial sync took: " << t.millis() << " milliseconds.";
+ return TimestampStatus(_lastTimestampApplied);
}
- const bool DataReplicator::_anyActiveHandles() {
- boost::lock_guard<boost::mutex> lk(_mutex);
- return _fetcher->isActive() || _applierActive || _reporter->isActive();
+ void DataReplicator::_onDataClonerFinish(const Status& status) {
+ log() << "data clone finished, status: " << status.toString();
+ if (!status.isOK()) {
+            // Initial sync failed during cloning of databases
+ _initialSyncState->setStatus(status);
+ _exec->signalEvent(_initialSyncState->finishEvent);
+ return;
+ }
+
+ BSONObj query = BSON("find" << _opts.remoteOplogNS.coll() <<
+ "sort" << BSON ("$natural" << -1) <<
+ "limit" << 1);
+
+ TimestampStatus timestampStatus(ErrorCodes::BadValue, "");
+ _tmpFetcher.reset(new QueryFetcher(_exec,
+ _syncSource,
+ _opts.remoteOplogNS,
+ query,
+ stdx::bind(&DataReplicator::_onApplierReadyStart,
+ this,
+ stdx::placeholders::_1,
+ stdx::placeholders::_2)));
+ Status s = _tmpFetcher->schedule();
+ if (!s.isOK()) {
+ _initialSyncState->setStatus(s);
+ }
+ }
+
+ void DataReplicator::_onApplierReadyStart(const BatchDataStatus& fetchResult,
+ NextAction* nextAction) {
+ invariant(_exec->isRunThread());
+ // Data clone done, move onto apply.
+ TimestampStatus ts(ErrorCodes::OplogStartMissing, "");
+ _initialSyncState->_setTimestampSatus(fetchResult, nextAction, &ts);
+ if (ts.isOK()) {
+ // TODO: set minvalid?
+ LockGuard lk(_mutex);
+ _initialSyncState->stopTimestamp = ts.getValue();
+ if (_lastTimestampApplied < ts.getValue()) {
+ log() << "waiting for applier to run until ts: " << ts.getValue();
+ }
+ invariant(_applierPaused);
+ _applierPaused = false;
+ _doNextActions_InitialSync_inlock();
+ }
+ else {
+ _initialSyncState->setStatus(ts.getStatus());
+ _doNextActions();
+ }
}
- const void DataReplicator::_cancelAllHandles() {
- boost::lock_guard<boost::mutex> lk(_mutex);
- _fetcher->cancel();
- _exec->cancel(_applierHandle);
- _reporter->cancel();
+ bool DataReplicator::_anyActiveHandles_inlock() const {
+ return _applierActive ||
+ (_fetcher && _fetcher->isActive()) ||
+ (_initialSyncState && _initialSyncState->dbsCloner.isActive()) ||
+ (_reporter && _reporter->isActive());
+ }
+
+ void DataReplicator::_cancelAllHandles_inlock() {
+ if (_fetcher)
+ _fetcher->cancel();
+ if (_applier)
+ _applier->cancel();
+ if (_reporter)
+ _reporter->cancel();
+ if (_initialSyncState && _initialSyncState->dbsCloner.isActive())
+ _initialSyncState->dbsCloner.cancel();
+ }
+
+ void DataReplicator::_waitOnAll_inlock() {
+ if (_fetcher)
+ _fetcher->wait();
+ if (_applier)
+ _applier->wait();
+ if (_reporter)
+ _reporter->wait();
+ if (_initialSyncState)
+ _initialSyncState->dbsCloner.wait();
}
void DataReplicator::_doNextActionsCB(CallbackData cbData) {
@@ -315,9 +997,9 @@ namespace repl {
// 3.) Steady (Replication)
// Check for shutdown flag, signal event
- boost::lock_guard<boost::mutex> lk(_mutex);
+ LockGuard lk(_mutex);
if (_doShutdown) {
- if(!_anyActiveHandles()) {
+ if(!_anyActiveHandles_inlock()) {
_exec->signalEvent(_onShutdown);
}
return;
@@ -325,13 +1007,13 @@ namespace repl {
// Do work for the current state
switch (_state) {
- case DataReplicatiorState::Rollback:
+ case DataReplicatorState::Rollback:
_doNextActions_Rollback_inlock();
break;
- case DataReplicatiorState::InitialSync:
+ case DataReplicatorState::InitialSync:
_doNextActions_InitialSync_inlock();
break;
- case DataReplicatiorState::Steady:
+ case DataReplicatorState::Steady:
_doNextActions_Steady_inlock();
break;
default:
@@ -343,21 +1025,32 @@ namespace repl {
}
void DataReplicator::_doNextActions_InitialSync_inlock() {
- // TODO: check initial sync state and do next actions
- // move from initial sync phase to initial sync phase via scheduled work in exec
-
- if (!_initialSync) {
- // Error case?, reset to uninit'd
- _state = DataReplicatiorState::Uninitialized;
+ if (!_initialSyncState) {
+ // TODO: Error case?, reset to uninit'd
+ _state = DataReplicatorState::Uninitialized;
+ log() << "_initialSyncState, so resetting state to Uninitialized";
return;
}
- if (!_initialSync->isActive()) {
- if (!_initialSync->getStatus().isOK()) {
+ if (!_initialSyncState->dbsCloner.isActive()) {
+ if (!_initialSyncState->dbsCloner.getStatus().isOK()) {
// TODO: Initial sync failed
}
else {
- // TODO: success
+ if (!_lastTimestampApplied.isNull() &&
+ _lastTimestampApplied >= _initialSyncState->stopTimestamp) {
+ invariant(_initialSyncState->finishEvent.isValid());
+ log() << "Applier done, initial sync done, end timestamp: "
+ << _initialSyncState->stopTimestamp << " , last applier: "
+ << _lastTimestampApplied;
+ _state = DataReplicatorState::Uninitialized;
+ _initialSyncState->setStatus(Status::OK());
+ _exec->signalEvent(_initialSyncState->finishEvent);
+ }
+ else {
+ // Run steady state events to fetch/apply.
+ _doNextActions_Steady_inlock();
+ }
}
}
}
@@ -383,48 +1076,189 @@ namespace repl {
} else {
// Check if active fetch, if not start one
if (!_fetcher->isActive()) {
- _scheduleFetch();
+ _scheduleFetch_inlock();
}
}
- // Check if active apply, if not start one
- if (!_applierActive) {
- _scheduleApplyBatch();
+ // Check if no active apply and ops to apply
+ if (!_applierActive && _oplogBuffer.size()) {
+ _scheduleApplyBatch_inlock();
}
- if (!_reporter || !_reporter->previousReturnStatus().isOK()) {
+ if (!_reporterPaused && (!_reporter || !_reporter->getStatus().isOK())) {
// TODO get reporter in good shape
_reporter.reset(new Reporter(_exec, _replCoord, HostAndPort()));
}
}
- void DataReplicator::_onApplyBatchFinish(CallbackData cbData) {
- _reporter->trigger();
- // TODO
+ Operations DataReplicator::_getNextApplierBatch_inlock() {
+ // Return a new batch of ops to apply.
+ // TODO: limit the batch like SyncTail::tryPopAndWaitForMore
+ Operations ops;
+ BSONObj op;
+ while(_oplogBuffer.tryPop(op)) {
+ ops.push_back(op);
+ }
+ return ops;
+ }
+
+ void DataReplicator::_onApplyBatchFinish(const CallbackData& cbData,
+ const TimestampStatus& ts,
+ const Operations& ops,
+ const size_t numApplied) {
+ invariant(cbData.status.isOK());
+ UniqueLock lk(_mutex);
+ _initialSyncState->appliedOps += numApplied;
+ if (!ts.isOK()) {
+ _handleFailedApplyBatch(ts, ops);
+ return;
+ }
+
+ _applierActive = false;
+ _lastTimestampApplied = ts.getValue();
+ lk.unlock();
+
+ if (_batchCompletedFn) {
+ _batchCompletedFn(ts.getValue());
+ }
+        // TODO: move the reporter to the replication coordinator and set _batchCompletedFn to a
+ // function in the replCoord.
+ if (_reporter) {
+ _reporter->trigger();
+ }
+
+ _doNextActions();
+ }
+
+ void DataReplicator::_handleFailedApplyBatch(const TimestampStatus& ts, const Operations& ops) {
+ switch (_state) {
+ case DataReplicatorState::InitialSync:
+ // TODO: fetch missing doc, and retry.
+ _scheduleApplyAfterFetch(ops);
+ break;
+ case DataReplicatorState::Rollback:
+ // TODO: nothing?
+ default:
+ // fatal
+ fassert(28666, ts.getStatus());
+ }
+ }
+
+ void DataReplicator::_scheduleApplyAfterFetch(const Operations& ops) {
+ ++_initialSyncState->fetchedMissingDocs;
+ // TODO: check collection.isCapped, like SyncTail::getMissingDoc
+ const BSONObj failedOplogEntry = *ops.begin();
+ const BSONElement missingIdElem = failedOplogEntry.getFieldDotted("o2._id");
+ const NamespaceString nss(ops.begin()->getField("ns").str());
+ const BSONObj query = BSON("find" << nss.coll() << "query" << missingIdElem.wrap());
+ _tmpFetcher.reset(new QueryFetcher(_exec, _syncSource, nss, query,
+ stdx::bind(&DataReplicator::_onMissingFetched,
+ this,
+ stdx::placeholders::_1,
+ stdx::placeholders::_2,
+ ops,
+ nss)));
+ Status s = _tmpFetcher->schedule();
+ if (!s.isOK()) {
+ // record error and take next step based on it.
+ _initialSyncState->setStatus(s);
+ _doNextActions();
+ }
+ }
+
+ void DataReplicator::_onMissingFetched(const BatchDataStatus& fetchResult,
+ Fetcher::NextAction* nextAction,
+ const Operations& ops,
+ const NamespaceString nss) {
+ invariant(_exec->isRunThread());
+
+ if (!fetchResult.isOK()) {
+ // TODO: do retries on network issues, like SyncTail::getMissingDoc
+ _initialSyncState->setStatus(fetchResult.getStatus());
+ _doNextActions();
+ return;
+ } else if (!fetchResult.getValue().documents.size()) {
+ // TODO: skip apply for this doc, like multiInitialSyncApply?
+ _initialSyncState->setStatus(Status(ErrorCodes::InitialSyncFailure,
+ "missing doc not found"));
+ _doNextActions();
+ return;
+ }
+
+ const BSONObj missingDoc = *fetchResult.getValue().documents.begin();
+ Status rs{Status::OK()};
+ auto s = _exec->scheduleDBWork(([&](const CallbackData& cd) {
+ rs = _storage->insertMissingDoc(cd.txn, nss, missingDoc);
+ }),
+ nss,
+ MODE_IX);
+ if (!s.isOK()) {
+ _initialSyncState->setStatus(s);
+ _doNextActions();
+ return;
+ }
+
+ _exec->wait(s.getValue());
+ if (!rs.isOK()) {
+ _initialSyncState->setStatus(rs);
+ _doNextActions();
+ return;
+ }
+
+ auto status = _scheduleApplyBatch_inlock(ops);
+ if (!status.isOK()) {
+ LockGuard lk(_mutex);
+ _initialSyncState->setStatus(status);
+ _exec->signalEvent(_initialSyncState->finishEvent);
+ }
}
Status DataReplicator::_scheduleApplyBatch() {
- boost::lock_guard<boost::mutex> lk(_mutex);
- if (!_applierActive) {
- // TODO
- _applierActive = true;
- auto status = _exec->scheduleWork(
- stdx::bind(&DataReplicator::_onApplyBatchFinish,
- this,
- stdx::placeholders::_1));
- if (!status.isOK()) {
- return status.getStatus();
- }
+ LockGuard lk(_mutex);
+ return _scheduleApplyBatch_inlock();
+ }
- _applierHandle = status.getValue();
+ Status DataReplicator::_scheduleApplyBatch_inlock() {
+ if (!_applierPaused && !_applierActive) {
+ _applierActive = true;
+ const Operations ops = _getNextApplierBatch_inlock();
+ invariant(ops.size());
+ invariant(_opts.applierFn);
+ invariant(!(_applier && _applier->isActive()));
+ return _scheduleApplyBatch_inlock(ops);
}
return Status::OK();
}
+ Status DataReplicator::_scheduleApplyBatch_inlock(const Operations& ops) {
+ auto lambda = [&] (const TimestampStatus& ts, const Operations& ops) {
+ CBHStatus status = _exec->scheduleWork(stdx::bind(&DataReplicator::_onApplyBatchFinish,
+ this,
+ stdx::placeholders::_1,
+ ts,
+ ops,
+ ops.size()));
+ if (!status.isOK()) {
+ LockGuard lk(_mutex);
+ _initialSyncState->setStatus(status);
+ _exec->signalEvent(_initialSyncState->finishEvent);
+ return;
+ }
+ // block until callback done.
+ _exec->wait(status.getValue());
+ };
+
+ _applier.reset(new Applier(_exec, ops, _opts.applierFn, lambda));
+ return _applier->start();
+ }
+
Status DataReplicator::_scheduleFetch() {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ LockGuard lk(_mutex);
+ return _scheduleFetch_inlock();
+ }
+
+ Status DataReplicator::_scheduleFetch_inlock() {
if (!_fetcher->isActive()) {
- // TODO
Status status = _fetcher->schedule();
if (!status.isOK()) {
return status;
@@ -445,13 +1279,9 @@ namespace repl {
Status DataReplicator::_shutdown() {
StatusWith<Event> eventStatus = _exec->makeEvent();
if (!eventStatus.isOK()) return eventStatus.getStatus();
- boost::unique_lock<boost::mutex> lk(_mutex);
+ UniqueLock lk(_mutex);
_onShutdown = eventStatus.getValue();
- lk.unlock();
-
- _cancelAllHandles();
-
- lk.lock();
+ _cancelAllHandles_inlock();
_doShutdown = true;
lk.unlock();
@@ -481,38 +1311,64 @@ namespace repl {
}
}
- void DataReplicator::_onFetchFinish(const StatusWith<Fetcher::BatchData>& fetchResult,
- Fetcher::NextAction* nextAction) {
+ void DataReplicator::_onOplogFetchFinish(const StatusWith<Fetcher::BatchData>& fetchResult,
+ Fetcher::NextAction* nextAction) {
+ invariant(_exec->isRunThread());
const Status status = fetchResult.getStatus();
if (status.code() == ErrorCodes::CallbackCanceled)
return;
if (status.isOK()) {
const auto docs = fetchResult.getValue().documents;
- _oplogBuffer.insert(_oplogBuffer.end(), docs.begin(), docs.end());
- if (*nextAction == Fetcher::NextAction::kNoAction) {
- // TODO: create new fetcher?, with new query from where we left off
+ if (docs.begin() != docs.end()) {
+ LockGuard lk(_mutex);
+ std::for_each(docs.cbegin(),
+ docs.cend(),
+ [&](const BSONObj& doc) {
+ _oplogBuffer.push(doc);
+ });
+ auto doc = docs.rbegin();
+ BSONElement tsElem(doc->getField("ts"));
+ while(tsElem.eoo() || doc != docs.rend()) {
+ tsElem = (doc++)->getField("ts");
+ }
+
+ if (!tsElem.eoo()) {
+ _lastTimestampFetched = tsElem.timestamp();
+ } else {
+ warning() <<
+ "Did not find a 'ts' timestamp field in any of the fetched documents";
+ }
}
- }
- else {
- // Error, decide what to do...
+ if (*nextAction == Fetcher::NextAction::kNoAction) {
+ // TODO: create new fetcher?, with new query from where we left off -- d'tor fetcher
- if (status.code() == ErrorCodes::InvalidSyncSource) {
- // Error, sync source
- Date_t until{};
- _replCoord->blacklistSyncSource(_syncSource, until);
- _syncSource = HostAndPort();
}
+ }
- if (status.code() == ErrorCodes::OplogStartMissing) {
- // possible rollback
- bool didRollback = _didRollback(_syncSource);
- if (!didRollback) {
- _replCoord->setFollowerMode(MemberState::RS_RECOVERING); // TODO too stale
- }
- else {
- // TODO: cleanup state/restart -- set _lastApplied, and other stuff
+ if (!status.isOK()) {
+ // Got an error, now decide what to do...
+ switch (status.code()) {
+ case ErrorCodes::OplogStartMissing: {
+ // possible rollback
+ bool didRollback = _didRollback(_syncSource);
+ if (!didRollback) {
+ _replCoord->setFollowerMode(MemberState::RS_RECOVERING); // TODO too stale
+ }
+ else {
+ // TODO: cleanup state/restart -- set _lastApplied, and other stuff
+ }
+ break;
}
+ case ErrorCodes::InvalidSyncSource:
+ // Error, sync source
+ // fallthrough
+ default:
+ // TODO: SERVER-18034 -- real blacklist timeout time
+ Date_t until{};
+ LockGuard lk(_mutex);
+ _replCoord->blacklistSyncSource(_syncSource, until);
+ _syncSource = HostAndPort();
}
}
@@ -520,4 +1376,3 @@ namespace repl {
}
} // namespace repl
} // namespace mongo
-
diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h
index edcf274dbc7..0573b25688b 100644
--- a/src/mongo/db/repl/data_replicator.h
+++ b/src/mongo/db/repl/data_replicator.h
@@ -29,10 +29,7 @@
#pragma once
-#include <boost/scoped_ptr.hpp>
#include <boost/thread.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition_variable.hpp>
#include <vector>
#include "mongo/platform/basic.h"
@@ -41,24 +38,40 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/applier.h"
+#include "mongo/db/repl/collection_cloner.h"
+#include "mongo/db/repl/database_cloner.h"
#include "mongo/db/repl/fetcher.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/reporter.h"
#include "mongo/util/net/hostandport.h"
+#include "mongo/util/queue.h"
namespace mongo {
namespace repl {
-namespace {
- typedef ReplicationExecutor::CallbackHandle Handle;
- typedef ReplicationExecutor::EventHandle Event;
- typedef ReplicationExecutor::CallbackData CallbackData;
- typedef ReplicationExecutor::RemoteCommandCallbackData CommandCallbackData;
+using Operations = Applier::Operations;
+using BatchDataStatus = StatusWith<Fetcher::BatchData>;
+using CallbackData = ReplicationExecutor::CallbackData;
+using CBHStatus = StatusWith<ReplicationExecutor::CallbackHandle>;
+using CommandCallbackData = ReplicationExecutor::RemoteCommandCallbackData;
+using Event = ReplicationExecutor::EventHandle;
+using Handle = ReplicationExecutor::CallbackHandle;
+using LockGuard = stdx::lock_guard<stdx::mutex>;
+using NextAction = Fetcher::NextAction;
+using Request = RemoteCommandRequest;
+using Response = RemoteCommandResponse;
+using TimestampStatus = StatusWith<Timestamp>;
+using UniqueLock = stdx::unique_lock<stdx::mutex>;
+
+class QueryFetcher;
+class OplogFetcher;
+struct InitialSyncState;
+
-} // namespace
/** State for decision tree */
-enum class DataReplicatiorState {
+enum class DataReplicatorState {
Steady, // Default
InitialSync,
Rollback,
@@ -66,7 +79,7 @@ enum class DataReplicatiorState {
};
// TBD -- ignore for now
-enum class DataReplicatiorScope {
+enum class DataReplicatorScope {
ReplicateAll,
ReplicateDB,
ReplicateCollection
@@ -78,37 +91,25 @@ struct DataReplicatorOptions {
NamespaceString remoteOplogNS = NamespaceString("local.oplog.rs");
// TBD -- ignore below for now
- DataReplicatiorScope scope = DataReplicatiorScope::ReplicateAll;
+ DataReplicatorScope scope = DataReplicatorScope::ReplicateAll;
std::string scopeNS;
BSONObj filterCriteria;
HostAndPort syncSource; // for use without replCoord -- maybe some kind of rsMonitor/interface
-};
-// TODO: Break out: or at least move body to cpp
-class InitialSyncImpl {
-public:
- InitialSyncImpl(ReplicationExecutor* exec)
- : _status(StatusWith<Timestamp>(Timestamp())),
- _exec(exec) {
+ // TODO: replace with real applier function
+ Applier::ApplyOperationFn applierFn = [] (OperationContext*, const BSONObj&) -> Status {
+ return Status::OK();
};
- Status start();
-
- void wait() {
- // TODO
- };
-
- bool isActive() { return _finishEvent.isValid(); };
-
- StatusWith<Timestamp> getStatus() {return _status;}
-private:
- StatusWith<Timestamp> _status;
- ReplicationExecutor* _exec;
- Event _finishEvent;
+ std::string toString() const {
+ return str::stream() << "DataReplicatorOptions -- "
+ << " localOplogNs: " << localOplogNS.toString()
+ << " remoteOplogNS: " << remoteOplogNS.toString()
+ << " syncSource: " << syncSource.toString()
+ << " startOptime: " << startOptime.toString();
+ }
};
-class Applier {};
-
/**
* The data replicator provides services to keep collection in sync by replicating
* changes via an oplog source to the local system storage.
@@ -118,6 +119,9 @@ class Applier {};
*/
class DataReplicator {
public:
+ /** Function to call when a batch is applied. */
+ using OnBatchCompleteFn = stdx::function<void (const Timestamp&)>;
+
DataReplicator(DataReplicatorOptions opts,
ReplicationExecutor* exec,
ReplicationCoordinator* replCoord);
@@ -127,6 +131,8 @@ public:
DataReplicator(DataReplicatorOptions opts,
ReplicationExecutor* exec);
+ virtual ~DataReplicator();
+
Status start();
Status shutdown();
@@ -137,19 +143,24 @@ public:
Status pause();
// Pauses replication and waits to return until all un-applied ops have been applied
- StatusWith<Timestamp> flushAndPause();
+ TimestampStatus flushAndPause();
// Called when a slave has progressed to a new oplog position
void slavesHaveProgressed();
// just like initialSync but can be called anytime.
- StatusWith<Timestamp> resync();
+ TimestampStatus resync();
// Don't use above methods before these
- StatusWith<Timestamp> initialSync();
+ TimestampStatus initialSync();
+
+ std::string getDiagnosticString() const;
// For testing only
- void _resetState(Timestamp lastAppliedOptime);
+ void _resetState_inlock(Timestamp lastAppliedOptime);
+ void __setSourceForTesting(HostAndPort src) { _syncSource = src; }
+ void _setInitialSyncStorageInterface(CollectionCloner::StorageInterface* si);
+
private:
// Run a member function in the executor, waiting for it to finish.
@@ -157,9 +168,8 @@ private:
// Only executed via executor
void _resumeFinish(CallbackData cbData);
- void _onFetchFinish(const StatusWith<Fetcher::BatchData>& fetchResult,
- Fetcher::NextAction* nextAction);
- void _onApplyBatchFinish(CallbackData cbData);
+ void _onOplogFetchFinish(const BatchDataStatus& fetchResult,
+ Fetcher::NextAction* nextAction);
void _doNextActionsCB(CallbackData cbData);
void _doNextActions();
void _doNextActions_InitialSync_inlock();
@@ -171,15 +181,37 @@ private:
Timestamp _applyUntil(Timestamp);
void _pauseApplier();
+ Operations _getNextApplierBatch_inlock();
+ void _onApplyBatchFinish(const CallbackData&,
+ const TimestampStatus&,
+ const Operations&,
+ const size_t numApplied);
+ void _handleFailedApplyBatch(const TimestampStatus&, const Operations&);
+ // Fetches the last doc from the first operation, and reschedules the apply for the ops.
+ void _scheduleApplyAfterFetch(const Operations&);
+ void _onMissingFetched(const BatchDataStatus& fetchResult,
+ Fetcher::NextAction* nextAction,
+ const Operations& ops,
+ const NamespaceString nss);
+
// returns true if a rollback is needed
bool _needToRollback(HostAndPort source, Timestamp lastApplied);
+ void _onDataClonerFinish(const Status& status);
+ // Called after _onDataClonerFinish when the new Timestamp is avail, to use for minvalid
+ void _onApplierReadyStart(const BatchDataStatus& fetchResult,
+ Fetcher::NextAction* nextAction);
+
Status _scheduleApplyBatch();
+ Status _scheduleApplyBatch_inlock();
+ Status _scheduleApplyBatch_inlock(const Operations& ops);
Status _scheduleFetch();
+ Status _scheduleFetch_inlock();
Status _scheduleReport();
- const void _cancelAllHandles();
- const bool _anyActiveHandles();
+ void _cancelAllHandles_inlock();
+ void _waitOnAll_inlock();
+ bool _anyActiveHandles_inlock() const;
Status _shutdown();
void _changeStateIfNeeded();
@@ -204,28 +236,32 @@ private:
// (I) Independently synchronized, see member variable comment.
// Protects member data of this ReplicationCoordinator.
- mutable boost::mutex _mutex; // (S)
- DataReplicatiorState _state; // (MX)
+ mutable stdx::mutex _mutex; // (S)
+ DataReplicatorState _state; // (MX)
+
+ // initial sync state
+ std::unique_ptr<InitialSyncState> _initialSyncState; // (M)
+ CollectionCloner::StorageInterface* _storage; // (M)
// set during scheduling and onFinish
bool _fetcherPaused; // (X)
- boost::scoped_ptr<Fetcher> _fetcher; // (S)
+ std::unique_ptr<OplogFetcher> _fetcher; // (S)
+ std::unique_ptr<QueryFetcher> _tmpFetcher; // (S)
- bool _reporterActive; // (M)
+ bool _reporterPaused; // (M)
Handle _reporterHandle; // (M)
- boost::scoped_ptr<Reporter> _reporter; // (M)
+ std::unique_ptr<Reporter> _reporter; // (M)
bool _applierActive; // (M)
bool _applierPaused; // (X)
- Handle _applierHandle; // (M)
- boost::scoped_ptr<Applier> _applier; // (M)
+ std::unique_ptr<Applier> _applier; // (M)
+ OnBatchCompleteFn _batchCompletedFn; // (M)
- boost::scoped_ptr<InitialSyncImpl> _initialSync; // (M)
HostAndPort _syncSource; // (M)
- Timestamp _lastOptimeFetched; // (MX)
- Timestamp _lastOptimeApplied; // (MX)
- std::vector<BSONObj> _oplogBuffer; // (M)
+ Timestamp _lastTimestampFetched; // (MX)
+ Timestamp _lastTimestampApplied; // (MX)
+ BlockingQueue<BSONObj> _oplogBuffer; // (M)
// Shutdown
bool _doShutdown; // (M)
@@ -233,9 +269,6 @@ private:
// Rollback stuff
Timestamp _rollbackCommonOptime; // (MX)
-
-
-
};
} // namespace repl
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 6aef8263679..818fe559bec 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -26,22 +26,29 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
#include "mongo/platform/basic.h"
#include <memory>
#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/base_cloner_test_fixture.h"
#include "mongo/db/repl/data_replicator.h"
#include "mongo/db/repl/fetcher.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
+#include "mongo/db/repl/replica_set_config.h"
#include "mongo/db/repl/replication_coordinator_external_state_mock.h"
-#include "mongo/db/repl/replication_executor.h"
+#include "mongo/db/repl/replication_coordinator_impl.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_executor_test_fixture.h"
-#include "mongo/db/repl/topology_coordinator.h"
+#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/topology_coordinator_impl.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/util/fail_point_service.h"
+#include "mongo/util/concurrency/thread_name.h"
+#include "mongo/util/log.h"
#include "mongo/unittest/unittest.h"
@@ -49,7 +56,16 @@ namespace {
using namespace mongo;
using namespace mongo::repl;
using executor::NetworkInterfaceMock;
+ using LockGuard = stdx::lock_guard<stdx::mutex>;
+ using UniqueLock = stdx::unique_lock<stdx::mutex>;
+ using mutex = stdx::mutex;
+ ReplicaSetConfig assertMakeRSConfig(const BSONObj& configBson) {
+ ReplicaSetConfig config;
+ ASSERT_OK(config.initialize(configBson));
+ ASSERT_OK(config.validate());
+ return config;
+ }
const HostAndPort target("localhost", -1);
class DataReplicatorTest : public ReplicationExecutorTest {
@@ -62,6 +78,7 @@ namespace {
// PRNG seed for tests.
const int64_t seed = 0;
+ _settings.replSet = "foo"; // We are a replica set :)
ReplicationExecutor* exec = &(getExecutor());
_topo = new TopologyCoordinatorImpl(Seconds(0));
_externalState = new ReplicationCoordinatorExternalStateMock;
@@ -71,8 +88,11 @@ namespace {
exec,
seed));
launchExecutorThread();
- _dr.reset(new DataReplicator(DataReplicatorOptions(), exec, _repl.get()));
+ createDataReplicator(DataReplicatorOptions{});
}
+
+ void postExecutorThreadLaunch() override {};
+
void tearDown() override {
ReplicationExecutorTest::tearDown();
// Executor may still invoke callback before shutting down.
@@ -82,6 +102,12 @@ namespace {
// clear/reset state
}
+
+ void createDataReplicator(DataReplicatorOptions opts) {
+ _dr.reset(new DataReplicator(opts, &(getExecutor()), _repl.get()));
+ _dr->__setSourceForTesting(target);
+ }
+
void scheduleNetworkResponse(const BSONObj& obj) {
NetworkInterfaceMock* net = getNet();
ASSERT_TRUE(net->hasReadyRequests());
@@ -114,11 +140,12 @@ namespace {
}
DataReplicator& getDR() { return *_dr; }
+ TopologyCoordinatorImpl& getTopo() { return *_topo; }
+ ReplicationCoordinatorImpl& getRepl() { return *_repl; }
- protected:
- std::unique_ptr<DataReplicator> _dr;
private:
+ std::unique_ptr<DataReplicator> _dr;
boost::scoped_ptr<ReplicationCoordinatorImpl> _repl;
// Owned by ReplicationCoordinatorImpl
TopologyCoordinatorImpl* _topo;
@@ -131,7 +158,10 @@ namespace {
};
TEST_F(DataReplicatorTest, CreateDestroy) {
+ }
+ TEST_F(DataReplicatorTest, StartOk) {
+ ASSERT_EQ(getDR().start().code(), ErrorCodes::OK);
}
TEST_F(DataReplicatorTest, CannotInitialSyncAfterStart) {
@@ -139,15 +169,338 @@ namespace {
ASSERT_EQ(getDR().initialSync(), ErrorCodes::AlreadyInitialized);
}
- TEST_F(DataReplicatorTest, InitialSyncFailpoint) {
+    // Used to run an Initial Sync in a separate thread, to avoid blocking test execution.
+ class InitialSyncBackgroundRunner {
+ public:
+
+ InitialSyncBackgroundRunner(DataReplicator* dr) :
+ _dr(dr),
+ _result(Status(ErrorCodes::BadValue, "failed to set status")) {}
+
+        // Could block if the initial sync thread has not finished
+ TimestampStatus getResult() {
+ _thread->join();
+ return _result;
+ }
+
+ void run() {
+ _thread.reset(new boost::thread(stdx::bind(&InitialSyncBackgroundRunner::_run, this)));
+ sleepmillis(2); // sleep to let new thread run initialSync so it schedules work
+ }
+
+ private:
+
+ void _run() {
+ setThreadName("InitialSyncRunner");
+ log() << "starting initial sync";
+ _result = _dr->initialSync(); // blocking
+ }
+
+ DataReplicator* _dr;
+ TimestampStatus _result;
+ boost::scoped_ptr<boost::thread> _thread;
+ };
+
+ class InitialSyncTest : public DataReplicatorTest {
+ public:
+ InitialSyncTest()
+ : _insertCollectionFn([&](OperationContext* txn,
+ const NamespaceString& theNss,
+ const std::vector<BSONObj>& theDocuments) {
+ log() << "insertDoc for " << theNss.toString();
+ LockGuard lk(_collectionCountMutex);
+ ++(_collectionCounts[theNss.toString()]);
+ return Status::OK();
+ }),
+ _beginCollectionFn([&](OperationContext* txn,
+ const NamespaceString& theNss,
+ const CollectionOptions& theOptions,
+ const std::vector<BSONObj>& theIndexSpecs) {
+ log() << "beginCollection for " << theNss.toString();
+ LockGuard lk(_collectionCountMutex);
+ _collectionCounts[theNss.toString()] = 0;
+ return Status::OK();
+ }) {};
+
+ protected:
+
+ void setStorageFuncs(ClonerStorageInterfaceMock::InsertCollectionFn ins,
+ ClonerStorageInterfaceMock::BeginCollectionFn beg) {
+ _insertCollectionFn = ins;
+ _beginCollectionFn = beg;
+ }
+
+ void setResponses(std::vector<BSONObj> resps) {
+ _responses = resps;
+ }
+
+ void startSync() {
+ DataReplicator* dr = &(getDR());
+
+ _storage.beginCollectionFn = _beginCollectionFn;
+ _storage.insertDocumentsFn = _insertCollectionFn;
+ _storage.insertMissingDocFn = [&] (OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& doc) {
+ return Status::OK();
+ };
+
+ dr->_setInitialSyncStorageInterface(&_storage);
+ _isbr.reset(new InitialSyncBackgroundRunner(dr));
+ _isbr->run();
+ }
+
+
+ void playResponses() {
+ // TODO: Handle network responses
+ NetworkInterfaceMock* net = getNet();
+ int processedRequests(0);
+ const int expectedResponses(_responses.size());
+
+            // counter for oplog entries
+ int c(0);
+ while (true) {
+ net->enterNetwork();
+ if (!net->hasReadyRequests() && processedRequests < expectedResponses) {
+ net->exitNetwork();
+ continue;
+ }
+ NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
+
+ const BSONObj reqBSON = noi->getRequest().cmdObj;
+ const BSONElement cmdElem = reqBSON.firstElement();
+ const bool isGetMore =
+ cmdElem.fieldNameStringData().equalCaseInsensitive("getmore");
+ const long long cursorId = cmdElem.numberLong();
+ if (isGetMore && cursorId == 1LL) {
+ // process getmore requests from the oplog fetcher
+ auto respBSON = fromjson(
+ str::stream() << "{ok:1, cursor:{id:1, ns:'local.oplog.rs', nextBatch:["
+ "{ts:Timestamp(" << ++c << ",1), h:1, ns:'test.a', v:2, op:'u', o2:{_id:"
+ << c << "}, o:{$set:{a:1}}}"
+ "]}}");
+ net->scheduleResponse(noi,
+ net->now(),
+ ResponseStatus(RemoteCommandResponse(respBSON,
+ Milliseconds(10))));
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+ continue;
+ }
+ else if (isGetMore) {
+ // TODO: return more data
+ }
+
+ // process fixed set of responses
+ log() << "processing network request: "
+ << noi->getRequest().dbname << "." << noi->getRequest().cmdObj.toString();
+ net->scheduleResponse(noi,
+ net->now(),
+ ResponseStatus(
+ RemoteCommandResponse(_responses[processedRequests],
+ Milliseconds(10))));
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+ if (++processedRequests >= expectedResponses) {
+ log() << "done processing expected requests ";
+                    break; // stop once we have processed all expected requests
+ }
+ }
+
+ net->enterNetwork();
+ if (net->hasReadyRequests()) {
+ log() << "There are unexpected requests left";
+ log() << "next cmd: " << net->getNextReadyRequest()->getRequest().cmdObj.toString();
+ ASSERT_FALSE(net->hasReadyRequests());
+ }
+ net->exitNetwork();
+ }
+
+ void verifySync(Status s = Status::OK()) {
+ verifySync(_isbr->getResult().getStatus().code());
+ }
+
+ void verifySync(ErrorCodes::Error code) {
+ // Check result
+ ASSERT_EQ(_isbr->getResult().getStatus().code(), code) << "status codes differ";
+ }
+
+ std::map<std::string, int> getLocalCollectionCounts() {
+ return _collectionCounts;
+ }
+
+ private:
+ ClonerStorageInterfaceMock::InsertCollectionFn _insertCollectionFn;
+ ClonerStorageInterfaceMock::BeginCollectionFn _beginCollectionFn;
+ std::vector<BSONObj> _responses;
+ std::unique_ptr<InitialSyncBackgroundRunner> _isbr;
+ std::map<std::string, int> _collectionCounts; // counts of inserts during cloning
+ mutex _collectionCountMutex; // used to protect the collectionCount map
+ ClonerStorageInterfaceMock _storage;
+ };
+
+ TEST_F(InitialSyncTest, Complete) {
+ /**
+ * Initial Sync will issue these query/commands
+ * - startTS = oplog.rs->find().sort({$natural:-1}).limit(-1).next()["ts"]
+ * - listDatabases (foreach db do below)
+ * -- cloneDatabase (see DatabaseCloner tests).
+ * - endTS = oplog.rs->find().sort({$natural:-1}).limit(-1).next()["ts"]
+ * - ops = oplog.rs->find({ts:{$gte: startTS}}) (foreach op)
+ * -- if local doc is missing, getCollection(op.ns).findOne(_id:op.o2._id)
+ * - if any retries were done in the previous loop, endTS query again for minvalid
+ *
+ */
+
+ const std::vector<BSONObj> responses = {
+ // get latest oplog ts
+ fromjson(
+ "{ok:1, cursor:{id:0, ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
+ "]}}"),
+ // oplog fetcher find
+ fromjson(
+ "{ok:1, cursor:{id:1, ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
+ "]}}"),
+// Clone Start
+ // listDatabases
+ fromjson("{ok:1, databases:[{name:'a'}]}"),
+ // listCollections for "a"
+ fromjson(
+ "{ok:1, cursor:{id:0, ns:'a.$cmd.listCollections', firstBatch:["
+ "{name:'a', options:{}} "
+ "]}}"),
+ // listIndexes:a
+ fromjson(
+ "{ok:1, cursor:{id:0, ns:'a.$cmd.listIndexes.a', firstBatch:["
+ "{v:1, key:{_id:1}, name:'_id_', ns:'a.a'}"
+ "]}}"),
+ // find:a
+ fromjson(
+ "{ok:1, cursor:{id:0, ns:'a.a', firstBatch:["
+ "{_id:1, a:1} "
+ "]}}"),
+// Clone Done
+ // get latest oplog ts
+ fromjson(
+ "{ok:1, cursor:{id:0, ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(2,2), h:1, ns:'b.c', v:2, op:'i', o:{_id:1, c:1}}"
+ "]}}"),
+// Applier starts ...
+ };
+ startSync();
+ setResponses(responses);
+ playResponses();
+ verifySync();
+ }
+
+ TEST_F(InitialSyncTest, MissingDocOnApplyCompletes) {
+
+ DataReplicatorOptions opts;
+ int applyCounter{0};
+ opts.applierFn = [&] (OperationContext* txn, const BSONObj& op) {
+ if (++applyCounter == 1) {
+ return Status(ErrorCodes::NoMatchingDocument, "failed: missing doc.");
+ }
+ return Status::OK();
+ };
+ createDataReplicator(opts);
+
+ const std::vector<BSONObj> responses = {
+ // get latest oplog ts
+ fromjson(
+ "{ok:1, cursor:{id:0, ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
+ "]}}"),
+ // oplog fetcher find
+ fromjson(
+ "{ok:1, cursor:{id:1, ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'u', o2:{_id:1}, o:{$set:{a:1}}}"
+ "]}}"),
+// Clone Start
+ // listDatabases
+ fromjson("{ok:1, databases:[{name:'a'}]}"),
+ // listCollections for "a"
+ fromjson(
+ "{ok:1, cursor:{id:0, ns:'a.$cmd.listCollections', firstBatch:["
+ "{name:'a', options:{}} "
+ "]}}"),
+ // listIndexes:a
+ fromjson(
+ "{ok:1, cursor:{id:0, ns:'a.$cmd.listIndexes.a', firstBatch:["
+ "{v:1, key:{_id:1}, name:'_id_', ns:'a.a'}"
+ "]}}"),
+ // find:a -- empty
+ fromjson(
+ "{ok:1, cursor:{id:0, ns:'a.a', firstBatch:[]}}"),
+// Clone Done
+ // get latest oplog ts
+ fromjson(
+ "{ok:1, cursor:{id:0, ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(2,2), h:1, ns:'b.c', v:2, op:'i', o:{_id:1, c:1}}"
+ "]}}"),
+// Applier starts ...
+ // missing doc fetch -- find:a {_id:1}
+ fromjson(
+ "{ok:1, cursor:{id:0, ns:'a.a', firstBatch:["
+ "{_id:1, a:1} "
+ "]}}"),
+ };
+ startSync();
+ setResponses(responses);
+ playResponses();
+ verifySync(ErrorCodes::OK);
+ }
+
+ TEST_F(InitialSyncTest, Failpoint) {
mongo::getGlobalFailPointRegistry()->
getFailPoint("failInitialSyncWithBadHost")->
setMode(FailPoint::alwaysOn);
- ASSERT_EQ(getDR().initialSync(), ErrorCodes::InitialSyncFailure);
+ BSONObj configObj = BSON("_id" << "mySet" <<
+ "version" << 1 <<
+ "members" << BSON_ARRAY(BSON("_id" << 1 << "host" << "node1:12345")
+ << BSON("_id" << 2 << "host" << "node2:12345")
+ << BSON("_id" << 3 << "host" << "node3:12345")
+ ));
+
+ ReplicaSetConfig config = assertMakeRSConfig(configObj);
+ Timestamp time1(100, 1);
+ OpTime opTime1(time1, OpTime::kDefaultTerm);
+ getTopo().updateConfig(config, 0, getNet()->now(), opTime1);
+ getRepl().setMyLastOptime(opTime1);
+ ASSERT(getRepl().setFollowerMode(MemberState::RS_SECONDARY));
+
+ DataReplicator* dr = &(getDR());
+ InitialSyncBackgroundRunner isbr(dr);
+ isbr.run();
+ ASSERT_EQ(isbr.getResult().getStatus().code(), ErrorCodes::InitialSyncFailure);
mongo::getGlobalFailPointRegistry()->
getFailPoint("failInitialSyncWithBadHost")->
setMode(FailPoint::off);
}
+
+ TEST_F(InitialSyncTest, FailsOnClone) {
+ const std::vector<BSONObj> responses = {
+ // get latest oplog ts
+ fromjson(
+ "{ok:1, cursor:{id:0, ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
+ "]}}"),
+ // oplog fetcher find
+ fromjson(
+ "{ok:1, cursor:{id:1, ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
+ "]}}"),
+// Clone Start
+ // listDatabases
+ fromjson("{ok:0}")
+ };
+ startSync();
+ setResponses(responses);
+ playResponses();
+ verifySync(ErrorCodes::InitialSyncFailure);
+ }
}
diff --git a/src/mongo/db/repl/fetcher.cpp b/src/mongo/db/repl/fetcher.cpp
index 10eb8ae3149..f5436630d75 100644
--- a/src/mongo/db/repl/fetcher.cpp
+++ b/src/mongo/db/repl/fetcher.cpp
@@ -78,10 +78,12 @@ namespace {
"cursor response must contain '" << kCursorFieldName << "." <<
kCursorIdFieldName << "' field: " << obj);
}
- if (cursorIdElement.type() != mongo::NumberLong) {
+ if (!(cursorIdElement.type() == mongo::NumberLong ||
+ cursorIdElement.type() == mongo::NumberInt)) {
return Status(ErrorCodes::FailedToParse, str::stream() <<
"'" << kCursorFieldName << "." << kCursorIdFieldName <<
- "' field must be a number of type 'long': " << obj);
+ "' field must be a integral number of type 'int' or 'long' but was a '"
+ << typeName(cursorIdElement.type()) << "': " << obj);
}
batchData->cursorId = cursorIdElement.numberLong();
diff --git a/src/mongo/db/repl/fetcher.h b/src/mongo/db/repl/fetcher.h
index efacca9f1ad..474e43afef6 100644
--- a/src/mongo/db/repl/fetcher.h
+++ b/src/mongo/db/repl/fetcher.h
@@ -70,7 +70,7 @@ namespace repl {
/**
* Represents next steps of fetcher.
*/
- enum class NextAction {
+ enum class NextAction : int {
kInvalid=0,
kNoAction=1,
kGetMore=2
diff --git a/src/mongo/db/repl/fetcher_test.cpp b/src/mongo/db/repl/fetcher_test.cpp
index 85a693dd48f..4672dd9759d 100644
--- a/src/mongo/db/repl/fetcher_test.cpp
+++ b/src/mongo/db/repl/fetcher_test.cpp
@@ -293,13 +293,14 @@ namespace {
TEST_F(FetcherTest, CursorIdNotLongNumber) {
ASSERT_OK(fetcher->schedule());
- processNetworkResponse(BSON("cursor" << BSON("id" << 123 <<
+ processNetworkResponse(BSON("cursor" << BSON("id" << 123.1 <<
"ns" << "db.coll" <<
"firstBatch" << BSONArray()) <<
"ok" << 1));
ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
ASSERT_STRING_CONTAINS(status.reason(),
- "'cursor.id' field must be a number of type 'long'");
+ "'cursor.id' field must be");
+ ASSERT_EQ((int)Fetcher::NextAction::kInvalid, (int)nextAction);
}
TEST_F(FetcherTest, NamespaceFieldMissing) {
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index e3ea081940a..8f878067129 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -77,7 +77,6 @@ namespace mongo {
namespace repl {
namespace {
- typedef StatusWith<ReplicationExecutor::CallbackHandle> CBHStatus;
using executor::NetworkInterface;
void lockAndCall(boost::unique_lock<boost::mutex>* lk, const stdx::function<void ()>& fn) {
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index e55ab6d656d..413445b5e1a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -57,7 +57,6 @@ namespace repl {
namespace {
- typedef StatusWith<ReplicationExecutor::CallbackHandle> CBHStatus;
typedef ReplicationExecutor::CallbackHandle CBHandle;
} //namespace
diff --git a/src/mongo/db/repl/replication_executor.cpp b/src/mongo/db/repl/replication_executor.cpp
index 72c611b600a..2a8e34fc915 100644
--- a/src/mongo/db/repl/replication_executor.cpp
+++ b/src/mongo/db/repl/replication_executor.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/repl/replication_executor.h"
#include <limits>
+#include <thread>
#include "mongo/db/repl/database_task.h"
#include "mongo/db/repl/storage_interface.h"
@@ -57,7 +58,7 @@ namespace {
_inShutdown(false),
_dblockWorkers(threadpool::ThreadPool::DoNotStartThreadsTag(),
3,
- "replCallbackWithGlobalLock-"),
+ "replExecDBWorker-"),
_dblockTaskRunner(
&_dblockWorkers,
stdx::bind(&StorageInterface::createOperationContext, storageInterface)),
@@ -94,8 +95,13 @@ namespace {
return _networkInterface->now();
}
+ bool ReplicationExecutor::isRunThread() const {
+ return (_runThreadId == std::this_thread::get_id());
+ }
+
void ReplicationExecutor::run() {
setThreadName("ReplicationExecutor");
+ _runThreadId = std::this_thread::get_id();
_networkInterface->startup();
_dblockWorkers.startThreads();
std::pair<WorkItem, CallbackHandle> work;
diff --git a/src/mongo/db/repl/replication_executor.h b/src/mongo/db/repl/replication_executor.h
index b90d0e24133..14d9f50ccfb 100644
--- a/src/mongo/db/repl/replication_executor.h
+++ b/src/mongo/db/repl/replication_executor.h
@@ -32,6 +32,7 @@
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>
#include <string>
+#include <thread>
#include "mongo/base/disallow_copying.h"
#include "mongo/base/status.h"
@@ -335,6 +336,11 @@ namespace repl {
*/
int64_t nextRandomInt64(int64_t limit);
+ /**
+         * Returns true if executing in the "run" thread, which must not perform blocking I/O
+ */
+ bool isRunThread() const;
+
private:
struct Event;
struct WorkItem;
@@ -448,6 +454,7 @@ namespace repl {
TaskRunner _dblockTaskRunner;
TaskRunner _dblockExclusiveLockTaskRunner;
uint64_t _nextId;
+ std::thread::id _runThreadId;
};
/**
diff --git a/src/mongo/db/repl/replication_executor_test_fixture.cpp b/src/mongo/db/repl/replication_executor_test_fixture.cpp
index 6efbe072d52..ff84a4de538 100644
--- a/src/mongo/db/repl/replication_executor_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_executor_test_fixture.cpp
@@ -47,7 +47,11 @@ namespace {
ASSERT(!_executorThread);
_executorThread.reset(
new boost::thread(stdx::bind(&ReplicationExecutor::run, _executor.get())));
- getNet()->enterNetwork();
+ postExecutorThreadLaunch();
+ }
+
+ void ReplicationExecutorTest::postExecutorThreadLaunch() {
+ _net->enterNetwork();
}
void ReplicationExecutorTest::joinExecutorThread() {
diff --git a/src/mongo/db/repl/replication_executor_test_fixture.h b/src/mongo/db/repl/replication_executor_test_fixture.h
index 479d799b990..a5a7ae6bf25 100644
--- a/src/mongo/db/repl/replication_executor_test_fixture.h
+++ b/src/mongo/db/repl/replication_executor_test_fixture.h
@@ -60,6 +60,11 @@ namespace repl {
void launchExecutorThread();
/**
+ * Anything that needs to be done after launchExecutorThread should go in here.
+ */
+ virtual void postExecutorThreadLaunch();
+
+ /**
* Waits for background ReplicationExecutor to stop running.
*
* The executor should be shutdown prior to calling this function
diff --git a/src/mongo/db/repl/reporter.cpp b/src/mongo/db/repl/reporter.cpp
index a3120adf01c..325374ae76b 100644
--- a/src/mongo/db/repl/reporter.cpp
+++ b/src/mongo/db/repl/reporter.cpp
@@ -75,6 +75,13 @@ namespace repl {
_executor->cancel(_remoteCommandCallbackHandle);
}
+ void Reporter::wait() {
+ boost::unique_lock<boost::mutex> lk(_mutex);
+ if(_remoteCommandCallbackHandle.isValid()) {
+ _executor->wait(_remoteCommandCallbackHandle);
+ }
+ }
+
Status Reporter::trigger() {
boost::lock_guard<boost::mutex> lk(_mutex);
return _schedule_inlock();
@@ -128,7 +135,7 @@ namespace repl {
}
}
- Status Reporter::previousReturnStatus() const {
+ Status Reporter::getStatus() const {
boost::lock_guard<boost::mutex> lk(_mutex);
return _status;
}
diff --git a/src/mongo/db/repl/reporter.h b/src/mongo/db/repl/reporter.h
index c96a896cd88..36caa78fe6b 100644
--- a/src/mongo/db/repl/reporter.h
+++ b/src/mongo/db/repl/reporter.h
@@ -69,6 +69,12 @@ namespace repl {
void cancel();
/**
+ * Waits for last/current executor handle to finish.
+ * Returns immediately if the handle is invalid.
+ */
+ void wait();
+
+ /**
* Signals to the Reporter that there is new information to be sent to the "_target" server.
* Returns the _status, indicating any error the Reporter has encountered.
*/
@@ -78,7 +84,7 @@ namespace repl {
* Returns the previous return status so that the owner can decide whether the Reporter
* needs a new target to whom it can report.
*/
- Status previousReturnStatus() const;
+ Status getStatus() const;
private:
/**
diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp
index ed617fa0cdf..c4bd0b6176e 100644
--- a/src/mongo/db/repl/reporter_test.cpp
+++ b/src/mongo/db/repl/reporter_test.cpp
@@ -149,7 +149,7 @@ namespace {
ASSERT_TRUE(reporter->isActive());
scheduleNetworkResponse(ErrorCodes::NoSuchKey, "waaaah");
getNet()->runReadyNetworkOperations();
- ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->previousReturnStatus());
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->getStatus());
ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->trigger());
ASSERT_FALSE(reporter->isActive());
ASSERT_FALSE(getNet()->hasReadyRequests());
@@ -167,7 +167,7 @@ namespace {
scheduleNetworkResponse(ErrorCodes::NoSuchKey, "waaaah");
getNet()->runReadyNetworkOperations();
- ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->previousReturnStatus());
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->getStatus());
ASSERT_FALSE(reporter->willRunAgain());
ASSERT_FALSE(reporter->isActive());
ASSERT_FALSE(getNet()->hasReadyRequests());
@@ -238,7 +238,7 @@ namespace {
ASSERT_FALSE(reporter->isActive());
ASSERT_FALSE(reporter->willRunAgain());
ASSERT_FALSE(getNet()->hasReadyRequests());
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->previousReturnStatus());
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->getStatus());
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->trigger());
}
@@ -262,7 +262,7 @@ namespace {
ASSERT_FALSE(reporter->isActive());
ASSERT_FALSE(reporter->willRunAgain());
ASSERT_FALSE(getNet()->hasReadyRequests());
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->previousReturnStatus());
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->getStatus());
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->trigger());
}
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index 04e82151370..fc1a10de72d 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -444,6 +444,10 @@ namespace {
str::stream() << "initial sync failed: " << msg);
}
+ // WARNING: If the 3rd oplog sync step is removed we must reset minValid
+ // to the last entry on the source server so that we don't come
+ // out of recovering until we get there (since the previous steps
+    // could have fetched newer documents than the oplog entry we were applying from).
msg = "oplog sync 3 of 3";
log() << msg;