diff options
author | Scott Hernandez <scotthernandez@gmail.com> | 2015-06-05 16:30:49 -0400 |
---|---|---|
committer | Scott Hernandez <scotthernandez@gmail.com> | 2015-06-05 20:51:01 -0400 |
commit | 503731032f19d5fea414442bf891a3a8bfe76759 (patch) | |
tree | 6b9301f25844328b31c179b298822ecf8ac1aee8 | |
parent | 99efb5fc4f771feb8ae81434b4e2d6f7445665e1 (diff) | |
download | mongo-503731032f19d5fea414442bf891a3a8bfe76759.tar.gz |
SERVER-18039: Add Initial Sync to DataReplicator
20 files changed, 1622 insertions, 298 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index d041ef49b72..82143ba4bd6 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -72,17 +72,6 @@ env.CppUnitTest( ) env.Library( - target='data_replicator', - source=[ - 'data_replicator.cpp', - ], - LIBDEPS=[ - 'fetcher', - 'repl_coordinator_interface', - ], -) - -env.Library( target='oplog_interface_local', source=[ 'oplog_interface_local.cpp', @@ -263,19 +252,6 @@ env.CppUnitTest('replica_set_config_checks_test', 'replmocks' ]) -env.CppUnitTest( - target='data_replicator_test', - source=[ - 'data_replicator_test.cpp', - ], - LIBDEPS=[ - 'data_replicator', - 'replica_set_messages', - 'replication_executor_test_fixture', - 'repl_coordinator_test_fixture', - ], -) - env.CppUnitTest('scatter_gather_test', 'scatter_gather_test.cpp', LIBDEPS=['repl_coordinator_impl', @@ -611,6 +587,7 @@ env.Library( 'applier.cpp', ], LIBDEPS=[ + 'replication_executor', ], ) @@ -625,6 +602,34 @@ env.CppUnitTest( ) env.Library( + target='data_replicator', + source=[ + 'data_replicator.cpp', + ], + LIBDEPS=[ + 'applier', + 'collection_cloner', + 'database_cloner', + 'fetcher', + 'repl_coordinator_interface', + ], +) + +env.CppUnitTest( + target='data_replicator_test', + source=[ + 'data_replicator_test.cpp', + ], + LIBDEPS=[ + 'base_cloner_test_fixture', + 'data_replicator', + 'replica_set_messages', + 'replication_executor_test_fixture', + 'repl_coordinator_test_fixture', + ], +) + +env.Library( target='roll_back_local_operations', source=[ 'roll_back_local_operations.cpp', diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp index 33357e702e3..8576cf11dc4 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.cpp +++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp @@ -263,5 +263,15 @@ namespace repl { return Status::OK(); } + Status ClonerStorageInterfaceMock::insertMissingDoc(OperationContext* txn, + const NamespaceString& 
nss, + const BSONObj& doc) { + return Status::OK(); + } + + Status ClonerStorageInterfaceMock::dropUserDatabases(OperationContext* txn) { + return dropUserDatabasesFn ? dropUserDatabasesFn(txn) : Status::OK(); + } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/base_cloner_test_fixture.h b/src/mongo/db/repl/base_cloner_test_fixture.h index dca30fe8ae7..22cd7037c60 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.h +++ b/src/mongo/db/repl/base_cloner_test_fixture.h @@ -143,6 +143,18 @@ namespace repl { class ClonerStorageInterfaceMock : public CollectionCloner::StorageInterface { public: + using InsertCollectionFn = stdx::function<Status (OperationContext*, + const NamespaceString&, + const std::vector<BSONObj>&)>; + using BeginCollectionFn = stdx::function<Status (OperationContext*, + const NamespaceString&, + const CollectionOptions&, + const std::vector<BSONObj>&)>; + using InsertMissingDocFn = stdx::function<Status (OperationContext*, + const NamespaceString&, + const BSONObj&)>; + using DropUserDatabases = stdx::function<Status (OperationContext*)>; + Status beginCollection(OperationContext* txn, const NamespaceString& nss, const CollectionOptions& options, @@ -155,14 +167,16 @@ namespace repl { Status commitCollection(OperationContext* txn, const NamespaceString& nss) override; - stdx::function<Status (OperationContext*, - const NamespaceString&, - const CollectionOptions&, - const std::vector<BSONObj>&)> beginCollectionFn; + Status insertMissingDoc(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& doc) override; + + Status dropUserDatabases(OperationContext* txn); - stdx::function<Status (OperationContext*, - const NamespaceString&, - const std::vector<BSONObj>&)> insertDocumentsFn; + BeginCollectionFn beginCollectionFn; + InsertCollectionFn insertDocumentsFn; + InsertMissingDocFn insertMissingDocFn; + DropUserDatabases dropUserDatabasesFn; }; } // namespace repl diff --git 
a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index f4f9d9867ec..5dccb2e21ba 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -244,6 +244,20 @@ namespace repl { virtual Status commitCollection(OperationContext* txn, const NamespaceString& nss) = 0; + /** + * Inserts missing document into a collection (not related to insertDocuments above), + * during initial sync retry logic + */ + virtual Status insertMissingDoc(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& doc) = 0; + + /** + * Inserts missing document into a collection (not related to insertDocuments above), + * during initial sync retry logic + */ + virtual Status dropUserDatabases(OperationContext* txn) = 0; + }; } // namespace repl diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index 288f218c19f..aa6b31ce9e8 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -34,136 +34,617 @@ #include <algorithm> #include <boost/thread.hpp> +#include <thread> #include "mongo/base/status.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/repl/collection_cloner.h" +#include "mongo/db/repl/database_cloner.h" +#include "mongo/db/repl/optime.h" #include "mongo/stdx/functional.h" #include "mongo/util/assert_util.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" +#include "mongo/util/queue.h" #include "mongo/util/scopeguard.h" +#include "mongo/util/stacktrace.h" #include "mongo/util/time_support.h" #include "mongo/util/timer.h" #include "mongo/util/mongoutils/str.h" namespace mongo { namespace repl { - - using CBHStatus = StatusWith<ReplicationExecutor::CallbackHandle>; - using CallbackFn = ReplicationExecutor::CallbackFn; - using Request = RemoteCommandRequest; - using Response = RemoteCommandResponse; - using CommandCallbackData = 
ReplicationExecutor::RemoteCommandCallbackData; -// typedef void (*run_func)(); - // Failpoint for initial sync MONGO_FP_DECLARE(failInitialSyncWithBadHost); - namespace { - const Milliseconds NoSyncSourceRetryDelayMS{4000}; - - std::string toString(DataReplicatiorState s) { - switch (s) { - case DataReplicatiorState::InitialSync: - return "InitialSync"; - case DataReplicatiorState::Rollback: - return "Rollback"; - case DataReplicatiorState::Steady: - return "Steady Replication"; - case DataReplicatiorState::Uninitialized: - return "Uninitialized"; - default: - return "<invalid>"; +namespace { + // public so tests can remove the sleep. + const Milliseconds NoSyncSourceRetryDelayMS{4000}; + const int InitialSyncRetrySleepSecs{1}; + + size_t getSize(const BSONObj& o) { + // SERVER-9808 Avoid Fortify complaint about implicit signed->unsigned conversion + return static_cast<size_t>(o.objsize()); + } + + std::string toString(DataReplicatorState s) { + switch (s) { + case DataReplicatorState::InitialSync: + return "InitialSync"; + case DataReplicatorState::Rollback: + return "Rollback"; + case DataReplicatorState::Steady: + return "Steady Replication"; + case DataReplicatorState::Uninitialized: + return "Uninitialized"; + default: + return "<invalid>"; + } + } + + Timestamp findCommonPoint(HostAndPort host, Timestamp start) { + // TODO: walk back in the oplog looking for a known/shared optime. + return Timestamp(); + } + + bool _didRollback(HostAndPort host) { + // TODO: rollback here, report if done. 
+ return false; + } +} // namespace + + /** + * Follows the fetcher pattern for a find+getmore + */ + class QueryFetcher { + MONGO_DISALLOW_COPYING(QueryFetcher); + public: + using CallbackFn = stdx::function<void (const BatchDataStatus&, NextAction*)>; + + QueryFetcher(ReplicationExecutor* exec, + const HostAndPort& source, + const NamespaceString& nss, + const BSONObj& cmdBSON, + const QueryFetcher::CallbackFn& onBatchAvailable); + virtual ~QueryFetcher() = default; + + bool isActive() const { return _fetcher.isActive(); } + Status schedule() { return _fetcher.schedule(); } + void cancel() { return _fetcher.cancel(); } + void wait() { if (_fetcher.isActive()) _fetcher.wait(); } + std::string toString() const; + + protected: + void _onFetchCallback(const BatchDataStatus& fetchResult, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob); + + virtual void _delegateCallback(const BatchDataStatus& fetchResult, + NextAction* nextAction) { + _work(fetchResult, nextAction); + }; + + ReplicationExecutor* _exec; + Fetcher _fetcher; + int _responses; + const QueryFetcher::CallbackFn _work; + }; + + /** + * Follows the fetcher pattern for a find+getmore on an oplog + * Returns additional errors if the start oplog entry cannot be found. 
+ */ + class OplogFetcher : public QueryFetcher { + MONGO_DISALLOW_COPYING(OplogFetcher); + public: + OplogFetcher(ReplicationExecutor* exec, + const Timestamp& startTS, + const HostAndPort& src, + const NamespaceString& nss, + const QueryFetcher::CallbackFn& work); + + virtual ~OplogFetcher() = default; + std::string toString() const; + + protected: + + void _delegateCallback(const BatchDataStatus& fetchResult, + NextAction* nextAction); + + const Timestamp _startTS; + }; + + // QueryFetcher + QueryFetcher::QueryFetcher(ReplicationExecutor* exec, + const HostAndPort& src, + const NamespaceString& nss, + const BSONObj& cmdBSON, + const CallbackFn& work) + : _exec(exec), + _fetcher(exec, + src, + nss.db().toString(), + cmdBSON, + stdx::bind(&QueryFetcher::_onFetchCallback, + this, + stdx::placeholders::_1, + stdx::placeholders::_2, + stdx::placeholders::_3)), + _responses(0), + _work(work) { + } + + void QueryFetcher::_onFetchCallback(const BatchDataStatus& fetchResult, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob) { + ++_responses; + + _delegateCallback(fetchResult, nextAction); + // The fetcher will continue to call with kGetMore until an error or the last batch. 
+ if (fetchResult.isOK() && *nextAction == NextAction::kGetMore) { + const auto batchData(fetchResult.getValue()); + invariant(getMoreBob); + getMoreBob->append("getMore", batchData.cursorId); + getMoreBob->append("collection", batchData.nss.coll()); + } + } + + std::string QueryFetcher::toString() const { + return str::stream() << "QueryFetcher -" + << " responses: " << _responses + << " fetcher: " << _fetcher.getDiagnosticString(); + } + + // OplogFetcher + OplogFetcher::OplogFetcher(ReplicationExecutor* exec, + const Timestamp& startTS, + const HostAndPort& src, + const NamespaceString& oplogNSS, + const QueryFetcher::CallbackFn& work) + // TODO: add query options await_data, oplog_replay + : QueryFetcher(exec, + src, + oplogNSS, + BSON("find" << oplogNSS.coll() << + "query" << BSON("ts" << BSON("$gte" << startTS))), + work), + _startTS(startTS) { + } + + std::string OplogFetcher::toString() const { + return str::stream() << "OplogReader -" + << " startTS: " << _startTS.toString() + << " responses: " << _responses + << " fetcher: " << _fetcher.getDiagnosticString(); + } + + void OplogFetcher::_delegateCallback(const BatchDataStatus& fetchResult, + Fetcher::NextAction* nextAction) { + invariant(_exec->isRunThread()); + const bool checkStartTS = _responses == 0; + + if (fetchResult.isOK()) { + Fetcher::Documents::const_iterator firstDoc = fetchResult.getValue().documents.begin(); + auto hasDoc = firstDoc != fetchResult.getValue().documents.end(); + + if (checkStartTS && + (!hasDoc || (hasDoc && (*firstDoc)["ts"].timestamp() != _startTS))) { + // Set next action to none. 
+ *nextAction = Fetcher::NextAction::kNoAction; + _work(Status(ErrorCodes::OplogStartMissing, + str::stream() << "First returned " << (*firstDoc)["ts"] + << " is not where we wanted to start: " + << _startTS.toString()), + nextAction); + return; + } + + if (hasDoc) { + _work(fetchResult, nextAction); + } + else { } } + else { + _work(fetchResult, nextAction); + } + }; + + class DatabasesCloner { + public: + DatabasesCloner(ReplicationExecutor* exec, + HostAndPort source, + stdx::function<void (const Status&)> finishFn) + : _status(ErrorCodes::NotYetInitialized, ""), + _exec(exec), + _source(source), + _active(false), + _clonersActive(0), + _finishFn(finishFn) { + if (!_finishFn) { + _status = Status(ErrorCodes::InvalidOptions, "finishFn is not callable."); + } + }; + + Status start(); + + bool isActive() { + return _active; + } + + Status getStatus() { + return _status; + } + + void cancel() { + if (!_active) + return; + _active = false; + // TODO: cancel all cloners + _setStatus(Status(ErrorCodes::CallbackCanceled, "Initial Sync Cancelled.")); + } + + void wait() { + // TODO: wait on all cloners + } + + std::string toString() { + return str::stream() << "initial sync --" << + " active:" << _active << + " status:" << _status.toString() << + " source:" << _source.toString() << + " db cloners active:" << _clonersActive << + " db count:" << _databaseCloners.size(); + } + + + // For testing + void setStorageInterface(CollectionCloner::StorageInterface* si) { + _storage = si; + } + + private: + + /** + * Does the next action necessary for the initial sync process. + * + * NOTE: If (!_status.isOK() || !_isActive) then early return. 
+ */ + void _doNextActions(); + + /** + * Setting the status to not-OK will stop the process + */ + void _setStatus(CBHStatus s) { + _setStatus(s.getStatus()); + } + + /** + * Setting the status to not-OK will stop the process + */ + void _setStatus(Status s) { + // Only set the first time called, all subsequent failures are not recorded --only first + if (_status.code() != ErrorCodes::NotYetInitialized) { + _status = s; + } + } + + /** + * Setting the status to not-OK will stop the process + */ + void _setStatus(TimestampStatus s) { + _setStatus(s.getStatus()); + } + + void _failed(); + + /** Called each time a database clone is finished */ + void _onEachDBCloneFinish(const Status& status, const std::string name); + + // Callbacks + + void _onListDatabaseFinish(const CommandCallbackData& cbd); + + + // Member variables + Status _status; // If it is not OK, we stop everything. + ReplicationExecutor* _exec; // executor to schedule things with + HostAndPort _source; // The source to use, until we get an error + bool _active; // false until we start + std::vector<std::shared_ptr<DatabaseCloner>> _databaseCloners; // database cloners by name + int _clonersActive; + + const stdx::function<void (const Status&)> _finishFn; + + CollectionCloner::StorageInterface* _storage; + }; + + /** State held during Initial Sync */ + struct InitialSyncState { + InitialSyncState(DatabasesCloner cloner, Event event) + : dbsCloner(cloner), finishEvent(event), status(ErrorCodes::IllegalOperation, "") {}; + + DatabasesCloner dbsCloner; // Cloner for all databases included in initial sync. + Timestamp beginTimestamp; // Timestamp from the latest entry in oplog when started. + Timestamp stopTimestamp; // Referred to as minvalid, or the place we can transition states. + Event finishEvent; // event fired on completion, either successful or not. + Status status; // final status, only valid after the finishEvent fires. 
+ size_t fetchedMissingDocs; + size_t appliedOps; + + // Temporary fetch for things like fetching remote optime, or tail + std::unique_ptr<Fetcher> tmpFetcher; + TimestampStatus getLatestOplogTimestamp(ReplicationExecutor* exec, + HostAndPort source, + const NamespaceString& oplogNS); + void setStatus(const Status& s); + void setStatus(const CBHStatus& s); + void _setTimestampSatus(const BatchDataStatus& fetchResult, + Fetcher::NextAction* nextAction, + TimestampStatus* status) ; + }; + + // Initial Sync state + TimestampStatus InitialSyncState::getLatestOplogTimestamp(ReplicationExecutor* exec, + HostAndPort source, + const NamespaceString& oplogNS) { + + BSONObj query = BSON("find" << oplogNS.coll() << + "sort" << BSON ("$natural" << -1) << + "limit" << 1); + + TimestampStatus timestampStatus(ErrorCodes::BadValue, ""); + Fetcher f(exec, + source, + oplogNS.db().toString(), + query, + stdx::bind(&InitialSyncState::_setTimestampSatus, this, stdx::placeholders::_1, + stdx::placeholders::_2, &timestampStatus)); + Status s = f.schedule(); + if (!s.isOK()) { + return TimestampStatus(s); + } - /* - Status callViaExecutor(ReplicationExecutor* exec, const CallbackFn& work) { - CBHStatus cbh = exec->scheduleWork(work); - if (!cbh.getStatus().isOK()) { - return cbh.getStatus(); + // wait for fetcher to get the oplog position. + f.wait(); + return timestampStatus; + } + + void InitialSyncState::_setTimestampSatus(const BatchDataStatus& fetchResult, + Fetcher::NextAction* nextAction, + TimestampStatus* status) { + if (!fetchResult.isOK()) { + *status = TimestampStatus(fetchResult.getStatus()); + } else { + // TODO: Set _beginTimestamp from first doc "ts" field. 
+ const auto docs = fetchResult.getValue().documents; + const auto hasDoc = docs.begin() != docs.end(); + if (!hasDoc || !docs.begin()->hasField("ts")) { + *status = TimestampStatus(ErrorCodes::FailedToParse, + "Could not find an oplog entry with 'ts' field."); + } else { + *status = TimestampStatus(docs.begin()->getField("ts").timestamp()); } + } + } - exec->wait(cbh.getValue()); + void InitialSyncState::setStatus(const Status& s) { + status = s; + } + void InitialSyncState::setStatus(const CBHStatus& s) { + setStatus(s.getStatus()); + } - return Status::OK(); + // Initial Sync + Status DatabasesCloner::start() { + _active = true; + + if (!_status.isOK() && _status.code() != ErrorCodes::NotYetInitialized) { + return _status; } - */ - Timestamp findCommonPoint(HostAndPort host, Timestamp start) { - // TODO: walk back in the oplog looking for a known/shared optime. - return Timestamp(); + _status = Status::OK(); + + log() << "starting cloning of all databases"; + // Schedule listDatabase command which will kick off the database cloner per result db. + Request listDBsReq(_source, "admin", BSON("listDatabases" << true)); + CBHStatus s = _exec->scheduleRemoteCommand( + listDBsReq, + stdx::bind(&DatabasesCloner::_onListDatabaseFinish, + this, + stdx::placeholders::_1)); + if (!s.isOK()) { + _setStatus(s); + _failed(); } - bool _didRollback(HostAndPort host) { - // TODO: rollback here, report if done. - return false; + _doNextActions(); + + return _status; + } + + void DatabasesCloner::_onListDatabaseFinish(const CommandCallbackData& cbd) { + invariant(_exec->isRunThread()); + const Status respStatus = cbd.response.getStatus(); + if (!respStatus.isOK()) { + // TODO: retry internally? + _setStatus(respStatus); + _doNextActions(); + return; } - } // namespace - Status InitialSyncImpl::start() { - // For testing, we may want to fail if we receive a getmore. 
- if (MONGO_FAIL_POINT(failInitialSyncWithBadHost)) { - _status = StatusWith<Timestamp>(ErrorCodes::InvalidSyncSource, "no sync source avail."); + const auto respBSON = cbd.response.getValue().data; + + // There should not be any cloners yet + invariant(_databaseCloners.size() == 0); + + const auto okElem = respBSON["ok"]; + if (okElem.trueValue()) { + const auto dbsElem = respBSON["databases"].Obj(); + BSONForEach(arrayElement, dbsElem) { + const BSONObj dbBSON = arrayElement.Obj(); + const std::string name = dbBSON["name"].str(); + ++_clonersActive; + std::shared_ptr<DatabaseCloner> dbCloner{nullptr}; + try { + dbCloner.reset(new DatabaseCloner( + _exec, + _source, + name, + BSONObj(), // do not filter database out. + [](const BSONObj&) { return true; }, // clone all dbs. + _storage, // use storage provided. + [](const Status& status, const NamespaceString& srcNss) { + if (status.isOK()) { + log() << "collection clone finished: " << srcNss; + } + else { + log() << "collection clone for '" + << srcNss << "' failed due to " + << status.toString(); + } + }, + [=](const Status& status) { + _onEachDBCloneFinish(status, name); + })); + } + catch (...) { + // error creating, fails below. + } + + Status s = dbCloner ? dbCloner->start() : Status(ErrorCodes::UnknownError, "Bad!"); + + if (!s.isOK()) { + std::string err = str::stream() << "could not create cloner for database: " + << name << " due to: " << s.toString(); + _setStatus(Status(ErrorCodes::InitialSyncFailure, err)); + error() << err; + break; // exit for_each loop + } + + // add cloner to list. + _databaseCloners.push_back(dbCloner); + } } else { - _status = Status::OK(); + _setStatus(Status(ErrorCodes::InitialSyncFailure, + "failed to clone databases due to failed server response.")); } - return _status.getStatus(); + + // Move on to the next steps in the process. 
+ _doNextActions(); } + void DatabasesCloner::_onEachDBCloneFinish(const Status& status, const std::string name) { + auto clonersLeft = --_clonersActive; + + if (status.isOK()) { + log() << "database clone finished: " << name; + } + else { + log() << "database clone failed due to " + << status.toString(); + _setStatus(status); + } + + if (clonersLeft == 0) { + _active = false; + // All cloners are done, trigger event. + log() << "all database clones finished, calling _finishFn"; + _finishFn(_status); + } + + _doNextActions(); + } + + void DatabasesCloner::_doNextActions() { + // If we are no longer active or we had an error, stop doing more + if (!(_active && _status.isOK())) { + if (!_status.isOK()) { + // trigger failed state + _failed(); + } + return; + } + } + + void DatabasesCloner::_failed() { + // TODO: cancel outstanding work, like any cloners active + invariant(_finishFn); + _finishFn(_status); + } + + // Data Replicator DataReplicator::DataReplicator(DataReplicatorOptions opts, ReplicationExecutor* exec, ReplicationCoordinator* replCoord) : _opts(opts), _exec(exec), _replCoord(replCoord), - _state(DataReplicatiorState::Uninitialized) { - + _state(DataReplicatorState::Uninitialized), + _fetcherPaused(false), + _reporterPaused(false), + _applierActive(false), + _applierPaused(false), + _oplogBuffer(256*1024*1024, &getSize), // Limit buffer to 256MB + _doShutdown(false) { + // TODO: replace this with a method in the replication coordinator. 
+ if (replCoord) { + _batchCompletedFn = [&] (const Timestamp& ts) { + OpTime ot(ts, 0); + _replCoord->setMyLastOptime(ot); + }; + } + else { + _batchCompletedFn = [] (const Timestamp& ts) { + }; + } } DataReplicator::DataReplicator(DataReplicatorOptions opts, ReplicationExecutor* exec) - : _opts(opts), - _exec(exec), - _state(DataReplicatiorState::Uninitialized) { + : DataReplicator(opts, exec, nullptr) { } -/* - Status DataReplicator::_run(run_func what) { - return callViaExecutor(_exec, - stdx::bind(what, - this, - stdx::placeholders::_1)); + + DataReplicator::~DataReplicator() { + DESTRUCTOR_GUARD( + _cancelAllHandles_inlock(); + _waitOnAll_inlock(); + ); } -*/ + Status DataReplicator::start() { - boost::unique_lock<boost::mutex> lk(_mutex); - if (_state != DataReplicatiorState::Uninitialized) { - // Error. + UniqueLock lk(_mutex); + if (_state != DataReplicatorState::Uninitialized) { + return Status(ErrorCodes::IllegalOperation, + str::stream() << "Already started in another state: " + << toString(_state)); } - _state = DataReplicatiorState::Steady; + + _state = DataReplicatorState::Steady; if (_replCoord) { - // TODO: Use chooseNewSyncSource? 
-this requires an active replCoord so not working in tests - _fetcher.reset(new Fetcher(_exec, - HostAndPort(), //_replCoord->chooseNewSyncSource(), - _opts.remoteOplogNS.db().toString(), - BSON("find" << _opts.remoteOplogNS), - stdx::bind(&DataReplicator::_onFetchFinish, - this, - stdx::placeholders::_1, - stdx::placeholders::_2))); + const auto lastOptime = _replCoord->getMyLastOptime(); + _fetcher.reset(new OplogFetcher(_exec, + lastOptime.getTimestamp(), + HostAndPort(), //TODO _replCoord->chooseNewSyncSource(), + _opts.remoteOplogNS, + stdx::bind(&DataReplicator::_onOplogFetchFinish, + this, + stdx::placeholders::_1, + stdx::placeholders::_2))); } else { - _fetcher.reset(new Fetcher(_exec, - _opts.syncSource, - _opts.remoteOplogNS.db().toString(), - BSON("find" << _opts.remoteOplogNS), - stdx::bind(&DataReplicator::_onFetchFinish, - this, - stdx::placeholders::_1, - stdx::placeholders::_2))); + // TODO: add query options await_data, oplog_replay + _fetcher.reset(new OplogFetcher(_exec, + _opts.startOptime, + _opts.syncSource, + _opts.remoteOplogNS, + stdx::bind(&DataReplicator::_onOplogFetchFinish, + this, + stdx::placeholders::_1, + stdx::placeholders::_2))); } @@ -177,14 +658,39 @@ namespace repl { } Status DataReplicator::pause() { - // TODO + _pauseApplier(); return Status::OK(); } + std::string DataReplicator::getDiagnosticString() const { + str::stream out; + out << "DataReplicator -" + << " opts: " << _opts.toString() + << " oplogFetcher: " << _fetcher->toString() + << " opsBuffered: " << _oplogBuffer.size() + << " state: " << toString(_state); + switch (_state) { + case DataReplicatorState::InitialSync: + out << " opsAppied: " << _initialSyncState->appliedOps + << " status: " << _initialSyncState->status.toString(); + break; + case DataReplicatorState::Steady: + // TODO: add more here + break; + case DataReplicatorState::Rollback: + // TODO: add more here + break; + default: + break; + } + + return out; + } + Status DataReplicator::resume(bool wait) { 
- StatusWith<Handle> handle = _exec->scheduleWork(stdx::bind(&DataReplicator::_resumeFinish, - this, - stdx::placeholders::_1)); + CBHStatus handle = _exec->scheduleWork(stdx::bind(&DataReplicator::_resumeFinish, + this, + stdx::placeholders::_1)); const Status status = handle.getStatus(); if (wait && status.isOK()) { _exec->wait(handle.getValue()); @@ -193,12 +699,19 @@ namespace repl { } void DataReplicator::_resumeFinish(CallbackData cbData) { + UniqueLock lk(_mutex); _fetcherPaused = _applierPaused = false; + lk.unlock(); + _doNextActions(); } void DataReplicator::_pauseApplier() { + LockGuard lk(_mutex); + if (_applier) + _applier->wait(); _applierPaused = true; + _applier.reset(); } Timestamp DataReplicator::_applyUntil(Timestamp untilTimestamp) { @@ -211,21 +724,23 @@ namespace repl { return _applyUntil(untilTimestamp); } - StatusWith<Timestamp> DataReplicator::flushAndPause() { + TimestampStatus DataReplicator::flushAndPause() { //_run(&_pauseApplier); - boost::unique_lock<boost::mutex> lk(_mutex); + UniqueLock lk(_mutex); if (_applierActive) { + _applierPaused = true; lk.unlock(); - _exec->wait(_applierHandle); + _applier->wait(); lk.lock(); } - return StatusWith<Timestamp>(_lastOptimeApplied); + return TimestampStatus(_lastTimestampApplied); } - void DataReplicator::_resetState(Timestamp lastAppliedOptime) { - boost::lock_guard<boost::mutex> lk(_mutex); - _lastOptimeApplied = _lastOptimeFetched = lastAppliedOptime; - } + void DataReplicator::_resetState_inlock(Timestamp lastAppliedOptime) { + invariant(!_anyActiveHandles_inlock()); + _lastTimestampApplied = _lastTimestampFetched = lastAppliedOptime; + _oplogBuffer.clear(); + } void DataReplicator::slavesHaveProgressed() { if (_reporter) { @@ -233,53 +748,134 @@ namespace repl { } } - StatusWith<Timestamp> DataReplicator::resync() { + void DataReplicator::_setInitialSyncStorageInterface(CollectionCloner::StorageInterface* si) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _storage = si; + if 
(_initialSyncState) { + _initialSyncState->dbsCloner.setStorageInterface(_storage); + } + } + + TimestampStatus DataReplicator::resync() { _shutdown(); // Drop databases and do initialSync(); - // TODO drop database - StatusWith<Timestamp> status = initialSync(); + CBHStatus cbh = _exec->scheduleDBWork([&](const CallbackData& cbData) { + _storage->dropUserDatabases(cbData.txn); + }); + + if (!cbh.isOK()) { + return TimestampStatus(cbh.getStatus()); + } + + _exec->wait(cbh.getValue()); + + TimestampStatus status = initialSync(); if (status.isOK()) { - _resetState(status.getValue()); + _resetState_inlock(status.getValue()); } return status; } - StatusWith<Timestamp> DataReplicator::initialSync() { - boost::lock_guard<boost::mutex> lk(_mutex); - if (_state != DataReplicatiorState::Uninitialized) { - if (_state == DataReplicatiorState::InitialSync) - return StatusWith<Timestamp>( - ErrorCodes::InvalidRoleModification, - (str::stream() << "Already doing initial sync;try resync")); + TimestampStatus DataReplicator::initialSync() { + Timer t; + UniqueLock lk(_mutex); + if (_state != DataReplicatorState::Uninitialized) { + if (_state == DataReplicatorState::InitialSync) + return TimestampStatus(ErrorCodes::InvalidRoleModification, + (str::stream() << "Already doing initial sync;try resync")); else { - return StatusWith<Timestamp>( - ErrorCodes::AlreadyInitialized, - (str::stream() << "Cannot do initial sync in " - << toString(_state))); + return TimestampStatus(ErrorCodes::AlreadyInitialized, + (str::stream() << "Cannot do initial sync in " + << toString(_state) << " state.")); } } - _state = DataReplicatiorState::InitialSync; + _state = DataReplicatorState::InitialSync; + + // The reporter is paused for the duration of the initial sync, so cancel just in case. + if (_reporter) { + _reporter->cancel(); + } + _reporterPaused = true; + _applierPaused = true; + + // TODO: set minvalid doc initial sync state. 
+ const int maxFailedAttempts = 10; int failedAttempts = 0; + Status attemptErrorStatus(Status::OK()); while (failedAttempts < maxFailedAttempts) { - _initialSync.reset(new InitialSyncImpl(_exec)); - _initialSync->start(); - _initialSync->wait(); - Status s = _initialSync->getStatus().getStatus(); + // For testing, we may want to fail if we receive a getmore. + if (MONGO_FAIL_POINT(failInitialSyncWithBadHost)) { + attemptErrorStatus = Status(ErrorCodes::InvalidSyncSource, "no sync source avail."); + } - if (s.isOK()) { - // we are done :) - break; + Event initialSyncFinishEvent; + if (attemptErrorStatus.isOK() && _syncSource.empty()) { + _syncSource = _replCoord->chooseNewSyncSource(); + } + else if(attemptErrorStatus.isOK()) { + StatusWith<Event> status = _exec->makeEvent(); + if (!status.isOK()) { + attemptErrorStatus = status.getStatus(); + } else { + initialSyncFinishEvent = status.getValue(); + } + } + + if (attemptErrorStatus.isOK()) { + invariant(initialSyncFinishEvent.isValid()); + _initialSyncState.reset(new InitialSyncState( + DatabasesCloner(_exec, + _syncSource, + stdx::bind(&DataReplicator::_onDataClonerFinish, + this, + stdx::placeholders::_1)), + initialSyncFinishEvent)); + + _initialSyncState->dbsCloner.setStorageInterface(_storage); + const NamespaceString ns(_opts.remoteOplogNS); + TimestampStatus tsStatus = _initialSyncState->getLatestOplogTimestamp( + _exec, + _syncSource, + ns); + attemptErrorStatus = tsStatus.getStatus(); + if (attemptErrorStatus.isOK()) { + _initialSyncState->beginTimestamp = tsStatus.getValue(); + _fetcher.reset(new OplogFetcher(_exec, + _initialSyncState->beginTimestamp, + _syncSource, + _opts.remoteOplogNS, + stdx::bind(&DataReplicator::_onOplogFetchFinish, + this, + stdx::placeholders::_1, + stdx::placeholders::_2))); + _scheduleFetch_inlock(); + lk.unlock(); + _initialSyncState->dbsCloner.start(); // When the cloner is done applier starts. 
+ invariant(_initialSyncState->finishEvent.isValid()); + _exec->waitForEvent(_initialSyncState->finishEvent); + attemptErrorStatus = _initialSyncState->status; + + // Re-lock DataReplicator Internals + lk.lock(); + } + } + + if (attemptErrorStatus.isOK()) { + break; // success } ++failedAttempts; error() << "Initial sync attempt failed -- attempts left: " << (maxFailedAttempts - failedAttempts) << " cause: " - << s; - // TODO: uncomment - //sleepsecs(5); + << attemptErrorStatus; + + // Sleep for retry time + lk.unlock(); + sleepsecs(InitialSyncRetrySleepSecs); + lk.lock(); // No need to print a stack if (failedAttempts >= maxFailedAttempts) { @@ -289,19 +885,105 @@ namespace repl { return Status(ErrorCodes::InitialSyncFailure, err); } } - return _initialSync->getStatus(); + + // Success, cleanup + // TODO: re-enable, fetcher is blocking on wait (in _waitOnAll) + _cancelAllHandles_inlock(); + _waitOnAll_inlock(); + + _reporterPaused = false; + _fetcherPaused = false; + _fetcher.reset(nullptr); + _tmpFetcher.reset(nullptr); + _applierPaused = false; + _applier.reset(nullptr); + _applierActive = false; + _initialSyncState.reset(nullptr); + _oplogBuffer.clear(); + _resetState_inlock(_lastTimestampApplied); + + log() << "Initial sync took: " << t.millis() << " milliseconds."; + return TimestampStatus(_lastTimestampApplied); } - const bool DataReplicator::_anyActiveHandles() { - boost::lock_guard<boost::mutex> lk(_mutex); - return _fetcher->isActive() || _applierActive || _reporter->isActive(); + void DataReplicator::_onDataClonerFinish(const Status& status) { + log() << "data clone finished, status: " << status.toString(); + if (!status.isOK()) { + // Iniitial sync failed during cloning of databases + _initialSyncState->setStatus(status); + _exec->signalEvent(_initialSyncState->finishEvent); + return; + } + + BSONObj query = BSON("find" << _opts.remoteOplogNS.coll() << + "sort" << BSON ("$natural" << -1) << + "limit" << 1); + + TimestampStatus 
timestampStatus(ErrorCodes::BadValue, ""); + _tmpFetcher.reset(new QueryFetcher(_exec, + _syncSource, + _opts.remoteOplogNS, + query, + stdx::bind(&DataReplicator::_onApplierReadyStart, + this, + stdx::placeholders::_1, + stdx::placeholders::_2))); + Status s = _tmpFetcher->schedule(); + if (!s.isOK()) { + _initialSyncState->setStatus(s); + } + } + + void DataReplicator::_onApplierReadyStart(const BatchDataStatus& fetchResult, + NextAction* nextAction) { + invariant(_exec->isRunThread()); + // Data clone done, move onto apply. + TimestampStatus ts(ErrorCodes::OplogStartMissing, ""); + _initialSyncState->_setTimestampSatus(fetchResult, nextAction, &ts); + if (ts.isOK()) { + // TODO: set minvalid? + LockGuard lk(_mutex); + _initialSyncState->stopTimestamp = ts.getValue(); + if (_lastTimestampApplied < ts.getValue()) { + log() << "waiting for applier to run until ts: " << ts.getValue(); + } + invariant(_applierPaused); + _applierPaused = false; + _doNextActions_InitialSync_inlock(); + } + else { + _initialSyncState->setStatus(ts.getStatus()); + _doNextActions(); + } } - const void DataReplicator::_cancelAllHandles() { - boost::lock_guard<boost::mutex> lk(_mutex); - _fetcher->cancel(); - _exec->cancel(_applierHandle); - _reporter->cancel(); + bool DataReplicator::_anyActiveHandles_inlock() const { + return _applierActive || + (_fetcher && _fetcher->isActive()) || + (_initialSyncState && _initialSyncState->dbsCloner.isActive()) || + (_reporter && _reporter->isActive()); + } + + void DataReplicator::_cancelAllHandles_inlock() { + if (_fetcher) + _fetcher->cancel(); + if (_applier) + _applier->cancel(); + if (_reporter) + _reporter->cancel(); + if (_initialSyncState && _initialSyncState->dbsCloner.isActive()) + _initialSyncState->dbsCloner.cancel(); + } + + void DataReplicator::_waitOnAll_inlock() { + if (_fetcher) + _fetcher->wait(); + if (_applier) + _applier->wait(); + if (_reporter) + _reporter->wait(); + if (_initialSyncState) + _initialSyncState->dbsCloner.wait(); } 
void DataReplicator::_doNextActionsCB(CallbackData cbData) { @@ -315,9 +997,9 @@ namespace repl { // 3.) Steady (Replication) // Check for shutdown flag, signal event - boost::lock_guard<boost::mutex> lk(_mutex); + LockGuard lk(_mutex); if (_doShutdown) { - if(!_anyActiveHandles()) { + if(!_anyActiveHandles_inlock()) { _exec->signalEvent(_onShutdown); } return; @@ -325,13 +1007,13 @@ namespace repl { // Do work for the current state switch (_state) { - case DataReplicatiorState::Rollback: + case DataReplicatorState::Rollback: _doNextActions_Rollback_inlock(); break; - case DataReplicatiorState::InitialSync: + case DataReplicatorState::InitialSync: _doNextActions_InitialSync_inlock(); break; - case DataReplicatiorState::Steady: + case DataReplicatorState::Steady: _doNextActions_Steady_inlock(); break; default: @@ -343,21 +1025,32 @@ namespace repl { } void DataReplicator::_doNextActions_InitialSync_inlock() { - // TODO: check initial sync state and do next actions - // move from initial sync phase to initial sync phase via scheduled work in exec - - if (!_initialSync) { - // Error case?, reset to uninit'd - _state = DataReplicatiorState::Uninitialized; + if (!_initialSyncState) { + // TODO: Error case?, reset to uninit'd + _state = DataReplicatorState::Uninitialized; + log() << "_initialSyncState, so resetting state to Uninitialized"; return; } - if (!_initialSync->isActive()) { - if (!_initialSync->getStatus().isOK()) { + if (!_initialSyncState->dbsCloner.isActive()) { + if (!_initialSyncState->dbsCloner.getStatus().isOK()) { // TODO: Initial sync failed } else { - // TODO: success + if (!_lastTimestampApplied.isNull() && + _lastTimestampApplied >= _initialSyncState->stopTimestamp) { + invariant(_initialSyncState->finishEvent.isValid()); + log() << "Applier done, initial sync done, end timestamp: " + << _initialSyncState->stopTimestamp << " , last applier: " + << _lastTimestampApplied; + _state = DataReplicatorState::Uninitialized; + 
_initialSyncState->setStatus(Status::OK()); + _exec->signalEvent(_initialSyncState->finishEvent); + } + else { + // Run steady state events to fetch/apply. + _doNextActions_Steady_inlock(); + } } } } @@ -383,48 +1076,189 @@ namespace repl { } else { // Check if active fetch, if not start one if (!_fetcher->isActive()) { - _scheduleFetch(); + _scheduleFetch_inlock(); } } - // Check if active apply, if not start one - if (!_applierActive) { - _scheduleApplyBatch(); + // Check if no active apply and ops to apply + if (!_applierActive && _oplogBuffer.size()) { + _scheduleApplyBatch_inlock(); } - if (!_reporter || !_reporter->previousReturnStatus().isOK()) { + if (!_reporterPaused && (!_reporter || !_reporter->getStatus().isOK())) { // TODO get reporter in good shape _reporter.reset(new Reporter(_exec, _replCoord, HostAndPort())); } } - void DataReplicator::_onApplyBatchFinish(CallbackData cbData) { - _reporter->trigger(); - // TODO + Operations DataReplicator::_getNextApplierBatch_inlock() { + // Return a new batch of ops to apply. + // TODO: limit the batch like SyncTail::tryPopAndWaitForMore + Operations ops; + BSONObj op; + while(_oplogBuffer.tryPop(op)) { + ops.push_back(op); + } + return ops; + } + + void DataReplicator::_onApplyBatchFinish(const CallbackData& cbData, + const TimestampStatus& ts, + const Operations& ops, + const size_t numApplied) { + invariant(cbData.status.isOK()); + UniqueLock lk(_mutex); + _initialSyncState->appliedOps += numApplied; + if (!ts.isOK()) { + _handleFailedApplyBatch(ts, ops); + return; + } + + _applierActive = false; + _lastTimestampApplied = ts.getValue(); + lk.unlock(); + + if (_batchCompletedFn) { + _batchCompletedFn(ts.getValue()); + } + // TODO: move the reported to the replication coordinator and set _batchCompletedFn to a + // function in the replCoord. 
+ if (_reporter) { + _reporter->trigger(); + } + + _doNextActions(); + } + + void DataReplicator::_handleFailedApplyBatch(const TimestampStatus& ts, const Operations& ops) { + switch (_state) { + case DataReplicatorState::InitialSync: + // TODO: fetch missing doc, and retry. + _scheduleApplyAfterFetch(ops); + break; + case DataReplicatorState::Rollback: + // TODO: nothing? + default: + // fatal + fassert(28666, ts.getStatus()); + } + } + + void DataReplicator::_scheduleApplyAfterFetch(const Operations& ops) { + ++_initialSyncState->fetchedMissingDocs; + // TODO: check collection.isCapped, like SyncTail::getMissingDoc + const BSONObj failedOplogEntry = *ops.begin(); + const BSONElement missingIdElem = failedOplogEntry.getFieldDotted("o2._id"); + const NamespaceString nss(ops.begin()->getField("ns").str()); + const BSONObj query = BSON("find" << nss.coll() << "query" << missingIdElem.wrap()); + _tmpFetcher.reset(new QueryFetcher(_exec, _syncSource, nss, query, + stdx::bind(&DataReplicator::_onMissingFetched, + this, + stdx::placeholders::_1, + stdx::placeholders::_2, + ops, + nss))); + Status s = _tmpFetcher->schedule(); + if (!s.isOK()) { + // record error and take next step based on it. + _initialSyncState->setStatus(s); + _doNextActions(); + } + } + + void DataReplicator::_onMissingFetched(const BatchDataStatus& fetchResult, + Fetcher::NextAction* nextAction, + const Operations& ops, + const NamespaceString nss) { + invariant(_exec->isRunThread()); + + if (!fetchResult.isOK()) { + // TODO: do retries on network issues, like SyncTail::getMissingDoc + _initialSyncState->setStatus(fetchResult.getStatus()); + _doNextActions(); + return; + } else if (!fetchResult.getValue().documents.size()) { + // TODO: skip apply for this doc, like multiInitialSyncApply? 
+ _initialSyncState->setStatus(Status(ErrorCodes::InitialSyncFailure, + "missing doc not found")); + _doNextActions(); + return; + } + + const BSONObj missingDoc = *fetchResult.getValue().documents.begin(); + Status rs{Status::OK()}; + auto s = _exec->scheduleDBWork(([&](const CallbackData& cd) { + rs = _storage->insertMissingDoc(cd.txn, nss, missingDoc); + }), + nss, + MODE_IX); + if (!s.isOK()) { + _initialSyncState->setStatus(s); + _doNextActions(); + return; + } + + _exec->wait(s.getValue()); + if (!rs.isOK()) { + _initialSyncState->setStatus(rs); + _doNextActions(); + return; + } + + auto status = _scheduleApplyBatch_inlock(ops); + if (!status.isOK()) { + LockGuard lk(_mutex); + _initialSyncState->setStatus(status); + _exec->signalEvent(_initialSyncState->finishEvent); + } } Status DataReplicator::_scheduleApplyBatch() { - boost::lock_guard<boost::mutex> lk(_mutex); - if (!_applierActive) { - // TODO - _applierActive = true; - auto status = _exec->scheduleWork( - stdx::bind(&DataReplicator::_onApplyBatchFinish, - this, - stdx::placeholders::_1)); - if (!status.isOK()) { - return status.getStatus(); - } + LockGuard lk(_mutex); + return _scheduleApplyBatch_inlock(); + } - _applierHandle = status.getValue(); + Status DataReplicator::_scheduleApplyBatch_inlock() { + if (!_applierPaused && !_applierActive) { + _applierActive = true; + const Operations ops = _getNextApplierBatch_inlock(); + invariant(ops.size()); + invariant(_opts.applierFn); + invariant(!(_applier && _applier->isActive())); + return _scheduleApplyBatch_inlock(ops); } return Status::OK(); } + Status DataReplicator::_scheduleApplyBatch_inlock(const Operations& ops) { + auto lambda = [&] (const TimestampStatus& ts, const Operations& ops) { + CBHStatus status = _exec->scheduleWork(stdx::bind(&DataReplicator::_onApplyBatchFinish, + this, + stdx::placeholders::_1, + ts, + ops, + ops.size())); + if (!status.isOK()) { + LockGuard lk(_mutex); + _initialSyncState->setStatus(status); + 
_exec->signalEvent(_initialSyncState->finishEvent); + return; + } + // block until callback done. + _exec->wait(status.getValue()); + }; + + _applier.reset(new Applier(_exec, ops, _opts.applierFn, lambda)); + return _applier->start(); + } + Status DataReplicator::_scheduleFetch() { - boost::lock_guard<boost::mutex> lk(_mutex); + LockGuard lk(_mutex); + return _scheduleFetch_inlock(); + } + + Status DataReplicator::_scheduleFetch_inlock() { if (!_fetcher->isActive()) { - // TODO Status status = _fetcher->schedule(); if (!status.isOK()) { return status; @@ -445,13 +1279,9 @@ namespace repl { Status DataReplicator::_shutdown() { StatusWith<Event> eventStatus = _exec->makeEvent(); if (!eventStatus.isOK()) return eventStatus.getStatus(); - boost::unique_lock<boost::mutex> lk(_mutex); + UniqueLock lk(_mutex); _onShutdown = eventStatus.getValue(); - lk.unlock(); - - _cancelAllHandles(); - - lk.lock(); + _cancelAllHandles_inlock(); _doShutdown = true; lk.unlock(); @@ -481,38 +1311,64 @@ namespace repl { } } - void DataReplicator::_onFetchFinish(const StatusWith<Fetcher::BatchData>& fetchResult, - Fetcher::NextAction* nextAction) { + void DataReplicator::_onOplogFetchFinish(const StatusWith<Fetcher::BatchData>& fetchResult, + Fetcher::NextAction* nextAction) { + invariant(_exec->isRunThread()); const Status status = fetchResult.getStatus(); if (status.code() == ErrorCodes::CallbackCanceled) return; if (status.isOK()) { const auto docs = fetchResult.getValue().documents; - _oplogBuffer.insert(_oplogBuffer.end(), docs.begin(), docs.end()); - if (*nextAction == Fetcher::NextAction::kNoAction) { - // TODO: create new fetcher?, with new query from where we left off + if (docs.begin() != docs.end()) { + LockGuard lk(_mutex); + std::for_each(docs.cbegin(), + docs.cend(), + [&](const BSONObj& doc) { + _oplogBuffer.push(doc); + }); + auto doc = docs.rbegin(); + BSONElement tsElem(doc->getField("ts")); + while(tsElem.eoo() || doc != docs.rend()) { + tsElem = (doc++)->getField("ts"); 
+ } + + if (!tsElem.eoo()) { + _lastTimestampFetched = tsElem.timestamp(); + } else { + warning() << + "Did not find a 'ts' timestamp field in any of the fetched documents"; + } } - } - else { - // Error, decide what to do... + if (*nextAction == Fetcher::NextAction::kNoAction) { + // TODO: create new fetcher?, with new query from where we left off -- d'tor fetcher - if (status.code() == ErrorCodes::InvalidSyncSource) { - // Error, sync source - Date_t until{}; - _replCoord->blacklistSyncSource(_syncSource, until); - _syncSource = HostAndPort(); } + } - if (status.code() == ErrorCodes::OplogStartMissing) { - // possible rollback - bool didRollback = _didRollback(_syncSource); - if (!didRollback) { - _replCoord->setFollowerMode(MemberState::RS_RECOVERING); // TODO too stale - } - else { - // TODO: cleanup state/restart -- set _lastApplied, and other stuff + if (!status.isOK()) { + // Got an error, now decide what to do... + switch (status.code()) { + case ErrorCodes::OplogStartMissing: { + // possible rollback + bool didRollback = _didRollback(_syncSource); + if (!didRollback) { + _replCoord->setFollowerMode(MemberState::RS_RECOVERING); // TODO too stale + } + else { + // TODO: cleanup state/restart -- set _lastApplied, and other stuff + } + break; } + case ErrorCodes::InvalidSyncSource: + // Error, sync source + // fallthrough + default: + // TODO: SERVER-18034 -- real blacklist timeout time + Date_t until{}; + LockGuard lk(_mutex); + _replCoord->blacklistSyncSource(_syncSource, until); + _syncSource = HostAndPort(); } } @@ -520,4 +1376,3 @@ namespace repl { } } // namespace repl } // namespace mongo - diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h index edcf274dbc7..0573b25688b 100644 --- a/src/mongo/db/repl/data_replicator.h +++ b/src/mongo/db/repl/data_replicator.h @@ -29,10 +29,7 @@ #pragma once -#include <boost/scoped_ptr.hpp> #include <boost/thread.hpp> -#include <boost/thread/mutex.hpp> -#include 
<boost/thread/condition_variable.hpp> #include <vector> #include "mongo/platform/basic.h" @@ -41,24 +38,40 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/timestamp.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/repl/applier.h" +#include "mongo/db/repl/collection_cloner.h" +#include "mongo/db/repl/database_cloner.h" #include "mongo/db/repl/fetcher.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/reporter.h" #include "mongo/util/net/hostandport.h" +#include "mongo/util/queue.h" namespace mongo { namespace repl { -namespace { - typedef ReplicationExecutor::CallbackHandle Handle; - typedef ReplicationExecutor::EventHandle Event; - typedef ReplicationExecutor::CallbackData CallbackData; - typedef ReplicationExecutor::RemoteCommandCallbackData CommandCallbackData; +using Operations = Applier::Operations; +using BatchDataStatus = StatusWith<Fetcher::BatchData>; +using CallbackData = ReplicationExecutor::CallbackData; +using CBHStatus = StatusWith<ReplicationExecutor::CallbackHandle>; +using CommandCallbackData = ReplicationExecutor::RemoteCommandCallbackData; +using Event = ReplicationExecutor::EventHandle; +using Handle = ReplicationExecutor::CallbackHandle; +using LockGuard = stdx::lock_guard<stdx::mutex>; +using NextAction = Fetcher::NextAction; +using Request = RemoteCommandRequest; +using Response = RemoteCommandResponse; +using TimestampStatus = StatusWith<Timestamp>; +using UniqueLock = stdx::unique_lock<stdx::mutex>; + +class QueryFetcher; +class OplogFetcher; +struct InitialSyncState; + -} // namespace /** State for decision tree */ -enum class DataReplicatiorState { +enum class DataReplicatorState { Steady, // Default InitialSync, Rollback, @@ -66,7 +79,7 @@ enum class DataReplicatiorState { }; // TBD -- ignore for now -enum class DataReplicatiorScope { +enum class DataReplicatorScope { ReplicateAll, ReplicateDB, ReplicateCollection @@ -78,37 +91,25 @@ struct 
DataReplicatorOptions { NamespaceString remoteOplogNS = NamespaceString("local.oplog.rs"); // TBD -- ignore below for now - DataReplicatiorScope scope = DataReplicatiorScope::ReplicateAll; + DataReplicatorScope scope = DataReplicatorScope::ReplicateAll; std::string scopeNS; BSONObj filterCriteria; HostAndPort syncSource; // for use without replCoord -- maybe some kind of rsMonitor/interface -}; -// TODO: Break out: or at least move body to cpp -class InitialSyncImpl { -public: - InitialSyncImpl(ReplicationExecutor* exec) - : _status(StatusWith<Timestamp>(Timestamp())), - _exec(exec) { + // TODO: replace with real applier function + Applier::ApplyOperationFn applierFn = [] (OperationContext*, const BSONObj&) -> Status { + return Status::OK(); }; - Status start(); - - void wait() { - // TODO - }; - - bool isActive() { return _finishEvent.isValid(); }; - - StatusWith<Timestamp> getStatus() {return _status;} -private: - StatusWith<Timestamp> _status; - ReplicationExecutor* _exec; - Event _finishEvent; + std::string toString() const { + return str::stream() << "DataReplicatorOptions -- " + << " localOplogNs: " << localOplogNS.toString() + << " remoteOplogNS: " << remoteOplogNS.toString() + << " syncSource: " << syncSource.toString() + << " startOptime: " << startOptime.toString(); + } }; -class Applier {}; - /** * The data replicator provides services to keep collection in sync by replicating * changes via an oplog source to the local system storage. @@ -118,6 +119,9 @@ class Applier {}; */ class DataReplicator { public: + /** Function to call when a batch is applied. 
*/ + using OnBatchCompleteFn = stdx::function<void (const Timestamp&)>; + DataReplicator(DataReplicatorOptions opts, ReplicationExecutor* exec, ReplicationCoordinator* replCoord); @@ -127,6 +131,8 @@ public: DataReplicator(DataReplicatorOptions opts, ReplicationExecutor* exec); + virtual ~DataReplicator(); + Status start(); Status shutdown(); @@ -137,19 +143,24 @@ public: Status pause(); // Pauses replication and waits to return until all un-applied ops have been applied - StatusWith<Timestamp> flushAndPause(); + TimestampStatus flushAndPause(); // Called when a slave has progressed to a new oplog position void slavesHaveProgressed(); // just like initialSync but can be called anytime. - StatusWith<Timestamp> resync(); + TimestampStatus resync(); // Don't use above methods before these - StatusWith<Timestamp> initialSync(); + TimestampStatus initialSync(); + + std::string getDiagnosticString() const; // For testing only - void _resetState(Timestamp lastAppliedOptime); + void _resetState_inlock(Timestamp lastAppliedOptime); + void __setSourceForTesting(HostAndPort src) { _syncSource = src; } + void _setInitialSyncStorageInterface(CollectionCloner::StorageInterface* si); + private: // Run a member function in the executor, waiting for it to finish. 
@@ -157,9 +168,8 @@ private: // Only executed via executor void _resumeFinish(CallbackData cbData); - void _onFetchFinish(const StatusWith<Fetcher::BatchData>& fetchResult, - Fetcher::NextAction* nextAction); - void _onApplyBatchFinish(CallbackData cbData); + void _onOplogFetchFinish(const BatchDataStatus& fetchResult, + Fetcher::NextAction* nextAction); void _doNextActionsCB(CallbackData cbData); void _doNextActions(); void _doNextActions_InitialSync_inlock(); @@ -171,15 +181,37 @@ private: Timestamp _applyUntil(Timestamp); void _pauseApplier(); + Operations _getNextApplierBatch_inlock(); + void _onApplyBatchFinish(const CallbackData&, + const TimestampStatus&, + const Operations&, + const size_t numApplied); + void _handleFailedApplyBatch(const TimestampStatus&, const Operations&); + // Fetches the last doc from the first operation, and reschedules the apply for the ops. + void _scheduleApplyAfterFetch(const Operations&); + void _onMissingFetched(const BatchDataStatus& fetchResult, + Fetcher::NextAction* nextAction, + const Operations& ops, + const NamespaceString nss); + // returns true if a rollback is needed bool _needToRollback(HostAndPort source, Timestamp lastApplied); + void _onDataClonerFinish(const Status& status); + // Called after _onDataClonerFinish when the new Timestamp is avail, to use for minvalid + void _onApplierReadyStart(const BatchDataStatus& fetchResult, + Fetcher::NextAction* nextAction); + Status _scheduleApplyBatch(); + Status _scheduleApplyBatch_inlock(); + Status _scheduleApplyBatch_inlock(const Operations& ops); Status _scheduleFetch(); + Status _scheduleFetch_inlock(); Status _scheduleReport(); - const void _cancelAllHandles(); - const bool _anyActiveHandles(); + void _cancelAllHandles_inlock(); + void _waitOnAll_inlock(); + bool _anyActiveHandles_inlock() const; Status _shutdown(); void _changeStateIfNeeded(); @@ -204,28 +236,32 @@ private: // (I) Independently synchronized, see member variable comment. 
// Protects member data of this ReplicationCoordinator. - mutable boost::mutex _mutex; // (S) - DataReplicatiorState _state; // (MX) + mutable stdx::mutex _mutex; // (S) + DataReplicatorState _state; // (MX) + + // initial sync state + std::unique_ptr<InitialSyncState> _initialSyncState; // (M) + CollectionCloner::StorageInterface* _storage; // (M) // set during scheduling and onFinish bool _fetcherPaused; // (X) - boost::scoped_ptr<Fetcher> _fetcher; // (S) + std::unique_ptr<OplogFetcher> _fetcher; // (S) + std::unique_ptr<QueryFetcher> _tmpFetcher; // (S) - bool _reporterActive; // (M) + bool _reporterPaused; // (M) Handle _reporterHandle; // (M) - boost::scoped_ptr<Reporter> _reporter; // (M) + std::unique_ptr<Reporter> _reporter; // (M) bool _applierActive; // (M) bool _applierPaused; // (X) - Handle _applierHandle; // (M) - boost::scoped_ptr<Applier> _applier; // (M) + std::unique_ptr<Applier> _applier; // (M) + OnBatchCompleteFn _batchCompletedFn; // (M) - boost::scoped_ptr<InitialSyncImpl> _initialSync; // (M) HostAndPort _syncSource; // (M) - Timestamp _lastOptimeFetched; // (MX) - Timestamp _lastOptimeApplied; // (MX) - std::vector<BSONObj> _oplogBuffer; // (M) + Timestamp _lastTimestampFetched; // (MX) + Timestamp _lastTimestampApplied; // (MX) + BlockingQueue<BSONObj> _oplogBuffer; // (M) // Shutdown bool _doShutdown; // (M) @@ -233,9 +269,6 @@ private: // Rollback stuff Timestamp _rollbackCommonOptime; // (MX) - - - }; } // namespace repl diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index 6aef8263679..818fe559bec 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -26,22 +26,29 @@ * it in the license file. 
*/ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + #include "mongo/platform/basic.h" #include <memory> #include "mongo/db/jsobj.h" +#include "mongo/db/repl/base_cloner_test_fixture.h" #include "mongo/db/repl/data_replicator.h" #include "mongo/db/repl/fetcher.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_impl.h" +#include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_coordinator_external_state_mock.h" -#include "mongo/db/repl/replication_executor.h" +#include "mongo/db/repl/replication_coordinator_impl.h" +#include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_executor_test_fixture.h" -#include "mongo/db/repl/topology_coordinator.h" +#include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/util/fail_point_service.h" +#include "mongo/util/concurrency/thread_name.h" +#include "mongo/util/log.h" #include "mongo/unittest/unittest.h" @@ -49,7 +56,16 @@ namespace { using namespace mongo; using namespace mongo::repl; using executor::NetworkInterfaceMock; + using LockGuard = stdx::lock_guard<stdx::mutex>; + using UniqueLock = stdx::unique_lock<stdx::mutex>; + using mutex = stdx::mutex; + ReplicaSetConfig assertMakeRSConfig(const BSONObj& configBson) { + ReplicaSetConfig config; + ASSERT_OK(config.initialize(configBson)); + ASSERT_OK(config.validate()); + return config; + } const HostAndPort target("localhost", -1); class DataReplicatorTest : public ReplicationExecutorTest { @@ -62,6 +78,7 @@ namespace { // PRNG seed for tests. 
const int64_t seed = 0; + _settings.replSet = "foo"; // We are a replica set :) ReplicationExecutor* exec = &(getExecutor()); _topo = new TopologyCoordinatorImpl(Seconds(0)); _externalState = new ReplicationCoordinatorExternalStateMock; @@ -71,8 +88,11 @@ namespace { exec, seed)); launchExecutorThread(); - _dr.reset(new DataReplicator(DataReplicatorOptions(), exec, _repl.get())); + createDataReplicator(DataReplicatorOptions{}); } + + void postExecutorThreadLaunch() override {}; + void tearDown() override { ReplicationExecutorTest::tearDown(); // Executor may still invoke callback before shutting down. @@ -82,6 +102,12 @@ namespace { // clear/reset state } + + void createDataReplicator(DataReplicatorOptions opts) { + _dr.reset(new DataReplicator(opts, &(getExecutor()), _repl.get())); + _dr->__setSourceForTesting(target); + } + void scheduleNetworkResponse(const BSONObj& obj) { NetworkInterfaceMock* net = getNet(); ASSERT_TRUE(net->hasReadyRequests()); @@ -114,11 +140,12 @@ namespace { } DataReplicator& getDR() { return *_dr; } + TopologyCoordinatorImpl& getTopo() { return *_topo; } + ReplicationCoordinatorImpl& getRepl() { return *_repl; } - protected: - std::unique_ptr<DataReplicator> _dr; private: + std::unique_ptr<DataReplicator> _dr; boost::scoped_ptr<ReplicationCoordinatorImpl> _repl; // Owned by ReplicationCoordinatorImpl TopologyCoordinatorImpl* _topo; @@ -131,7 +158,10 @@ namespace { }; TEST_F(DataReplicatorTest, CreateDestroy) { + } + TEST_F(DataReplicatorTest, StartOk) { + ASSERT_EQ(getDR().start().code(), ErrorCodes::OK); } TEST_F(DataReplicatorTest, CannotInitialSyncAfterStart) { @@ -139,15 +169,338 @@ namespace { ASSERT_EQ(getDR().initialSync(), ErrorCodes::AlreadyInitialized); } - TEST_F(DataReplicatorTest, InitialSyncFailpoint) { + // Used to run a Initial Sync in a separate thread, to avoid blocking test execution. 
+ class InitialSyncBackgroundRunner { + public: + + InitialSyncBackgroundRunner(DataReplicator* dr) : + _dr(dr), + _result(Status(ErrorCodes::BadValue, "failed to set status")) {} + + // Could block if _sgr has not finished + TimestampStatus getResult() { + _thread->join(); + return _result; + } + + void run() { + _thread.reset(new boost::thread(stdx::bind(&InitialSyncBackgroundRunner::_run, this))); + sleepmillis(2); // sleep to let new thread run initialSync so it schedules work + } + + private: + + void _run() { + setThreadName("InitialSyncRunner"); + log() << "starting initial sync"; + _result = _dr->initialSync(); // blocking + } + + DataReplicator* _dr; + TimestampStatus _result; + boost::scoped_ptr<boost::thread> _thread; + }; + + class InitialSyncTest : public DataReplicatorTest { + public: + InitialSyncTest() + : _insertCollectionFn([&](OperationContext* txn, + const NamespaceString& theNss, + const std::vector<BSONObj>& theDocuments) { + log() << "insertDoc for " << theNss.toString(); + LockGuard lk(_collectionCountMutex); + ++(_collectionCounts[theNss.toString()]); + return Status::OK(); + }), + _beginCollectionFn([&](OperationContext* txn, + const NamespaceString& theNss, + const CollectionOptions& theOptions, + const std::vector<BSONObj>& theIndexSpecs) { + log() << "beginCollection for " << theNss.toString(); + LockGuard lk(_collectionCountMutex); + _collectionCounts[theNss.toString()] = 0; + return Status::OK(); + }) {}; + + protected: + + void setStorageFuncs(ClonerStorageInterfaceMock::InsertCollectionFn ins, + ClonerStorageInterfaceMock::BeginCollectionFn beg) { + _insertCollectionFn = ins; + _beginCollectionFn = beg; + } + + void setResponses(std::vector<BSONObj> resps) { + _responses = resps; + } + + void startSync() { + DataReplicator* dr = &(getDR()); + + _storage.beginCollectionFn = _beginCollectionFn; + _storage.insertDocumentsFn = _insertCollectionFn; + _storage.insertMissingDocFn = [&] (OperationContext* txn, + const NamespaceString& nss, 
+ const BSONObj& doc) { + return Status::OK(); + }; + + dr->_setInitialSyncStorageInterface(&_storage); + _isbr.reset(new InitialSyncBackgroundRunner(dr)); + _isbr->run(); + } + + + void playResponses() { + // TODO: Handle network responses + NetworkInterfaceMock* net = getNet(); + int processedRequests(0); + const int expectedResponses(_responses.size()); + + //counter for oplog entries + int c(0); + while (true) { + net->enterNetwork(); + if (!net->hasReadyRequests() && processedRequests < expectedResponses) { + net->exitNetwork(); + continue; + } + NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); + + const BSONObj reqBSON = noi->getRequest().cmdObj; + const BSONElement cmdElem = reqBSON.firstElement(); + const bool isGetMore = + cmdElem.fieldNameStringData().equalCaseInsensitive("getmore"); + const long long cursorId = cmdElem.numberLong(); + if (isGetMore && cursorId == 1LL) { + // process getmore requests from the oplog fetcher + auto respBSON = fromjson( + str::stream() << "{ok:1, cursor:{id:1, ns:'local.oplog.rs', nextBatch:[" + "{ts:Timestamp(" << ++c << ",1), h:1, ns:'test.a', v:2, op:'u', o2:{_id:" + << c << "}, o:{$set:{a:1}}}" + "]}}"); + net->scheduleResponse(noi, + net->now(), + ResponseStatus(RemoteCommandResponse(respBSON, + Milliseconds(10)))); + net->runReadyNetworkOperations(); + net->exitNetwork(); + continue; + } + else if (isGetMore) { + // TODO: return more data + } + + // process fixed set of responses + log() << "processing network request: " + << noi->getRequest().dbname << "." 
<< noi->getRequest().cmdObj.toString(); + net->scheduleResponse(noi, + net->now(), + ResponseStatus( + RemoteCommandResponse(_responses[processedRequests], + Milliseconds(10)))); + net->runReadyNetworkOperations(); + net->exitNetwork(); + if (++processedRequests >= expectedResponses) { + log() << "done processing expected requests "; + break; // once we have processed all requests, continue; + } + } + + net->enterNetwork(); + if (net->hasReadyRequests()) { + log() << "There are unexpected requests left"; + log() << "next cmd: " << net->getNextReadyRequest()->getRequest().cmdObj.toString(); + ASSERT_FALSE(net->hasReadyRequests()); + } + net->exitNetwork(); + } + + void verifySync(Status s = Status::OK()) { + verifySync(_isbr->getResult().getStatus().code()); + } + + void verifySync(ErrorCodes::Error code) { + // Check result + ASSERT_EQ(_isbr->getResult().getStatus().code(), code) << "status codes differ"; + } + + std::map<std::string, int> getLocalCollectionCounts() { + return _collectionCounts; + } + + private: + ClonerStorageInterfaceMock::InsertCollectionFn _insertCollectionFn; + ClonerStorageInterfaceMock::BeginCollectionFn _beginCollectionFn; + std::vector<BSONObj> _responses; + std::unique_ptr<InitialSyncBackgroundRunner> _isbr; + std::map<std::string, int> _collectionCounts; // counts of inserts during cloning + mutex _collectionCountMutex; // used to protect the collectionCount map + ClonerStorageInterfaceMock _storage; + }; + + TEST_F(InitialSyncTest, Complete) { + /** + * Initial Sync will issue these query/commands + * - startTS = oplog.rs->find().sort({$natural:-1}).limit(-1).next()["ts"] + * - listDatabases (foreach db do below) + * -- cloneDatabase (see DatabaseCloner tests). 
+ * - endTS = oplog.rs->find().sort({$natural:-1}).limit(-1).next()["ts"] + * - ops = oplog.rs->find({ts:{$gte: startTS}}) (foreach op) + * -- if local doc is missing, getCollection(op.ns).findOne(_id:op.o2._id) + * - if any retries were done in the previous loop, endTS query again for minvalid + * + */ + + const std::vector<BSONObj> responses = { + // get latest oplog ts + fromjson( + "{ok:1, cursor:{id:0, ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}" + "]}}"), + // oplog fetcher find + fromjson( + "{ok:1, cursor:{id:1, ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}" + "]}}"), +// Clone Start + // listDatabases + fromjson("{ok:1, databases:[{name:'a'}]}"), + // listCollections for "a" + fromjson( + "{ok:1, cursor:{id:0, ns:'a.$cmd.listCollections', firstBatch:[" + "{name:'a', options:{}} " + "]}}"), + // listIndexes:a + fromjson( + "{ok:1, cursor:{id:0, ns:'a.$cmd.listIndexes.a', firstBatch:[" + "{v:1, key:{_id:1}, name:'_id_', ns:'a.a'}" + "]}}"), + // find:a + fromjson( + "{ok:1, cursor:{id:0, ns:'a.a', firstBatch:[" + "{_id:1, a:1} " + "]}}"), +// Clone Done + // get latest oplog ts + fromjson( + "{ok:1, cursor:{id:0, ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(2,2), h:1, ns:'b.c', v:2, op:'i', o:{_id:1, c:1}}" + "]}}"), +// Applier starts ... 
+ }; + startSync(); + setResponses(responses); + playResponses(); + verifySync(); + } + + TEST_F(InitialSyncTest, MissingDocOnApplyCompletes) { + + DataReplicatorOptions opts; + int applyCounter{0}; + opts.applierFn = [&] (OperationContext* txn, const BSONObj& op) { + if (++applyCounter == 1) { + return Status(ErrorCodes::NoMatchingDocument, "failed: missing doc."); + } + return Status::OK(); + }; + createDataReplicator(opts); + + const std::vector<BSONObj> responses = { + // get latest oplog ts + fromjson( + "{ok:1, cursor:{id:0, ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}" + "]}}"), + // oplog fetcher find + fromjson( + "{ok:1, cursor:{id:1, ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'u', o2:{_id:1}, o:{$set:{a:1}}}" + "]}}"), +// Clone Start + // listDatabases + fromjson("{ok:1, databases:[{name:'a'}]}"), + // listCollections for "a" + fromjson( + "{ok:1, cursor:{id:0, ns:'a.$cmd.listCollections', firstBatch:[" + "{name:'a', options:{}} " + "]}}"), + // listIndexes:a + fromjson( + "{ok:1, cursor:{id:0, ns:'a.$cmd.listIndexes.a', firstBatch:[" + "{v:1, key:{_id:1}, name:'_id_', ns:'a.a'}" + "]}}"), + // find:a -- empty + fromjson( + "{ok:1, cursor:{id:0, ns:'a.a', firstBatch:[]}}"), +// Clone Done + // get latest oplog ts + fromjson( + "{ok:1, cursor:{id:0, ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(2,2), h:1, ns:'b.c', v:2, op:'i', o:{_id:1, c:1}}" + "]}}"), +// Applier starts ... 
+ // missing doc fetch -- find:a {_id:1} + fromjson( + "{ok:1, cursor:{id:0, ns:'a.a', firstBatch:[" + "{_id:1, a:1} " + "]}}"), + }; + startSync(); + setResponses(responses); + playResponses(); + verifySync(ErrorCodes::OK); + } + + TEST_F(InitialSyncTest, Failpoint) { mongo::getGlobalFailPointRegistry()-> getFailPoint("failInitialSyncWithBadHost")-> setMode(FailPoint::alwaysOn); - ASSERT_EQ(getDR().initialSync(), ErrorCodes::InitialSyncFailure); + BSONObj configObj = BSON("_id" << "mySet" << + "version" << 1 << + "members" << BSON_ARRAY(BSON("_id" << 1 << "host" << "node1:12345") + << BSON("_id" << 2 << "host" << "node2:12345") + << BSON("_id" << 3 << "host" << "node3:12345") + )); + + ReplicaSetConfig config = assertMakeRSConfig(configObj); + Timestamp time1(100, 1); + OpTime opTime1(time1, OpTime::kDefaultTerm); + getTopo().updateConfig(config, 0, getNet()->now(), opTime1); + getRepl().setMyLastOptime(opTime1); + ASSERT(getRepl().setFollowerMode(MemberState::RS_SECONDARY)); + + DataReplicator* dr = &(getDR()); + InitialSyncBackgroundRunner isbr(dr); + isbr.run(); + ASSERT_EQ(isbr.getResult().getStatus().code(), ErrorCodes::InitialSyncFailure); mongo::getGlobalFailPointRegistry()-> getFailPoint("failInitialSyncWithBadHost")-> setMode(FailPoint::off); } + + TEST_F(InitialSyncTest, FailsOnClone) { + const std::vector<BSONObj> responses = { + // get latest oplog ts + fromjson( + "{ok:1, cursor:{id:0, ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}" + "]}}"), + // oplog fetcher find + fromjson( + "{ok:1, cursor:{id:1, ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}" + "]}}"), +// Clone Start + // listDatabases + fromjson("{ok:0}") + }; + startSync(); + setResponses(responses); + playResponses(); + verifySync(ErrorCodes::InitialSyncFailure); + } } diff --git a/src/mongo/db/repl/fetcher.cpp b/src/mongo/db/repl/fetcher.cpp index 10eb8ae3149..f5436630d75 100644 --- 
a/src/mongo/db/repl/fetcher.cpp +++ b/src/mongo/db/repl/fetcher.cpp @@ -78,10 +78,12 @@ namespace { "cursor response must contain '" << kCursorFieldName << "." << kCursorIdFieldName << "' field: " << obj); } - if (cursorIdElement.type() != mongo::NumberLong) { + if (!(cursorIdElement.type() == mongo::NumberLong || + cursorIdElement.type() == mongo::NumberInt)) { return Status(ErrorCodes::FailedToParse, str::stream() << "'" << kCursorFieldName << "." << kCursorIdFieldName << - "' field must be a number of type 'long': " << obj); + "' field must be a integral number of type 'int' or 'long' but was a '" + << typeName(cursorIdElement.type()) << "': " << obj); } batchData->cursorId = cursorIdElement.numberLong(); diff --git a/src/mongo/db/repl/fetcher.h b/src/mongo/db/repl/fetcher.h index efacca9f1ad..474e43afef6 100644 --- a/src/mongo/db/repl/fetcher.h +++ b/src/mongo/db/repl/fetcher.h @@ -70,7 +70,7 @@ namespace repl { /** * Represents next steps of fetcher. */ - enum class NextAction { + enum class NextAction : int { kInvalid=0, kNoAction=1, kGetMore=2 diff --git a/src/mongo/db/repl/fetcher_test.cpp b/src/mongo/db/repl/fetcher_test.cpp index 85a693dd48f..4672dd9759d 100644 --- a/src/mongo/db/repl/fetcher_test.cpp +++ b/src/mongo/db/repl/fetcher_test.cpp @@ -293,13 +293,14 @@ namespace { TEST_F(FetcherTest, CursorIdNotLongNumber) { ASSERT_OK(fetcher->schedule()); - processNetworkResponse(BSON("cursor" << BSON("id" << 123 << + processNetworkResponse(BSON("cursor" << BSON("id" << 123.1 << "ns" << "db.coll" << "firstBatch" << BSONArray()) << "ok" << 1)); ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code()); ASSERT_STRING_CONTAINS(status.reason(), - "'cursor.id' field must be a number of type 'long'"); + "'cursor.id' field must be"); + ASSERT_EQ((int)Fetcher::NextAction::kInvalid, (int)nextAction); } TEST_F(FetcherTest, NamespaceFieldMissing) { diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 
e3ea081940a..8f878067129 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -77,7 +77,6 @@ namespace mongo { namespace repl { namespace { - typedef StatusWith<ReplicationExecutor::CallbackHandle> CBHStatus; using executor::NetworkInterface; void lockAndCall(boost::unique_lock<boost::mutex>* lk, const stdx::function<void ()>& fn) { diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index e55ab6d656d..413445b5e1a 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -57,7 +57,6 @@ namespace repl { namespace { - typedef StatusWith<ReplicationExecutor::CallbackHandle> CBHStatus; typedef ReplicationExecutor::CallbackHandle CBHandle; } //namespace diff --git a/src/mongo/db/repl/replication_executor.cpp b/src/mongo/db/repl/replication_executor.cpp index 72c611b600a..2a8e34fc915 100644 --- a/src/mongo/db/repl/replication_executor.cpp +++ b/src/mongo/db/repl/replication_executor.cpp @@ -31,6 +31,7 @@ #include "mongo/db/repl/replication_executor.h" #include <limits> +#include <thread> #include "mongo/db/repl/database_task.h" #include "mongo/db/repl/storage_interface.h" @@ -57,7 +58,7 @@ namespace { _inShutdown(false), _dblockWorkers(threadpool::ThreadPool::DoNotStartThreadsTag(), 3, - "replCallbackWithGlobalLock-"), + "replExecDBWorker-"), _dblockTaskRunner( &_dblockWorkers, stdx::bind(&StorageInterface::createOperationContext, storageInterface)), @@ -94,8 +95,13 @@ namespace { return _networkInterface->now(); } + bool ReplicationExecutor::isRunThread() const { + return (_runThreadId == std::this_thread::get_id()); + } + void ReplicationExecutor::run() { setThreadName("ReplicationExecutor"); + _runThreadId = std::this_thread::get_id(); _networkInterface->startup(); _dblockWorkers.startThreads(); std::pair<WorkItem, CallbackHandle> 
work; diff --git a/src/mongo/db/repl/replication_executor.h b/src/mongo/db/repl/replication_executor.h index b90d0e24133..14d9f50ccfb 100644 --- a/src/mongo/db/repl/replication_executor.h +++ b/src/mongo/db/repl/replication_executor.h @@ -32,6 +32,7 @@ #include <boost/thread/condition_variable.hpp> #include <boost/thread/mutex.hpp> #include <string> +#include <thread> #include "mongo/base/disallow_copying.h" #include "mongo/base/status.h" @@ -335,6 +336,11 @@ namespace repl { */ int64_t nextRandomInt64(int64_t limit); + /** + * Returns true if executing in the "run" thread, which should not block with IO + */ + bool isRunThread() const; + private: struct Event; struct WorkItem; @@ -448,6 +454,7 @@ namespace repl { TaskRunner _dblockTaskRunner; TaskRunner _dblockExclusiveLockTaskRunner; uint64_t _nextId; + std::thread::id _runThreadId; }; /** diff --git a/src/mongo/db/repl/replication_executor_test_fixture.cpp b/src/mongo/db/repl/replication_executor_test_fixture.cpp index 6efbe072d52..ff84a4de538 100644 --- a/src/mongo/db/repl/replication_executor_test_fixture.cpp +++ b/src/mongo/db/repl/replication_executor_test_fixture.cpp @@ -47,7 +47,11 @@ namespace { ASSERT(!_executorThread); _executorThread.reset( new boost::thread(stdx::bind(&ReplicationExecutor::run, _executor.get()))); - getNet()->enterNetwork(); + postExecutorThreadLaunch(); + } + + void ReplicationExecutorTest::postExecutorThreadLaunch() { + _net->enterNetwork(); } void ReplicationExecutorTest::joinExecutorThread() { diff --git a/src/mongo/db/repl/replication_executor_test_fixture.h b/src/mongo/db/repl/replication_executor_test_fixture.h index 479d799b990..a5a7ae6bf25 100644 --- a/src/mongo/db/repl/replication_executor_test_fixture.h +++ b/src/mongo/db/repl/replication_executor_test_fixture.h @@ -60,6 +60,11 @@ namespace repl { void launchExecutorThread(); /** + * Anything that needs to be done after launchExecutorThread should go in here. 
+ */ + virtual void postExecutorThreadLaunch(); + + /** * Waits for background ReplicationExecutor to stop running. * * The executor should be shutdown prior to calling this function diff --git a/src/mongo/db/repl/reporter.cpp b/src/mongo/db/repl/reporter.cpp index a3120adf01c..325374ae76b 100644 --- a/src/mongo/db/repl/reporter.cpp +++ b/src/mongo/db/repl/reporter.cpp @@ -75,6 +75,13 @@ namespace repl { _executor->cancel(_remoteCommandCallbackHandle); } + void Reporter::wait() { + boost::unique_lock<boost::mutex> lk(_mutex); + if(_remoteCommandCallbackHandle.isValid()) { + _executor->wait(_remoteCommandCallbackHandle); + } + } + Status Reporter::trigger() { boost::lock_guard<boost::mutex> lk(_mutex); return _schedule_inlock(); @@ -128,7 +135,7 @@ namespace repl { } } - Status Reporter::previousReturnStatus() const { + Status Reporter::getStatus() const { boost::lock_guard<boost::mutex> lk(_mutex); return _status; } diff --git a/src/mongo/db/repl/reporter.h b/src/mongo/db/repl/reporter.h index c96a896cd88..36caa78fe6b 100644 --- a/src/mongo/db/repl/reporter.h +++ b/src/mongo/db/repl/reporter.h @@ -69,6 +69,12 @@ namespace repl { void cancel(); /** + * Waits for last/current executor handle to finish. + * Returns immediately if the handle is invalid. + */ + void wait(); + + /** * Signals to the Reporter that there is new information to be sent to the "_target" server. * Returns the _status, indicating any error the Reporter has encountered. */ @@ -78,7 +84,7 @@ namespace repl { * Returns the previous return status so that the owner can decide whether the Reporter * needs a new target to whom it can report. 
*/ - Status previousReturnStatus() const; + Status getStatus() const; private: /** diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp index ed617fa0cdf..c4bd0b6176e 100644 --- a/src/mongo/db/repl/reporter_test.cpp +++ b/src/mongo/db/repl/reporter_test.cpp @@ -149,7 +149,7 @@ namespace { ASSERT_TRUE(reporter->isActive()); scheduleNetworkResponse(ErrorCodes::NoSuchKey, "waaaah"); getNet()->runReadyNetworkOperations(); - ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->previousReturnStatus()); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->getStatus()); ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->trigger()); ASSERT_FALSE(reporter->isActive()); ASSERT_FALSE(getNet()->hasReadyRequests()); @@ -167,7 +167,7 @@ namespace { scheduleNetworkResponse(ErrorCodes::NoSuchKey, "waaaah"); getNet()->runReadyNetworkOperations(); - ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->previousReturnStatus()); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->getStatus()); ASSERT_FALSE(reporter->willRunAgain()); ASSERT_FALSE(reporter->isActive()); ASSERT_FALSE(getNet()->hasReadyRequests()); @@ -238,7 +238,7 @@ namespace { ASSERT_FALSE(reporter->isActive()); ASSERT_FALSE(reporter->willRunAgain()); ASSERT_FALSE(getNet()->hasReadyRequests()); - ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->previousReturnStatus()); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->getStatus()); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->trigger()); } @@ -262,7 +262,7 @@ namespace { ASSERT_FALSE(reporter->isActive()); ASSERT_FALSE(reporter->willRunAgain()); ASSERT_FALSE(getNet()->hasReadyRequests()); - ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->previousReturnStatus()); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->getStatus()); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->trigger()); } diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index 04e82151370..fc1a10de72d 100644 --- 
a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -444,6 +444,10 @@ namespace { str::stream() << "initial sync failed: " << msg); } + // WARNING: If the 3rd oplog sync step is removed we must reset minValid + // to the last entry on the source server so that we don't come + // out of recovering until we get there (since the previous steps + // could have fetched newer documents than the oplog entry we were applying from). msg = "oplog sync 3 of 3"; log() << msg; |