diff options
author | Benety Goh <benety@mongodb.com> | 2016-03-30 12:04:23 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-04-08 10:27:51 -0400 |
commit | 11132f69c93a279ebe7f0ccb71929d4cd2b8675d (patch) | |
tree | 21eebd96010805a9cacffb0e02b7ac8befba90a6 /src | |
parent | 12d251318b76936c9655f317fd29ce46cb5e862b (diff) | |
download | mongo-11132f69c93a279ebe7f0ccb71929d4cd2b8675d.tar.gz |
SERVER-22774 Copied BackgroundSync::_fetcherCallback logic to OplogFetcher
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/client/fetcher.cpp | 16 | ||||
-rw-r--r-- | src/mongo/client/fetcher.h | 20 | ||||
-rw-r--r-- | src/mongo/client/fetcher_test.cpp | 91 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 50 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.cpp | 235 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.h | 25 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_external_state.h | 82 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_external_state_impl.cpp | 78 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_external_state_impl.h | 64 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_external_state_mock.cpp | 54 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_external_state_mock.h | 70 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_test.cpp | 65 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.cpp | 384 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.h | 221 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_test.cpp | 875 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 5 | ||||
-rw-r--r-- | src/mongo/unittest/unittest.h | 13 | ||||
-rw-r--r-- | src/mongo/util/queue.h | 43 |
18 files changed, 2182 insertions, 209 deletions
diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp index ef3c3d1fcb1..843cf02a550 100644 --- a/src/mongo/client/fetcher.cpp +++ b/src/mongo/client/fetcher.cpp @@ -165,6 +165,22 @@ Fetcher::~Fetcher() { DESTRUCTOR_GUARD(cancel(); wait();); } +HostAndPort Fetcher::getSource() const { + return _source; +} + +BSONObj Fetcher::getCommandObject() const { + return _cmdObj; +} + +BSONObj Fetcher::getMetadataObject() const { + return _metadata; +} + +Milliseconds Fetcher::getTimeout() const { + return _timeout; +} + std::string Fetcher::getDiagnosticString() const { stdx::lock_guard<stdx::mutex> lk(_mutex); str::stream output; diff --git a/src/mongo/client/fetcher.h b/src/mongo/client/fetcher.h index 4fe993e0336..2050c3687d5 100644 --- a/src/mongo/client/fetcher.h +++ b/src/mongo/client/fetcher.h @@ -128,6 +128,26 @@ public: virtual ~Fetcher(); /** + * Returns host where remote commands will be sent to. + */ + HostAndPort getSource() const; + + /** + * Returns command object sent in first remote command. + */ + BSONObj getCommandObject() const; + + /** + * Returns metadata object sent in remote commands. + */ + BSONObj getMetadataObject() const; + + /** + * Returns timeout for remote commands to complete. + */ + Milliseconds getTimeout() const; + + /** * Returns diagnostic information. */ std::string getDiagnosticString() const; diff --git a/src/mongo/client/fetcher_test.cpp b/src/mongo/client/fetcher_test.cpp index e6b4519ab12..42fa4324a7f 100644 --- a/src/mongo/client/fetcher_test.cpp +++ b/src/mongo/client/fetcher_test.cpp @@ -34,6 +34,7 @@ #include "mongo/db/jsobj.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/executor/network_interface_mock.h" +#include "mongo/rpc/metadata.h" #include "mongo/unittest/unittest.h" @@ -43,7 +44,7 @@ using namespace mongo; using executor::NetworkInterfaceMock; using executor::TaskExecutor; -const HostAndPort target("localhost", -1); +const HostAndPort source("localhost", -1); const BSONObj findCmdObj = BSON("find" << "coll"); @@ -92,15 +93,15 @@ FetcherTest::FetcherTest() void FetcherTest::setUp() { executor::ThreadPoolExecutorTest::setUp(); clear(); - fetcher.reset(new Fetcher(&getExecutor(), - target, - "db", - findCmdObj, - stdx::bind(&FetcherTest::_callback, - this, - stdx::placeholders::_1, - stdx::placeholders::_2, - stdx::placeholders::_3))); + fetcher = stdx::make_unique<Fetcher>(&getExecutor(), + source, + "db", + findCmdObj, + stdx::bind(&FetcherTest::_callback, + this, + stdx::placeholders::_1, + stdx::placeholders::_2, + stdx::placeholders::_3)); launchExecutorThread(); } @@ -206,28 +207,42 @@ void FetcherTest::_callback(const StatusWith<Fetcher::QueryResponse>& result, } } -void unusedFetcherCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob) { +void unreachableCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob) { FAIL("should not reach here"); } +void doNothingCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob) {} + TEST_F(FetcherTest, InvalidConstruction) { TaskExecutor& executor = getExecutor(); // Null executor. - ASSERT_THROWS(Fetcher(nullptr, target, "db", findCmdObj, unusedFetcherCallback), UserException); + ASSERT_THROWS_CODE_AND_WHAT(Fetcher(nullptr, source, "db", findCmdObj, unreachableCallback), + UserException, + ErrorCodes::BadValue, + "null task executor"); // Empty database name. - ASSERT_THROWS(Fetcher(&executor, target, "", findCmdObj, unusedFetcherCallback), UserException); + ASSERT_THROWS_CODE_AND_WHAT(Fetcher(&executor, source, "", findCmdObj, unreachableCallback), + UserException, + ErrorCodes::BadValue, + "database name cannot be empty"); // Empty command object. - ASSERT_THROWS(Fetcher(&executor, target, "db", BSONObj(), unusedFetcherCallback), - UserException); + ASSERT_THROWS_CODE_AND_WHAT(Fetcher(&executor, source, "db", BSONObj(), unreachableCallback), + UserException, + ErrorCodes::BadValue, + "command object cannot be empty"); // Callback function cannot be null. - ASSERT_THROWS(Fetcher(&executor, target, "db", findCmdObj, Fetcher::CallbackFn()), - UserException); + ASSERT_THROWS_CODE_AND_WHAT(Fetcher(&executor, source, "db", findCmdObj, Fetcher::CallbackFn()), + UserException, + ErrorCodes::BadValue, + "callback function cannot be null"); } // Command object can refer to any command that returns a cursor. This @@ -236,18 +251,44 @@ TEST_F(FetcherTest, NonFindCommand) { TaskExecutor& executor = getExecutor(); Fetcher(&executor, - target, + source, "db", BSON("listIndexes" << "coll"), - unusedFetcherCallback); - Fetcher(&executor, target, "db", BSON("listCollections" << 1), unusedFetcherCallback); - Fetcher(&executor, target, "db", BSON("a" << 1), unusedFetcherCallback); + unreachableCallback); + Fetcher(&executor, source, "db", BSON("listCollections" << 1), unreachableCallback); + Fetcher(&executor, source, "db", BSON("a" << 1), unreachableCallback); +} + +TEST_F(FetcherTest, RemoteCommandRequestShouldContainCommandParametersPassedToConstructor) { + auto metadataObj = BSON("x" << 1); + Milliseconds timeout(8000); + + fetcher = stdx::make_unique<Fetcher>( + &getExecutor(), source, "db", findCmdObj, doNothingCallback, metadataObj, timeout); + + ASSERT_EQUALS(source, fetcher->getSource()); + ASSERT_EQUALS(findCmdObj, fetcher->getCommandObject()); + ASSERT_EQUALS(metadataObj, fetcher->getMetadataObject()); + ASSERT_EQUALS(timeout, fetcher->getTimeout()); + + ASSERT_OK(fetcher->schedule()); + + auto net = getNet(); + net->enterNetwork(); + ASSERT_TRUE(net->hasReadyRequests()); + auto noi = net->getNextReadyRequest(); + auto request = noi->getRequest(); + net->exitNetwork(); + + ASSERT_EQUALS(source, request.target); + ASSERT_EQUALS(findCmdObj, request.cmdObj); + ASSERT_EQUALS(metadataObj, request.metadata); + ASSERT_EQUALS(timeout, request.timeout); } TEST_F(FetcherTest, GetDiagnosticString) { - Fetcher fetcher(&getExecutor(), target, "db", findCmdObj, unusedFetcherCallback); - ASSERT_FALSE(fetcher.getDiagnosticString().empty()); + ASSERT_FALSE(fetcher->getDiagnosticString().empty()); } TEST_F(FetcherTest, IsActiveAfterSchedule) { diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 11a30e6852d..27fd72ad763 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -324,6 +324,7 @@ env.Library('repl_coordinator_impl', '$BUILD_DIR/mongo/rpc/metadata', '$BUILD_DIR/mongo/util/fail_point', 'data_replicator', + 'data_replicator_external_state_impl', 'repl_coordinator_global', 'repl_coordinator_interface', 'repl_settings', @@ -558,6 +559,31 @@ env.Library( ) env.Library( + target='oplog_fetcher', + source=[ + 'oplog_fetcher.cpp', + ], + LIBDEPS=[ + 'repl_coordinator_interface', + 'replica_set_messages', + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/client/fetcher', + '$BUILD_DIR/mongo/db/namespace_string', + '$BUILD_DIR/mongo/executor/task_executor_interface', + ], +) + +env.CppUnitTest( + target='oplog_fetcher_test', + source='oplog_fetcher_test.cpp', + LIBDEPS=[ + 'oplog_fetcher', + 'data_replicator_external_state_mock', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', + ], +) + +env.Library( target='reporter', source=[ 'reporter.cpp', @@ -763,6 +789,28 @@ env.CppUnitTest( ) env.Library( + target='data_replicator_external_state_impl', + source=[ + 'data_replicator_external_state_impl.cpp', + ], + LIBDEPS=[ + 'optime', + 'repl_coordinator_interface', + '$BUILD_DIR/mongo/base', + ], +) + +env.Library( + target='data_replicator_external_state_mock', + source=[ + 'data_replicator_external_state_mock.cpp', + ], + LIBDEPS=[ + 'optime', + ], +) + +env.Library( target='data_replicator', source=[ 'data_replicator.cpp', @@ -772,6 +820,7 @@ env.Library( 'collection_cloner', 'database_cloner', 'multiapplier', + 'oplog_fetcher', 'optime', 'reporter', '$BUILD_DIR/mongo/client/fetcher', @@ -786,6 +835,7 @@ env.CppUnitTest( LIBDEPS=[ 'base_cloner_test_fixture', 'data_replicator', + 'data_replicator_external_state_mock', 'replication_executor_test_fixture', '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/unittest/concurrency', diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index 660d53e1567..734d8362235 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -42,11 +42,13 @@ #include "mongo/db/repl/collection_cloner.h" #include "mongo/db/repl/database_cloner.h" #include "mongo/db/repl/member_state.h" +#include "mongo/db/repl/oplog_fetcher.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/sync_source_selector.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/stdx/functional.h" +#include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" #include "mongo/util/assert_util.h" #include "mongo/util/fail_point_service.h" @@ -94,102 +96,6 @@ std::string toString(DataReplicatorState s) { MONGO_UNREACHABLE; } -/** - * Follows the fetcher pattern for a find+getmore on an oplog - * Returns additional errors if the start oplog entry cannot be found. - */ -class OplogFetcher : public QueryFetcher { - MONGO_DISALLOW_COPYING(OplogFetcher); - -public: - OplogFetcher(ReplicationExecutor* exec, - const Timestamp& startTS, - const HostAndPort& src, - const NamespaceString& nss, - const QueryFetcher::CallbackFn& work); - - virtual ~OplogFetcher() = default; - std::string toString() const; - - const Timestamp getStartTimestamp() const { - return _startTS; - } - -protected: - void _delegateCallback(const Fetcher::QueryResponseStatus& fetchResult, NextAction* nextAction); - - const Timestamp _startTS; -}; - -// OplogFetcher -OplogFetcher::OplogFetcher(ReplicationExecutor* exec, - const Timestamp& startTS, - const HostAndPort& src, - const NamespaceString& oplogNSS, - const QueryFetcher::CallbackFn& work) - // TODO: add query options await_data, oplog_replay - : QueryFetcher(exec, - src, - oplogNSS, - BSON("find" << oplogNSS.coll() << "filter" - << BSON("ts" << BSON("$gte" << startTS))), - work, - BSON(rpc::kReplSetMetadataFieldName << 1)), - _startTS(startTS) {} - -std::string OplogFetcher::toString() const { - return str::stream() << "OplogReader -" - << " startTS: " << _startTS.toString() - << " fetcher: " << QueryFetcher::getDiagnosticString(); -} - -void OplogFetcher::_delegateCallback(const Fetcher::QueryResponseStatus& fetchResult, - Fetcher::NextAction* nextAction) { - if (fetchResult.isOK()) { - Fetcher::Documents::const_iterator firstDoc = fetchResult.getValue().documents.begin(); - auto hasDoc = firstDoc != fetchResult.getValue().documents.end(); - - if (fetchResult.getValue().first) { - if (!hasDoc) { - // Set next action to none. - *nextAction = Fetcher::NextAction::kNoAction; - _onQueryResponse( - Status(ErrorCodes::OplogStartMissing, - str::stream() - << "No operations on sync source with op time starting at: " - << _startTS.toString()), - nextAction); - return; - } else if ((*firstDoc)["ts"].eoo()) { - // Set next action to none. - *nextAction = Fetcher::NextAction::kNoAction; - _onQueryResponse(Status(ErrorCodes::OplogStartMissing, - str::stream() << "Missing 'ts' field in first returned " - << (*firstDoc)["ts"] << " starting at " - << _startTS.toString()), - nextAction); - return; - } else if ((*firstDoc)["ts"].timestamp() != _startTS) { - // Set next action to none. - *nextAction = Fetcher::NextAction::kNoAction; - _onQueryResponse(Status(ErrorCodes::OplogStartMissing, - str::stream() << "First returned " << (*firstDoc)["ts"] - << " is not where we wanted to start: " - << _startTS.toString()), - nextAction); - return; - } - } - - if (hasDoc) { - _onQueryResponse(fetchResult, nextAction); - } else { - } - } else { - _onQueryResponse(fetchResult, nextAction); - } -}; - class DatabasesCloner { public: DatabasesCloner(ReplicationExecutor* exec, @@ -511,8 +417,12 @@ void DatabasesCloner::_failed() { } // Data Replicator -DataReplicator::DataReplicator(DataReplicatorOptions opts, ReplicationExecutor* exec) +DataReplicator::DataReplicator( + DataReplicatorOptions opts, + std::unique_ptr<DataReplicatorExternalState> dataReplicatorExternalState, + ReplicationExecutor* exec) : _opts(opts), + _dataReplicatorExternalState(std::move(dataReplicatorExternalState)), _exec(exec), _state(DataReplicatorState::Uninitialized), _fetcherPaused(false), @@ -533,7 +443,7 @@ DataReplicator::DataReplicator(DataReplicatorOptions opts, ReplicationExecutor* } DataReplicator::~DataReplicator() { - DESTRUCTOR_GUARD(_cancelAllHandles_inlock(); _waitOnAll_inlock();); + DESTRUCTOR_GUARD(_cancelAllHandles_inlock(); _oplogBuffer.clear(); _waitOnAll_inlock();); } Status DataReplicator::start() { @@ -767,14 +677,27 @@ TimestampStatus DataReplicator::initialSync() { attemptErrorStatus = tsStatus.getStatus(); if (attemptErrorStatus.isOK()) { _initialSyncState->beginTimestamp = tsStatus.getValue(); - _fetcher.reset(new OplogFetcher(_exec, - _initialSyncState->beginTimestamp, - _syncSource, - _opts.remoteOplogNS, - stdx::bind(&DataReplicator::_onOplogFetchFinish, - this, - stdx::placeholders::_1, - stdx::placeholders::_2))); + long long term = OpTime::kUninitializedTerm; + // TODO: Read last fetched hash from storage. + long long lastHashFetched = 1LL; + OpTime lastOpTimeFetched(_initialSyncState->beginTimestamp, term); + _fetcher = stdx::make_unique<OplogFetcher>( + _exec, + OpTimeWithHash(lastHashFetched, lastOpTimeFetched), + _syncSource, + _opts.remoteOplogNS, + _opts.getReplSetConfig(), + _dataReplicatorExternalState.get(), + stdx::bind(&DataReplicator::_enqueueDocuments, + this, + stdx::placeholders::_1, + stdx::placeholders::_2, + stdx::placeholders::_3, + stdx::placeholders::_4), + stdx::bind(&DataReplicator::_onOplogFetchFinish, + this, + stdx::placeholders::_1, + stdx::placeholders::_2)); _scheduleFetch_inlock(); lk.unlock(); _initialSyncState->dbsCloner.start(); // When the cloner is done applier starts. @@ -888,7 +811,7 @@ bool DataReplicator::_anyActiveHandles_inlock() const { void DataReplicator::_cancelAllHandles_inlock() { if (_fetcher) - _fetcher->cancel(); + _fetcher->shutdown(); if (_applier) _applier->cancel(); if (_reporter) @@ -899,7 +822,7 @@ void DataReplicator::_cancelAllHandles_inlock() { void DataReplicator::_waitOnAll_inlock() { if (_fetcher) - _fetcher->wait(); + _fetcher->join(); if (_applier) _applier->wait(); if (_reporter) @@ -1102,6 +1025,10 @@ void DataReplicator::_onApplyBatchFinish(const CallbackArgs& cbData, const TimestampStatus& ts, const Operations& ops, const size_t numApplied) { + if (ErrorCodes::CallbackCanceled == cbData.status) { + return; + } + invariant(cbData.status.isOK()); UniqueLock lk(_mutex); if (_initialSyncState) { @@ -1293,21 +1220,31 @@ Status DataReplicator::_scheduleFetch_inlock() { } } - const auto startOptime = _opts.getMyLastOptime().getTimestamp(); + const auto startOptime = _opts.getMyLastOptime(); + // TODO: Read last applied hash from storage. See + // BackgroundSync::_readLastAppliedHash(OperationContex*). + long long startHash = 0LL; const auto remoteOplogNS = _opts.remoteOplogNS; - // TODO: add query options await_data, oplog_replay - _fetcher.reset(new OplogFetcher(_exec, - startOptime, - _syncSource, - remoteOplogNS, - stdx::bind(&DataReplicator::_onOplogFetchFinish, - this, - stdx::placeholders::_1, - stdx::placeholders::_2))); + _fetcher = stdx::make_unique<OplogFetcher>(_exec, + OpTimeWithHash(startHash, startOptime), + _syncSource, + remoteOplogNS, + _opts.getReplSetConfig(), + _dataReplicatorExternalState.get(), + stdx::bind(&DataReplicator::_enqueueDocuments, + this, + stdx::placeholders::_1, + stdx::placeholders::_2, + stdx::placeholders::_3, + stdx::placeholders::_4), + stdx::bind(&DataReplicator::_onOplogFetchFinish, + this, + stdx::placeholders::_1, + stdx::placeholders::_2)); } if (!_fetcher->isActive()) { - Status status = _fetcher->schedule(); + Status status = _fetcher->startup(); if (!status.isOK()) { return status; } @@ -1335,6 +1272,7 @@ Status DataReplicator::scheduleShutdown() { invariant(!_onShutdown.isValid()); _onShutdown = eventStatus.getValue(); _cancelAllHandles_inlock(); + _oplogBuffer.clear(); } // Schedule _doNextActions in case nothing is active to trigger the _onShutdown event. @@ -1369,38 +1307,45 @@ Status DataReplicator::_shutdown() { return status; } -void DataReplicator::_onOplogFetchFinish(const StatusWith<Fetcher::QueryResponse>& fetchResult, - Fetcher::NextAction* nextAction) { - const Status status = fetchResult.getStatus(); - if (status.code() == ErrorCodes::CallbackCanceled) +void DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin, + Fetcher::Documents::const_iterator end, + const OplogFetcher::DocumentsInfo& info, + Milliseconds getMoreElapsed) { + if (info.toApplyDocumentCount == 0) { return; - if (status.isOK()) { - const auto& docs = fetchResult.getValue().documents; - if (docs.begin() != docs.end()) { - LockGuard lk(_mutex); - std::for_each( - docs.cbegin(), docs.cend(), [&](const BSONObj& doc) { _oplogBuffer.push(doc); }); - auto doc = docs.rbegin(); - BSONElement tsElem(doc->getField("ts")); - while (tsElem.eoo() && doc != docs.rend()) { - tsElem = (doc++)->getField("ts"); - } + } - if (!tsElem.eoo()) { - _lastTimestampFetched = tsElem.timestamp(); - } else { - warning() << "Did not find a 'ts' timestamp field in any of the fetched documents"; - } - } - if (*nextAction == Fetcher::NextAction::kNoAction) { - // TODO: create new fetcher?, with new query from where we left off -- d'tor fetcher - } + // Wait for enough space. + // Gets unblocked on shutdown. + _oplogBuffer.waitForSpace(info.toApplyDocumentBytes); + + OCCASIONALLY { + LOG(2) << "bgsync buffer has " << _oplogBuffer.size() << " bytes"; } - if (!status.isOK()) { + // Buffer docs for later application. + _oplogBuffer.pushAllNonBlocking(begin, end); + + _lastTimestampFetched = info.lastDocument.opTime.getTimestamp(); + + // TODO: updates metrics with "info" and "getMoreElapsed". + + _doNextActions(); +} + +void DataReplicator::_onOplogFetchFinish(const Status& status, const OpTimeWithHash& lastFetched) { + if (status.code() == ErrorCodes::CallbackCanceled) { + return; + } else if (status.isOK()) { + _lastTimestampFetched = lastFetched.opTime.getTimestamp(); + + // TODO: create new fetcher?, with new query from where we left off -- d'tor fetcher + } else { + invariant(!status.isOK()); // Got an error, now decide what to do... switch (status.code()) { - case ErrorCodes::OplogStartMissing: { + case ErrorCodes::OplogStartMissing: + case ErrorCodes::RemoteOplogStale: { _setState(DataReplicatorState::Rollback); // possible rollback auto scheduleResult = _exec->scheduleDBWork( diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h index 53dda5f96c5..6382cf9f9a4 100644 --- a/src/mongo/db/repl/data_replicator.h +++ b/src/mongo/db/repl/data_replicator.h @@ -40,6 +40,8 @@ #include "mongo/db/repl/multiapplier.h" #include "mongo/db/repl/collection_cloner.h" #include "mongo/db/repl/database_cloner.h" +#include "mongo/db/repl/data_replicator_external_state.h" +#include "mongo/db/repl/oplog_fetcher.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/reporter.h" @@ -70,7 +72,6 @@ using Response = executor::RemoteCommandResponse; using TimestampStatus = StatusWith<Timestamp>; using UniqueLock = stdx::unique_lock<stdx::mutex>; -class OplogFetcher; struct InitialSyncState; struct MemberState; class ReplicationProgressManager; @@ -113,6 +114,9 @@ struct DataReplicatorOptions { /** Function to get this node's slaveDelay. */ using GetSlaveDelayFn = stdx::function<Seconds()>; + /** Function to get current replica set configuration */ + using GetReplSetConfigFn = stdx::function<ReplicaSetConfig()>; + // Error and retry values Milliseconds syncSourceRetryWait{1000}; Milliseconds initialSyncRetryWait{1000}; @@ -140,6 +144,8 @@ struct DataReplicatorOptions { SetMyLastOptimeFn setMyLastOptime; SetFollowerModeFn setFollowerMode; GetSlaveDelayFn getSlaveDelay; + GetReplSetConfigFn getReplSetConfig; + SyncSourceSelector* syncSourceSelector = nullptr; std::string toString() const { @@ -158,7 +164,9 @@ struct DataReplicatorOptions { */ class DataReplicator { public: - DataReplicator(DataReplicatorOptions opts, ReplicationExecutor* exec); + DataReplicator(DataReplicatorOptions opts, + std::unique_ptr<DataReplicatorExternalState> dataReplicatorExternalState, + ReplicationExecutor* exec); virtual ~DataReplicator(); @@ -226,8 +234,16 @@ private: // Only executed via executor void _resumeFinish(CallbackArgs cbData); - void _onOplogFetchFinish(const QueryResponseStatus& fetchResult, - Fetcher::NextAction* nextAction); + + /** + * Pushes documents from oplog fetcher to blocking queue for + * applier to consume. + */ + void _enqueueDocuments(Fetcher::Documents::const_iterator begin, + Fetcher::Documents::const_iterator end, + const OplogFetcher::DocumentsInfo& info, + Milliseconds elapsed); + void _onOplogFetchFinish(const Status& status, const OpTimeWithHash& lastFetched); void _rollbackOperations(const CallbackArgs& cbData); void _doNextActions(); void _doNextActions_InitialSync_inlock(); @@ -273,6 +289,7 @@ private: // Set during construction const DataReplicatorOptions _opts; + std::unique_ptr<DataReplicatorExternalState> _dataReplicatorExternalState; ReplicationExecutor* _exec; // diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h new file mode 100644 index 00000000000..983290a2148 --- /dev/null +++ b/src/mongo/db/repl/data_replicator_external_state.h @@ -0,0 +1,82 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/disallow_copying.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/optime_with.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" +#include "mongo/util/net/hostandport.h" +#include "mongo/util/time_support.h" + +namespace mongo { +namespace repl { + +/** + * Holds current term and last committed optime necessary to populate find/getMore command requests. + */ +using OpTimeWithTerm = OpTimeWith<long long>; + +/** + * This class represents the interface the DataReplicator uses to interact with the + * rest of the system. All functionality of the DataReplicator that would introduce + * dependencies on large sections of the server code and thus break the unit testability of + * DataReplicator should be moved here. + */ +class DataReplicatorExternalState { + MONGO_DISALLOW_COPYING(DataReplicatorExternalState); + +public: + DataReplicatorExternalState() = default; + + virtual ~DataReplicatorExternalState() = default; + + /** + * Returns the current term and last committed optime. + * Returns (OpTime::kUninitializedTerm, OpTime()) if not available. + */ + virtual OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() = 0; + + /** + * Forwards the parsed metadata in the query results to the replication system. + */ + virtual void processMetadata(const rpc::ReplSetMetadata& metadata) = 0; + + /** + * Evaluates quality of sync source. Accepts the current sync source; the last optime on this + * sync source (from metadata); and whether this sync source has a sync source (also from + * metadata). + */ + virtual bool shouldStopFetching(const HostAndPort& source, + const OpTime& sourceOpTime, + bool sourceHasSyncSource) = 0; +}; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp new file mode 100644 index 00000000000..828a2aa51a5 --- /dev/null +++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/data_replicator_external_state_impl.h" + +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace repl { + +DataReplicatorExternalStateImpl::DataReplicatorExternalStateImpl( + ReplicationCoordinator* replicationCoordinator) + : _replicationCoordinator(replicationCoordinator) {} + +OpTimeWithTerm DataReplicatorExternalStateImpl::getCurrentTermAndLastCommittedOpTime() { + if (!_replicationCoordinator->isV1ElectionProtocol()) { + return {OpTime::kUninitializedTerm, OpTime()}; + } + return {_replicationCoordinator->getTerm(), _replicationCoordinator->getLastCommittedOpTime()}; +} + +void DataReplicatorExternalStateImpl::processMetadata(const rpc::ReplSetMetadata& metadata) { + _replicationCoordinator->processReplSetMetadata(metadata); + if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) { + _replicationCoordinator->cancelAndRescheduleElectionTimeout(); + } +} + +bool DataReplicatorExternalStateImpl::shouldStopFetching(const HostAndPort& source, + const OpTime& sourceOpTime, + bool sourceHasSyncSource) { + // Re-evaluate quality of sync target. + if (_replicationCoordinator->shouldChangeSyncSource( + source, sourceOpTime, sourceHasSyncSource)) { + LOG(1) << "Canceling oplog query because we have to choose a sync source. Current source: " + << source << ", OpTime " << sourceOpTime + << ", hasSyncSource:" << sourceHasSyncSource; + return true; + } + return false; +} + +ReplicationCoordinator* DataReplicatorExternalStateImpl::getReplicationCoordinator() const { + return _replicationCoordinator; +} + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h new file mode 100644 index 00000000000..8fc84ff218c --- /dev/null +++ b/src/mongo/db/repl/data_replicator_external_state_impl.h @@ -0,0 +1,64 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/repl/data_replicator_external_state.h" + +namespace mongo { +namespace repl { + +class ReplicationCoordinator; + +/** + * Data replicator external state implementation using a replication coordinator. + */ + +class DataReplicatorExternalStateImpl : public DataReplicatorExternalState { +public: + DataReplicatorExternalStateImpl(ReplicationCoordinator* replicationCoordinator); + + OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override; + + void processMetadata(const rpc::ReplSetMetadata& metadata) override; + + bool shouldStopFetching(const HostAndPort& source, + const OpTime& sourceOpTime, + bool sourceHasSyncSource) override; + +protected: + ReplicationCoordinator* getReplicationCoordinator() const; + +private: + // Not owned by us. + ReplicationCoordinator* _replicationCoordinator; +}; + + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp new file mode 100644 index 00000000000..501d9ae70c3 --- /dev/null +++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/data_replicator_external_state_mock.h" + +namespace mongo { +namespace repl { + +OpTimeWithTerm DataReplicatorExternalStateMock::getCurrentTermAndLastCommittedOpTime() { + return {currentTerm, lastCommittedOpTime}; +} + +void DataReplicatorExternalStateMock::processMetadata(const rpc::ReplSetMetadata& metadata) { + metadataProcessed = metadata; +} + +bool DataReplicatorExternalStateMock::shouldStopFetching(const HostAndPort& source, + const OpTime& sourceOpTime, + bool sourceHasSyncSource) { + lastSyncSourceChecked = source; + syncSourceLastOpTime = sourceOpTime; + syncSourceHasSyncSource = sourceHasSyncSource; + return shouldStopFetchingResult; +} + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h new file mode 100644 index 00000000000..ef78c691157 --- /dev/null +++ b/src/mongo/db/repl/data_replicator_external_state_mock.h @@ -0,0 +1,70 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/repl/data_replicator_external_state.h" + +namespace mongo { +namespace repl { + +class ReplicationCoordinator; + +/** + * Data replicator external state implementation for testing. + */ + +class DataReplicatorExternalStateMock : public DataReplicatorExternalState { +public: + OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override; + + void processMetadata(const rpc::ReplSetMetadata& metadata) override; + + bool shouldStopFetching(const HostAndPort& source, + const OpTime& sourceOpTime, + bool sourceHasSyncSource) override; + + // Returned by getCurrentTermAndLastCommittedOpTime. + long long currentTerm = OpTime::kUninitializedTerm; + OpTime lastCommittedOpTime; + + // Set by processMetadata. + rpc::ReplSetMetadata metadataProcessed; + + // Set by shouldStopFetching. + HostAndPort lastSyncSourceChecked; + OpTime syncSourceLastOpTime; + bool syncSourceHasSyncSource = false; + + // Returned by shouldStopFetching. + bool shouldStopFetchingResult = false; +}; + + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index 0373b14dfea..6aa10156dba 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -36,6 +36,7 @@ #include "mongo/db/json.h" #include "mongo/db/repl/base_cloner_test_fixture.h" #include "mongo/db/repl/data_replicator.h" +#include "mongo/db/repl/data_replicator_external_state_mock.h" #include "mongo/db/repl/member_state.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/update_position_args.h" @@ -183,6 +184,9 @@ protected: reset(); launchExecutorThread(); + + _myLastOpTime = OpTime({3, 0}, 1); + DataReplicatorOptions options; options.initialSyncRetryWait = Milliseconds(0); options.applierFn = [this](const MultiApplier::Operations& ops) { return _applierFn(ops); }; @@ -207,8 +211,25 @@ protected: }; options.getSlaveDelay = [this]() { return Seconds(0); }; options.syncSourceSelector = this; + options.getReplSetConfig = []() { + ReplicaSetConfig config; + ASSERT_OK( + config.initialize(BSON("_id" + << "myset" + << "version" << 1 << "protocolVersion" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" + << "localhost:12345")) << "settings" + << BSON("electionTimeoutMillis" << 10000)))); + return config; + }; + + auto dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>(); + dataReplicatorExternalState->currentTerm = 1LL; + dataReplicatorExternalState->lastCommittedOpTime = _myLastOpTime; + try { - _dr.reset(new DataReplicator(options, &(getReplExecutor()))); + _dr.reset(new DataReplicator( + options, std::move(dataReplicatorExternalState), &(getReplExecutor()))); } catch (...) { ASSERT_OK(exceptionToStatus()); } @@ -325,7 +346,7 @@ protected: const int expectedResponses(_responses.size()); // counter for oplog entries - int c(0); + int c(1); while (true) { net->enterNetwork(); if (!net->hasReadyRequests() && processedRequests < expectedResponses) { @@ -871,10 +892,16 @@ TEST_F(SteadyStateTest, RollbackTwoSyncSourcesSecondRollbackSucceeds) { } TEST_F(SteadyStateTest, PauseDataReplicator) { + auto lastOperationApplied = BSON("op" + << "a" + << "v" << OplogEntry::kOplogVersion << "ts" + << Timestamp(Seconds(123), 0)); + auto operationToApply = BSON("op" << "a" << "v" << OplogEntry::kOplogVersion << "ts" - << Timestamp(Seconds(123), 0)); + << Timestamp(Seconds(456), 0)); + stdx::mutex mutex; unittest::Barrier barrier(2U); Timestamp lastTimestampApplied; @@ -896,7 +923,7 @@ TEST_F(SteadyStateTest, PauseDataReplicator) { }; auto& dr = getDR(); - _myLastOpTime = OpTime(operationToApply["ts"].timestamp(), OpTime::kInitialTerm); + _myLastOpTime = OpTime(lastOperationApplied["ts"].timestamp(), OpTime::kInitialTerm); _memberState = MemberState::RS_SECONDARY; auto net = getNet(); @@ -907,10 +934,12 @@ TEST_F(SteadyStateTest, PauseDataReplicator) { ASSERT_TRUE(net->hasReadyRequests()); { auto networkRequest = net->getNextReadyRequest(); - auto commandResponse = BSON( - "ok" << 1 << "cursor" << BSON("id" << 0LL << "ns" - << "local.oplog.rs" - << "firstBatch" << BSON_ARRAY(operationToApply))); + auto commandResponse = + BSON("ok" << 1 << "cursor" + << BSON("id" << 1LL << "ns" + << "local.oplog.rs" + << "firstBatch" + << BSON_ARRAY(lastOperationApplied << operationToApply))); scheduleNetworkResponse(networkRequest, commandResponse); } @@ -955,10 +984,16 @@ TEST_F(SteadyStateTest, PauseDataReplicator) { } TEST_F(SteadyStateTest, ApplyOneOperation) { + auto lastOperationApplied = BSON("op" + << "a" + << "v" << OplogEntry::kOplogVersion << "ts" + << Timestamp(Seconds(123), 0)); + auto operationToApply = BSON("op" << "a" << "v" << OplogEntry::kOplogVersion << "ts" - << Timestamp(Seconds(123), 0)); + << Timestamp(Seconds(456), 0)); + stdx::mutex mutex; unittest::Barrier barrier(2U); Timestamp lastTimestampApplied; @@ -979,7 +1014,7 @@ TEST_F(SteadyStateTest, ApplyOneOperation) { barrier.countDownAndWait(); }; - _myLastOpTime = OpTime(operationToApply["ts"].timestamp(), OpTime::kInitialTerm); + _myLastOpTime = OpTime(lastOperationApplied["ts"].timestamp(), OpTime::kInitialTerm); _memberState = MemberState::RS_SECONDARY; auto net = getNet(); @@ -991,10 +1026,12 @@ TEST_F(SteadyStateTest, ApplyOneOperation) { ASSERT_TRUE(net->hasReadyRequests()); { auto networkRequest = net->getNextReadyRequest(); - auto commandResponse = BSON( - "ok" << 1 << "cursor" << BSON("id" << 0LL << "ns" - << "local.oplog.rs" - << "firstBatch" << BSON_ARRAY(operationToApply))); + auto commandResponse = + BSON("ok" << 1 << "cursor" + << BSON("id" << 1LL << "ns" + << "local.oplog.rs" + << "firstBatch" + << BSON_ARRAY(lastOperationApplied << operationToApply))); scheduleNetworkResponse(networkRequest, commandResponse); } ASSERT_EQUALS(0U, dr.getOplogBufferCount()); diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp new file mode 100644 index 00000000000..fcb41bdca51 --- /dev/null +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -0,0 +1,384 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/oplog_fetcher.h" + +#include "mongo/db/jsobj.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/log.h" +#include "mongo/util/time_support.h" + +namespace mongo { +namespace repl { + +Seconds OplogFetcher::kDefaultProtocolZeroAwaitDataTimeout(2); + +namespace { + +/** + * Calculates await data timeout based on the current replica set configuration. + */ +Milliseconds calculateAwaitDataTimeout(const ReplicaSetConfig& config) { + // Under protocol version 1, make the awaitData timeout (maxTimeMS) dependent on the election + // timeout. This enables the sync source to communicate liveness of the primary to secondaries. + // Under protocol version 0, use a default timeout of 2 seconds for awaitData. + return config.getProtocolVersion() == 1LL ? config.getElectionTimeoutPeriod() / 2 + : OplogFetcher::kDefaultProtocolZeroAwaitDataTimeout; +} + +/** + * Returns find command object suitable for tailing remote oplog. + */ +BSONObj makeFindCommandObject(DataReplicatorExternalState* dataReplicatorExternalState, + const NamespaceString& nss, + OpTime lastOpTimeFetched) { + invariant(dataReplicatorExternalState); + BSONObjBuilder cmdBob; + cmdBob.append("find", nss.coll()); + cmdBob.append("filter", BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp()))); + cmdBob.append("tailable", true); + cmdBob.append("oplogReplay", true); + cmdBob.append("awaitData", true); + cmdBob.append("maxTimeMS", durationCount<Milliseconds>(Minutes(1))); // 1 min initial find. + auto opTimeWithTerm = dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime(); + if (opTimeWithTerm.value != OpTime::kUninitializedTerm) { + cmdBob.append("term", opTimeWithTerm.value); + } + return cmdBob.obj(); +} + +/** + * Returns getMore command object suitable for tailing remote oplog. + */ +BSONObj makeGetMoreCommandObject(DataReplicatorExternalState* dataReplicatorExternalState, + const NamespaceString& nss, + CursorId cursorId, + Milliseconds fetcherMaxTimeMS) { + BSONObjBuilder cmdBob; + cmdBob.append("getMore", cursorId); + cmdBob.append("collection", nss.coll()); + cmdBob.append("maxTimeMS", durationCount<Milliseconds>(fetcherMaxTimeMS)); + auto opTimeWithTerm = dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime(); + if (opTimeWithTerm.value != OpTime::kUninitializedTerm) { + cmdBob.append("term", opTimeWithTerm.value); + opTimeWithTerm.opTime.append(&cmdBob, "lastKnownCommittedOpTime"); + } + return cmdBob.obj(); +} + +/** + * Returns command metadata object suitable for tailing remote oplog. + */ +StatusWith<BSONObj> makeMetadataObject(bool isV1ElectionProtocol) { + return isV1ElectionProtocol ? BSON(rpc::kReplSetMetadataFieldName << 1) + : rpc::makeEmptyMetadata(); +} + +/** + * Checks the first batch of results from query. + * 'documents' are the first batch of results returned from tailing the remote oplog. + * 'lastFetched' optime and hash should be consistent with the predicate in the query. + * Returns RemoteOplogStale if the oplog query has no results. + * Returns OplogStartMissing if we cannot find the optime of the last fetched operation in + * the remote oplog. + */ +Status checkRemoteOplogStart(const Fetcher::Documents& documents, OpTimeWithHash lastFetched) { + if (documents.empty()) { + // The GTE query from upstream returns nothing, so we're ahead of the upstream. + return Status(ErrorCodes::RemoteOplogStale, + str::stream() << "We are ahead of the sync source. Our last op time fetched: " + << lastFetched.opTime.toString()); + } + const auto& o = documents.front(); + auto opTimeResult = OpTime::parseFromOplogEntry(o); + if (!opTimeResult.isOK()) { + return Status(ErrorCodes::OplogStartMissing, + str::stream() << "our last op time fetched: " << lastFetched.opTime.toString() + << " (hash: " << lastFetched.value << ")" + << ". failed to parse optime from first oplog on source: " + << o.toString() << ": " << opTimeResult.getStatus().toString()); + } + auto opTime = opTimeResult.getValue(); + long long hash = o["h"].numberLong(); + if (opTime != lastFetched.opTime || hash != lastFetched.value) { + return Status(ErrorCodes::OplogStartMissing, + str::stream() << "our last op time fetched: " << lastFetched.opTime.toString() + << ". source's GTE: " << opTime.toString() << " hashes: (" + << lastFetched.value << "/" << hash << ")"); + } + return Status::OK(); +} + +} // namespace + +StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments( + const Fetcher::Documents& documents, bool first, Timestamp lastTS) { + if (first && documents.empty()) { + return Status(ErrorCodes::OplogStartMissing, + str::stream() << "The first batch of oplog entries is empty, but expected at " + "least 1 document matching ts: " << lastTS.toString()); + } + + DocumentsInfo info; + // The count of the bytes of the documents read off the network. + info.networkDocumentBytes = 0; + info.networkDocumentCount = 0; + for (auto&& doc : documents) { + info.networkDocumentBytes += doc.objsize(); + ++info.networkDocumentCount; + + // If this is the first response (to the $gte query) then we already applied the first doc. + if (first && info.networkDocumentCount == 1U) { + continue; + } + + // Check to see if the oplog entry goes back in time for this document. + const auto docOpTime = OpTime::parseFromOplogEntry(doc); + // entries must have a "ts" field. + if (!docOpTime.isOK()) { + return docOpTime.getStatus(); + } + + info.lastDocument = {doc["h"].numberLong(), docOpTime.getValue()}; + + const auto docTS = info.lastDocument.opTime.getTimestamp(); + if (lastTS >= docTS) { + return Status(ErrorCodes::OplogOutOfOrder, + str::stream() << "Out of order entries in oplog. lastTS: " + << lastTS.toString() << " outOfOrderTS:" << docTS.toString() + << " at count:" << info.networkDocumentCount); + } + lastTS = docTS; + } + + // These numbers are for the documents we will apply. + info.toApplyDocumentCount = documents.size(); + info.toApplyDocumentBytes = info.networkDocumentBytes; + if (first) { + // The count is one less since the first document found was already applied ($gte $ts query) + // and we will not apply it again. + --info.toApplyDocumentCount; + auto alreadyAppliedDocument = documents.cbegin(); + info.toApplyDocumentBytes -= alreadyAppliedDocument->objsize(); + } + return info; +} + +OplogFetcher::OplogFetcher(executor::TaskExecutor* exec, + OpTimeWithHash lastFetched, + HostAndPort source, + NamespaceString oplogNSS, + ReplicaSetConfig config, + DataReplicatorExternalState* dataReplicatorExternalState, + EnqueueDocumentsFn enqueueDocumentsFn, + OnShutdownCallbackFn onShutdownCallbackFn) + : _dataReplicatorExternalState(dataReplicatorExternalState), + _fetcher(exec, + source, + oplogNSS.db().toString(), + makeFindCommandObject(dataReplicatorExternalState, oplogNSS, lastFetched.opTime), + stdx::bind( + &OplogFetcher::_callback, this, stdx::placeholders::_1, stdx::placeholders::_3), + uassertStatusOK(makeMetadataObject(config.getProtocolVersion() == 1LL)), + config.getElectionTimeoutPeriod()), + _enqueueDocumentsFn(enqueueDocumentsFn), + _awaitDataTimeout(calculateAwaitDataTimeout(config)), + _onShutdownCallbackFn(onShutdownCallbackFn), + _lastFetched(lastFetched) { + uassert(ErrorCodes::BadValue, "null last optime fetched", !lastFetched.opTime.isNull()); + uassert(ErrorCodes::InvalidReplicaSetConfig, + "uninitialized replica set configuration", + config.isInitialized()); + uassert(ErrorCodes::BadValue, "null enqueueDocuments function", enqueueDocumentsFn); + uassert(ErrorCodes::BadValue, "null onShutdownCallback function", onShutdownCallbackFn); +} + +std::string OplogFetcher::toString() const { + return str::stream() << "OplogReader -" + << " last optime fetched: " << _lastFetched.opTime.toString() + << " last hash fetched: " << _lastFetched.value + << " fetcher: " << _fetcher.getDiagnosticString(); +} + +bool OplogFetcher::isActive() const { + return _fetcher.isActive(); +} + +Status OplogFetcher::startup() { + return _fetcher.schedule(); +} + +void OplogFetcher::shutdown() { + _fetcher.cancel(); +} + +void OplogFetcher::join() { + _fetcher.wait(); +} + +OpTimeWithHash OplogFetcher::getLastOpTimeWithHashFetched() const { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _lastFetched; +} + +BSONObj OplogFetcher::getCommandObject_forTest() const { + return _fetcher.getCommandObject(); +} + +BSONObj OplogFetcher::getMetadataObject_forTest() const { + return _fetcher.getMetadataObject(); +} + +Milliseconds OplogFetcher::getRemoteCommandTimeout_forTest() const { + return _fetcher.getTimeout(); +} + +Milliseconds OplogFetcher::getAwaitDataTimeout_forTest() const { + return _awaitDataTimeout; +} + +void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, + BSONObjBuilder* getMoreBob) { + // if target cut connections between connecting and querying (for + // example, because it stepped down) we might not have a cursor + if (!result.isOK()) { + LOG(2) << "Error returned from oplog query: " << result.getStatus(); + _onShutdown(result.getStatus()); + return; + } + + const auto& queryResponse = result.getValue(); + OpTime sourcesLastOpTime; + bool syncSourceHasSyncSource = false; + + // Forward metadata (containing liveness information) to data replicator external state. + bool receivedMetadata = + queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName); + if (receivedMetadata) { + const auto& metadataObj = queryResponse.otherFields.metadata; + auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(metadataObj); + if (!metadataResult.isOK()) { + error() << "invalid replication metadata from sync source " << _fetcher.getSource() + << ": " << metadataResult.getStatus() << ": " << metadataObj; + _onShutdown(metadataResult.getStatus()); + return; + } + auto metadata = metadataResult.getValue(); + _dataReplicatorExternalState->processMetadata(metadata); + sourcesLastOpTime = metadata.getLastOpVisible(); + syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1; + } + + const auto& documents = queryResponse.documents; + auto firstDocToApply = documents.cbegin(); + + if (!documents.empty()) { + LOG(2) << "oplog fetcher read " << documents.size() + << " operations from remote oplog starting at " << documents.front()["ts"] + << " and ending at " << documents.back()["ts"]; + } else { + LOG(2) << "oplog fetcher read 0 operations from remote oplog"; + } + + auto opTimeWithHash = getLastOpTimeWithHashFetched(); + + // Check start of remote oplog and, if necessary, stop fetcher to execute rollback. + if (queryResponse.first) { + auto status = checkRemoteOplogStart(documents, opTimeWithHash); + if (!status.isOK()) { + // Stop oplog fetcher and execute rollback. + _onShutdown(status, opTimeWithHash); + return; + } + + // If this is the first batch and no rollback is needed, skip the first document. + firstDocToApply++; + } + + auto validateResult = OplogFetcher::validateDocuments( + documents, queryResponse.first, opTimeWithHash.opTime.getTimestamp()); + if (!validateResult.isOK()) { + _onShutdown(validateResult.getStatus(), opTimeWithHash); + return; + } + auto info = validateResult.getValue(); + + // TODO: back pressure handling will be added in SERVER-23499. + _enqueueDocumentsFn(firstDocToApply, documents.cend(), info, queryResponse.elapsedMillis); + + // Update last fetched info. + if (firstDocToApply != documents.cend()) { + opTimeWithHash = info.lastDocument; + LOG(3) << "batch resetting last fetched optime: " << opTimeWithHash.opTime + << "; hash: " << opTimeWithHash.value; + + stdx::unique_lock<stdx::mutex> lock(_mutex); + _lastFetched = opTimeWithHash; + } + + if (_dataReplicatorExternalState->shouldStopFetching( + _fetcher.getSource(), sourcesLastOpTime, syncSourceHasSyncSource)) { + _onShutdown(Status(ErrorCodes::InvalidSyncSource, + str::stream() << "sync source " << _fetcher.getSource().toString() + << " (last optime: " << sourcesLastOpTime.toString() + << "; has sync source: " << syncSourceHasSyncSource + << ") is no longer valid"), + opTimeWithHash); + return; + } + + // No more data. Stop processing and return Status::OK along with last + // fetch info. + if (!getMoreBob) { + _onShutdown(Status::OK(), opTimeWithHash); + return; + } + + getMoreBob->appendElements(makeGetMoreCommandObject(_dataReplicatorExternalState, + queryResponse.nss, + queryResponse.cursorId, + _awaitDataTimeout)); +} + +void OplogFetcher::_onShutdown(Status status) { + _onShutdown(status, getLastOpTimeWithHashFetched()); +} + +void OplogFetcher::_onShutdown(Status status, OpTimeWithHash opTimeWithHash) { + _onShutdownCallbackFn(status, opTimeWithHash); +} + + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h new file mode 100644 index 00000000000..c6209461161 --- /dev/null +++ b/src/mongo/db/repl/oplog_fetcher.h @@ -0,0 +1,221 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/timestamp.h" +#include "mongo/client/query_fetcher.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/repl/data_replicator_external_state.h" +#include "mongo/db/repl/optime_with.h" +#include "mongo/db/repl/replica_set_config.h" +#include "mongo/stdx/functional.h" + +namespace mongo { +namespace repl { + +/** + * Used to keep track of the optime and hash of the last fetched operation. + */ +using OpTimeWithHash = OpTimeWith<long long>; + +/** + * The oplog fetcher, once started, reads operations from a remote oplog using a tailable cursor. + * + * The initial find command is generated from last fetched optime and hash and may contain the + * current term depending on the replica set config provided. + * + * Forwards metadata in each find/getMore response to the data replicator external state. + * + * Performs additional validation on first batch of operations returned from the query to ensure we + * are able to continue from our last known fetched operation. + * + * Validates each batch of operations. + * + * Pushes operations from each batch of operations onto a buffer using the "enqueueDocumentsFn" + * function. + * + * Issues a getMore command after successfully processing each batch of operations. + * + * When there is an error or when it is not possible to issue another getMore request, calls + * "onShutdownCallbackFn" to signal the end of processing. + */ +class OplogFetcher { + MONGO_DISALLOW_COPYING(OplogFetcher); + +public: + static Seconds kDefaultProtocolZeroAwaitDataTimeout; + + /** + * Type of function called by the oplog fetcher on shutdown with + * the final oplog fetcher status, last optime fetched and last hash fetched. + * + * The status will be Status::OK() if we have processed the last batch of operations + * from the tailable cursor ("bob" is null in the fetcher callback). + */ + using OnShutdownCallbackFn = + stdx::function<void(const Status& shutdownStatus, const OpTimeWithHash& lastFetched)>; + + /** + * Statistics on current batch of operations returned by the fetcher. + */ + struct DocumentsInfo { + size_t networkDocumentCount = 0; + size_t networkDocumentBytes = 0; + size_t toApplyDocumentCount = 0; + size_t toApplyDocumentBytes = 0; + OpTimeWithHash lastDocument = {0, OpTime()}; + }; + + /** + * Type of function that accepts a pair of iterators into a range of operations + * within the current batch of results and copies the operations into + * a buffer to be consumed by the next stage of the replication process. + * + * Additional information on the operations is provided in a DocumentsInfo + * struct and duration for how long the last remote command took to complete. + */ + using EnqueueDocumentsFn = stdx::function<void(Fetcher::Documents::const_iterator begin, + Fetcher::Documents::const_iterator end, + const DocumentsInfo& info, + Milliseconds remoteCommandProcessingTime)>; + + /** + * Validates documents in current batch of results returned from tailing the remote oplog. + * 'first' should be set to true if this set of documents is the first batch returned from the + * query. + * On success, returns statistics on operations. + */ + static StatusWith<DocumentsInfo> validateDocuments(const Fetcher::Documents& documents, + bool first, + Timestamp lastTS); + + /** + * Initializes fetcher with command to tail remote oplog. + * + * Throws a UserException if validation fails on any of the provided arguments. + */ + OplogFetcher(executor::TaskExecutor* exec, + OpTimeWithHash lastFetched, + HostAndPort source, + NamespaceString nss, + ReplicaSetConfig config, + DataReplicatorExternalState* dataReplicatorExternalState, + EnqueueDocumentsFn enqueueDocumentsFn, + OnShutdownCallbackFn onShutdownCallbackFn); + + virtual ~OplogFetcher() = default; + + std::string toString() const; + + /** + * Returns true if we have scheduled the fetcher to read the oplog on the sync source. + */ + bool isActive() const; + + /** + * Starts fetcher so that we begin tailing the remote oplog on the sync source. + */ + Status startup(); + + /** + * Cancels both scheduled and active remote command requests. + * Returns immediately if the Oplog Fetcher is not active. + * It is fine to call this multiple times. + */ + void shutdown(); + + /** + * Waits until the oplog fetcher is inactive. + * It is fine to call this multiple times. + */ + void join(); + + /** + * Returns optime and hash of the last oplog entry in the most recent oplog query result. + */ + OpTimeWithHash getLastOpTimeWithHashFetched() const; + + // ================== Test support API =================== + + /** + * Returns command object sent in first remote command. + */ + BSONObj getCommandObject_forTest() const; + + /** + * Returns metadata object sent in remote commands. + */ + BSONObj getMetadataObject_forTest() const; + + /** + * Returns timeout for remote commands to complete. + */ + Milliseconds getRemoteCommandTimeout_forTest() const; + + /** + * Returns the await data timeout used for the "maxTimeMS" field in getMore command requests. + */ + Milliseconds getAwaitDataTimeout_forTest() const; + +private: + /** + * Processes each batch of results from the tailable cursor started by the fetcher on the sync + * source. + * + * Calls "onShutdownCallbackFn" if there is an error or if there are no further results to + * request from the sync source. + */ + void _callback(const Fetcher::QueryResponseStatus& result, BSONObjBuilder* getMoreBob); + + /** + * Notifies caller that the oplog fetcher has completed processing operations from + * the remote oplog. + */ + void _onShutdown(Status status); + void _onShutdown(Status status, OpTimeWithHash opTimeWithHash); + + DataReplicatorExternalState* _dataReplicatorExternalState; + Fetcher _fetcher; + const EnqueueDocumentsFn _enqueueDocumentsFn; + const Milliseconds _awaitDataTimeout; + const OnShutdownCallbackFn _onShutdownCallbackFn; + + // Protects member data of this Fetcher. + mutable stdx::mutex _mutex; + + // Used to validate start of first batch of results from the remote oplog + // tailing query and to keep track of the last known operation consumed via + // "_enqueueDocumentsFn". + OpTimeWithHash _lastFetched; +}; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp new file mode 100644 index 00000000000..378f212054a --- /dev/null +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -0,0 +1,875 @@ +/** + * Copyright 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <memory> + +#include "mongo/base/disallow_copying.h" +#include "mongo/db/repl/data_replicator_external_state_mock.h" +#include "mongo/db/repl/oplog_fetcher.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/rpc/metadata.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" +#include "mongo/stdx/memory.h" +#include "mongo/unittest/unittest.h" + +namespace { + +using namespace mongo; +using namespace mongo::repl; + +using executor::RemoteCommandRequest; +using executor::RemoteCommandResponse; + +class ShutdownState { + MONGO_DISALLOW_COPYING(ShutdownState); + +public: + ShutdownState(); + + Status getStatus() const; + OpTimeWithHash getLastFetched() const; + + /** + * Use this for oplog fetcher shutdown callback. + */ + void operator()(const Status& status, const OpTimeWithHash& lastFetched); + +private: + Status _status = executor::TaskExecutorTest::getDetectableErrorStatus(); + OpTimeWithHash _lastFetched = {0, OpTime()}; +}; + +class OplogFetcherTest : public executor::ThreadPoolExecutorTest { +protected: + void setUp() override; + void tearDown() override; + + /** + * Schedules response to the current network request. + * Returns remote command request in network request. + */ + RemoteCommandRequest scheduleNetworkResponse(RemoteCommandResponse response); + + /** + * Schedules network response and instructs network interface to process response. + * Returns remote command request in network request. + */ + RemoteCommandRequest processNetworkResponse(RemoteCommandResponse response, + bool expectReadyRequestsAfterProcessing = false); + RemoteCommandRequest processNetworkResponse(BSONObj obj, + bool expectReadyRequestsAfterProcessing = false); + + /** + * Starts an oplog fetcher. Processes a single batch of results from + * the oplog query and shuts down. + * Returns shutdown state. + */ + std::unique_ptr<ShutdownState> processSingleBatch(RemoteCommandResponse response); + std::unique_ptr<ShutdownState> processSingleBatch(BSONObj obj); + + /** + * Tests checkSyncSource result handling. + */ + void testSyncSourceChecking(rpc::ReplSetMetadata* metadata); + + /** + * Tests handling of two batches of operations returned from query. + * Returns getMore request. + */ + RemoteCommandRequest testTwoBatchHandling(bool isV1ElectionProtocol); + + OpTimeWithHash lastFetched; + + std::unique_ptr<DataReplicatorExternalStateMock> dataReplicatorExternalState; + + Fetcher::Documents lastEnqueuedDocuments; + OplogFetcher::DocumentsInfo lastEnqueuedDocumentsInfo; + Milliseconds lastEnqueuedElapsed; + OplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn; +}; + +ShutdownState::ShutdownState() = default; + +Status ShutdownState::getStatus() const { + return _status; +} + +OpTimeWithHash ShutdownState::getLastFetched() const { + return _lastFetched; +} + +void ShutdownState::operator()(const Status& status, const OpTimeWithHash& lastFetched) { + _status = status; + _lastFetched = lastFetched; +} + +void OplogFetcherTest::setUp() { + executor::ThreadPoolExecutorTest::setUp(); + launchExecutorThread(); + + lastFetched = {456LL, {{123, 0}, 1}}; + + dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>(); + dataReplicatorExternalState->currentTerm = lastFetched.opTime.getTerm(); + dataReplicatorExternalState->lastCommittedOpTime = {{9999, 0}, lastFetched.opTime.getTerm()}; + + enqueueDocumentsFn = [this](Fetcher::Documents::const_iterator begin, + Fetcher::Documents::const_iterator end, + const OplogFetcher::DocumentsInfo& info, + Milliseconds elapsed) { + lastEnqueuedDocuments = {begin, end}; + lastEnqueuedDocumentsInfo = info; + lastEnqueuedElapsed = elapsed; + }; +} + +void OplogFetcherTest::tearDown() { + executor::ThreadPoolExecutorTest::tearDown(); +} + +RemoteCommandRequest OplogFetcherTest::scheduleNetworkResponse(RemoteCommandResponse response) { + auto net = getNet(); + ASSERT_TRUE(net->hasReadyRequests()); + Milliseconds millis(0); + executor::TaskExecutor::ResponseStatus responseStatus(response); + auto noi = net->getNextReadyRequest(); + net->scheduleResponse(noi, net->now(), responseStatus); + return noi->getRequest(); +} + +RemoteCommandRequest OplogFetcherTest::processNetworkResponse( + RemoteCommandResponse response, bool expectReadyRequestsAfterProcessing) { + auto net = getNet(); + net->enterNetwork(); + auto request = scheduleNetworkResponse(response); + net->runReadyNetworkOperations(); + ASSERT_EQUALS(expectReadyRequestsAfterProcessing, net->hasReadyRequests()); + net->exitNetwork(); + return request; +} + +RemoteCommandRequest OplogFetcherTest::processNetworkResponse( + BSONObj obj, bool expectReadyRequestsAfterProcessing) { + auto net = getNet(); + net->enterNetwork(); + auto request = scheduleNetworkResponse({obj, rpc::makeEmptyMetadata(), Milliseconds(0)}); + net->runReadyNetworkOperations(); + ASSERT_EQUALS(expectReadyRequestsAfterProcessing, net->hasReadyRequests()); + net->exitNetwork(); + return request; +} + +HostAndPort source("localhost:12345"); +NamespaceString nss("local.oplog.rs"); + +ReplicaSetConfig _createConfig(bool isV1ElectionProtocol) { + BSONObjBuilder bob; + bob.append("_id", "myset"); + bob.append("version", 1); + if (isV1ElectionProtocol) { + bob.append("protocolVersion", 1); + } + { + BSONArrayBuilder membersBob(bob.subarrayStart("members")); + BSONObjBuilder(membersBob.subobjStart()) + .appendElements(BSON("_id" << 0 << "host" << source.toString())); + } + { + BSONObjBuilder settingsBob(bob.subobjStart("settings")); + settingsBob.append("electionTimeoutMillis", 10000); + } + auto configObj = bob.obj(); + + ReplicaSetConfig config; + ASSERT_OK(config.initialize(configObj)); + return config; +} + +std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch( + RemoteCommandResponse response) { + auto shutdownState = stdx::make_unique<ShutdownState>(); + + OplogFetcher oplogFetcher(&getExecutor(), + lastFetched, + source, + nss, + _createConfig(true), + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + stdx::ref(*shutdownState)); + + ASSERT_FALSE(oplogFetcher.isActive()); + ASSERT_OK(oplogFetcher.startup()); + ASSERT_TRUE(oplogFetcher.isActive()); + + auto request = processNetworkResponse(response); + + ASSERT_EQUALS(oplogFetcher.getCommandObject_forTest(), request.cmdObj); + ASSERT_EQUALS(oplogFetcher.getMetadataObject_forTest(), request.metadata); + + oplogFetcher.shutdown(); + oplogFetcher.join(); + + return std::move(shutdownState); +} + +std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(BSONObj obj) { + return processSingleBatch({obj, rpc::makeEmptyMetadata(), Milliseconds(0)}); +} + +TEST_F(OplogFetcherTest, InvalidConstruction) { + // Null start timestamp. + ASSERT_THROWS_CODE_AND_WHAT(OplogFetcher(&getExecutor(), + OpTimeWithHash(), + source, + nss, + _createConfig(true), + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + [](Status, OpTimeWithHash) {}), + UserException, + ErrorCodes::BadValue, + "null last optime fetched"); + + // Null EnqueueDocumentsFn. + ASSERT_THROWS_CODE_AND_WHAT(OplogFetcher(&getExecutor(), + lastFetched, + source, + nss, + _createConfig(true), + dataReplicatorExternalState.get(), + OplogFetcher::EnqueueDocumentsFn(), + [](Status, OpTimeWithHash) {}), + UserException, + ErrorCodes::BadValue, + "null enqueueDocuments function"); + + // Uninitialized replica set configuration. + ASSERT_THROWS_CODE_AND_WHAT(OplogFetcher(&getExecutor(), + lastFetched, + source, + nss, + ReplicaSetConfig(), + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + [](Status, OpTimeWithHash) {}), + UserException, + ErrorCodes::InvalidReplicaSetConfig, + "uninitialized replica set configuration"); + + // Null OnShutdownCallbackFn. + ASSERT_THROWS_CODE_AND_WHAT(OplogFetcher(&getExecutor(), + lastFetched, + source, + nss, + _createConfig(true), + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + OplogFetcher::OnShutdownCallbackFn()), + UserException, + ErrorCodes::BadValue, + "null onShutdownCallback function"); +} + +void _checkDefaultCommandObjectFields(BSONObj cmdObj) { + ASSERT_EQUALS(std::string("find"), cmdObj.firstElementFieldName()); + ASSERT_TRUE(cmdObj.getBoolField("tailable")); + ASSERT_TRUE(cmdObj.getBoolField("oplogReplay")); + ASSERT_TRUE(cmdObj.getBoolField("awaitData")); + ASSERT_EQUALS(60000, cmdObj.getIntField("maxTimeMS")); +} + +TEST_F( + OplogFetcherTest, + CommandObjectContainsTermAndStartTimestampIfGetCurrentTermAndLastCommittedOpTimeReturnsValidTerm) { + auto cmdObj = OplogFetcher(&getExecutor(), + lastFetched, + source, + nss, + _createConfig(true), + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + [](Status, OpTimeWithHash) {}).getCommandObject_forTest(); + ASSERT_EQUALS(mongo::BSONType::Object, cmdObj["filter"].type()); + ASSERT_EQUALS(BSON("ts" << BSON("$gte" << lastFetched.opTime.getTimestamp())), + cmdObj["filter"].Obj()); + ASSERT_EQUALS(dataReplicatorExternalState->currentTerm, cmdObj["term"].numberLong()); + _checkDefaultCommandObjectFields(cmdObj); +} + +TEST_F( + OplogFetcherTest, + CommandObjectContainsDoesNotContainTermIfGetCurrentTermAndLastCommittedOpTimeReturnsUninitializedTerm) { + dataReplicatorExternalState->currentTerm = OpTime::kUninitializedTerm; + auto cmdObj = OplogFetcher(&getExecutor(), + lastFetched, + source, + nss, + _createConfig(true), + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + [](Status, OpTimeWithHash) {}).getCommandObject_forTest(); + ASSERT_EQUALS(mongo::BSONType::Object, cmdObj["filter"].type()); + ASSERT_EQUALS(BSON("ts" << BSON("$gte" << lastFetched.opTime.getTimestamp())), + cmdObj["filter"].Obj()); + ASSERT_FALSE(cmdObj.hasField("term")); + _checkDefaultCommandObjectFields(cmdObj); +} + +TEST_F(OplogFetcherTest, MetadataObjectContainsReplSetMetadataFieldUnderProtocolVersion1) { + auto metadataObj = OplogFetcher(&getExecutor(), + lastFetched, + source, + nss, + _createConfig(true), + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + [](Status, OpTimeWithHash) {}).getMetadataObject_forTest(); + ASSERT_EQUALS(1, metadataObj.nFields()); + ASSERT_EQUALS(1, metadataObj[rpc::kReplSetMetadataFieldName].numberInt()); +} + +TEST_F(OplogFetcherTest, MetadataObjectIsEmptyUnderProtocolVersion0) { + auto metadataObj = OplogFetcher(&getExecutor(), + lastFetched, + source, + nss, + _createConfig(false), + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + [](Status, OpTimeWithHash) {}).getMetadataObject_forTest(); + ASSERT_EQUALS(BSONObj(), metadataObj); +} + +TEST_F(OplogFetcherTest, RemoteCommandTimeoutShouldEqualElectionTimeout) { + auto config = _createConfig(true); + auto timeout = OplogFetcher(&getExecutor(), + lastFetched, + source, + nss, + config, + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + [](Status, OpTimeWithHash) {}).getRemoteCommandTimeout_forTest(); + ASSERT_EQUALS(config.getElectionTimeoutPeriod(), timeout); +} + +TEST_F(OplogFetcherTest, AwaitDataTimeoutShouldEqualHalfElectionTimeoutUnderProtocolVersion1) { + auto config = _createConfig(true); + auto timeout = OplogFetcher(&getExecutor(), + lastFetched, + source, + nss, + config, + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + [](Status, OpTimeWithHash) {}).getAwaitDataTimeout_forTest(); + ASSERT_EQUALS(config.getElectionTimeoutPeriod() / 2, timeout); +} + +TEST_F(OplogFetcherTest, AwaitDataTimeoutShouldBeAConstantUnderProtocolVersion0) { + auto timeout = OplogFetcher(&getExecutor(), + lastFetched, + source, + nss, + _createConfig(false), + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + [](Status, OpTimeWithHash) {}).getAwaitDataTimeout_forTest(); + ASSERT_EQUALS(OplogFetcher::kDefaultProtocolZeroAwaitDataTimeout, timeout); +} + +TEST_F(OplogFetcherTest, ShuttingExecutorDownShouldPreventOplogFetcherFromStarting) { + getExecutor().shutdown(); + + OplogFetcher oplogFetcher(&getExecutor(), + lastFetched, + source, + nss, + _createConfig(true), + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + [](Status, OpTimeWithHash) {}); + + // Last optime and hash fetched should match values passed to constructor. + ASSERT_EQUALS(lastFetched, oplogFetcher.getLastOpTimeWithHashFetched()); + + ASSERT_FALSE(oplogFetcher.isActive()); + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, oplogFetcher.startup()); + ASSERT_FALSE(oplogFetcher.isActive()); + + // Last optime and hash fetched should not change. + ASSERT_EQUALS(lastFetched, oplogFetcher.getLastOpTimeWithHashFetched()); +} + +TEST_F(OplogFetcherTest, ShuttingExecutorDownAfterStartupStopsTheOplogFetcher) { + ShutdownState shutdownState; + + OplogFetcher oplogFetcher(&getExecutor(), + lastFetched, + source, + nss, + _createConfig(true), + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + stdx::ref(shutdownState)); + + ASSERT_FALSE(oplogFetcher.isActive()); + ASSERT_OK(oplogFetcher.startup()); + ASSERT_TRUE(oplogFetcher.isActive()); + + getExecutor().shutdown(); + + oplogFetcher.join(); + + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus()); + ASSERT_EQUALS(lastFetched, shutdownState.getLastFetched()); +} + +BSONObj makeNoopOplogEntry(OpTimeWithHash opTimeWithHash) { + BSONObjBuilder bob; + bob.appendElements(opTimeWithHash.opTime.toBSON()); + bob.append("h", opTimeWithHash.value); + bob.append("op", "c"); + bob.append("ns", "test.t"); + return bob.obj(); +} + +BSONObj makeNoopOplogEntry(OpTime opTime, long long hash) { + return makeNoopOplogEntry({hash, opTime}); +} + +BSONObj makeNoopOplogEntry(Seconds seconds, long long hash) { + return makeNoopOplogEntry({{seconds, 0}, 1LL}, hash); +} + +BSONObj makeCursorResponse(CursorId cursorId, + Fetcher::Documents oplogEntries, + bool isFirstBatch = true) { + BSONObjBuilder bob; + { + BSONObjBuilder cursorBob(bob.subobjStart("cursor")); + cursorBob.append("id", cursorId); + cursorBob.append("ns", nss.toString()); + { + BSONArrayBuilder batchBob( + cursorBob.subarrayStart(isFirstBatch ? "firstBatch" : "nextBatch")); + for (auto oplogEntry : oplogEntries) { + batchBob.append(oplogEntry); + } + } + } + bob.append("ok", 1); + return bob.obj(); +} + +TEST_F(OplogFetcherTest, InvalidMetadataInResponseStopsTheOplogFetcher) { + auto shutdownState = processSingleBatch( + {makeCursorResponse(0, {}), + BSON(rpc::kReplSetMetadataFieldName << BSON("invalid_repl_metadata_field" << 1)), + Milliseconds(0)}); + + ASSERT_EQUALS(ErrorCodes::NoSuchKey, shutdownState->getStatus()); +} + +TEST_F(OplogFetcherTest, VaidMetadataInResponseShouldBeForwardedToProcessMetadataFn) { + rpc::ReplSetMetadata metadata(1, lastFetched.opTime, lastFetched.opTime, 1, OID::gen(), 2, 2); + BSONObjBuilder bob; + ASSERT_OK(metadata.writeToMetadata(&bob)); + auto metadataObj = bob.obj(); + processSingleBatch( + {makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj, Milliseconds(0)}); + ASSERT_EQUALS(metadata.getPrimaryIndex(), + dataReplicatorExternalState->metadataProcessed.getPrimaryIndex()); +} + +TEST_F(OplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithRemoteOplogStaleError) { + ASSERT_EQUALS(ErrorCodes::RemoteOplogStale, + processSingleBatch(makeCursorResponse(0, {}))->getStatus()); +} + +TEST_F(OplogFetcherTest, + MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) { + ASSERT_EQUALS(ErrorCodes::OplogStartMissing, + processSingleBatch(makeCursorResponse(0, {BSONObj()}))->getStatus()); +} + +TEST_F( + OplogFetcherTest, + LastOpTimeFetchedDoesNotMatchFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) { + ASSERT_EQUALS(ErrorCodes::OplogStartMissing, + processSingleBatch( + makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)})) + ->getStatus()); +} + +TEST_F(OplogFetcherTest, + LastHashFetchedDoesNotMatchFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) { + ASSERT_EQUALS( + ErrorCodes::OplogStartMissing, + processSingleBatch( + makeCursorResponse(0, {makeNoopOplogEntry(lastFetched.opTime, lastFetched.value + 1)})) + ->getStatus()); +} + +TEST_F(OplogFetcherTest, + MissingOpTimeInSecondDocumentOfFirstBatchCausesOplogFetcherToStopWithNoSuchKey) { + ASSERT_EQUALS( + ErrorCodes::NoSuchKey, + processSingleBatch(makeCursorResponse(0, + {makeNoopOplogEntry(lastFetched), + BSON("o" << BSON("msg" + << "oplog entry without optime"))})) + ->getStatus()); +} + +TEST_F(OplogFetcherTest, TimestampsNotAdvancingInBatchCausesOplogFetcherStopWithOplogOutOfOrder) { + ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, + processSingleBatch(makeCursorResponse(0, + {makeNoopOplogEntry(lastFetched), + makeNoopOplogEntry(Seconds(1000), 1), + makeNoopOplogEntry(Seconds(2000), 1), + makeNoopOplogEntry(Seconds(1500), 1)})) + ->getStatus()); +} + +TEST_F(OplogFetcherTest, OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenEnqueuingDocuments) { + auto firstEntry = makeNoopOplogEntry(lastFetched); + auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200); + auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300); + Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry}; + + Milliseconds elapsed(600); + auto shutdownState = + processSingleBatch({makeCursorResponse(0, documents), rpc::makeEmptyMetadata(), elapsed}); + + ASSERT_EQUALS(2U, lastEnqueuedDocuments.size()); + ASSERT_EQUALS(secondEntry, lastEnqueuedDocuments[0]); + ASSERT_EQUALS(thirdEntry, lastEnqueuedDocuments[1]); + + ASSERT_EQUALS(3U, lastEnqueuedDocumentsInfo.networkDocumentCount); + ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()), + lastEnqueuedDocumentsInfo.networkDocumentBytes); + + ASSERT_EQUALS(2U, lastEnqueuedDocumentsInfo.toApplyDocumentCount); + ASSERT_EQUALS(size_t(secondEntry.objsize() + thirdEntry.objsize()), + lastEnqueuedDocumentsInfo.toApplyDocumentBytes); + + ASSERT_EQUALS(thirdEntry["h"].numberLong(), lastEnqueuedDocumentsInfo.lastDocument.value); + ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), + lastEnqueuedDocumentsInfo.lastDocument.opTime); + + ASSERT_EQUALS(elapsed, lastEnqueuedElapsed); + + // The last fetched optime and hash should be updated after pushing the operations into the + // buffer and reflected in the shutdown callback arguments. + ASSERT_OK(shutdownState->getStatus()); + ASSERT_EQUALS(OpTimeWithHash(thirdEntry["h"].numberLong(), + unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry))), + shutdownState->getLastFetched()); +} + +void OplogFetcherTest::testSyncSourceChecking(rpc::ReplSetMetadata* metadata) { + auto firstEntry = makeNoopOplogEntry(lastFetched); + auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200); + auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300); + Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry}; + + BSONObj metadataObj; + if (metadata) { + BSONObjBuilder bob; + ASSERT_OK(metadata->writeToMetadata(&bob)); + metadataObj = bob.obj(); + } + + dataReplicatorExternalState->shouldStopFetchingResult = true; + + auto shutdownState = + processSingleBatch({makeCursorResponse(0, documents), metadataObj, Milliseconds(0)}); + + // Sync source checking happens after we have successfully pushed the operations into + // the buffer for the next replication phase (eg. applier). + // The last fetched optime and hash should be reflected in the shutdown callback + // arguments. + ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, shutdownState->getStatus()); + ASSERT_EQUALS(OpTimeWithHash(thirdEntry["h"].numberLong(), + unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry))), + shutdownState->getLastFetched()); +} + +TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFetcher) { + testSyncSourceChecking(nullptr); + + // Sync source optime and "hasSyncSource" are not available if the respone does not + // contain metadata. + ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked); + ASSERT_EQUALS(OpTime(), dataReplicatorExternalState->syncSourceLastOpTime); + ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource); +} + +TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithMetadataStopsTheOplogFetcher) { + rpc::ReplSetMetadata metadata(lastFetched.opTime.getTerm(), + {{Seconds(10000), 0}, 1}, + {{Seconds(20000), 0}, 1}, + 1, + OID::gen(), + 2, + 2); + + testSyncSourceChecking(&metadata); + + // Sync source optime and "hasSyncSource" can be set if the respone contains metadata. + ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked); + ASSERT_EQUALS(metadata.getLastOpVisible(), dataReplicatorExternalState->syncSourceLastOpTime); + ASSERT_TRUE(dataReplicatorExternalState->syncSourceHasSyncSource); +} + +TEST_F(OplogFetcherTest, + FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceStopsTheOplogFetcher) { + rpc::ReplSetMetadata metadata(lastFetched.opTime.getTerm(), + {{Seconds(10000), 0}, 1}, + {{Seconds(20000), 0}, 1}, + 1, + OID::gen(), + 2, + -1); + + testSyncSourceChecking(&metadata); + + // Sync source "hasSyncSource" is derived from metadata. + ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked); + ASSERT_EQUALS(metadata.getLastOpVisible(), dataReplicatorExternalState->syncSourceLastOpTime); + ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource); +} + + +RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling(bool isV1ElectionProtocol) { + ShutdownState shutdownState; + + if (!isV1ElectionProtocol) { + dataReplicatorExternalState->currentTerm = OpTime::kUninitializedTerm; + } + + OplogFetcher oplogFetcher(&getExecutor(), + lastFetched, + source, + nss, + _createConfig(isV1ElectionProtocol), + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + stdx::ref(shutdownState)); + + ASSERT_OK(oplogFetcher.startup()); + + CursorId cursorId = 22LL; + auto firstEntry = makeNoopOplogEntry(lastFetched); + auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200); + processNetworkResponse(makeCursorResponse(cursorId, {firstEntry, secondEntry}), true); + + ASSERT_EQUALS(1U, lastEnqueuedDocuments.size()); + ASSERT_EQUALS(secondEntry, lastEnqueuedDocuments[0]); + + // Set cursor ID to 0 in getMore response to indicate no more data available. + auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300); + auto fourthEntry = makeNoopOplogEntry({{Seconds(1200), 0}, lastFetched.opTime.getTerm()}, 300); + auto request = processNetworkResponse(makeCursorResponse(0, {thirdEntry, fourthEntry}, false)); + + ASSERT_EQUALS(std::string("getMore"), request.cmdObj.firstElementFieldName()); + ASSERT_EQUALS(nss.coll(), request.cmdObj["collection"].String()); + ASSERT_EQUALS(int(durationCount<Milliseconds>(oplogFetcher.getAwaitDataTimeout_forTest())), + request.cmdObj.getIntField("maxTimeMS")); + + ASSERT_EQUALS(2U, lastEnqueuedDocuments.size()); + ASSERT_EQUALS(thirdEntry, lastEnqueuedDocuments[0]); + ASSERT_EQUALS(fourthEntry, lastEnqueuedDocuments[1]); + + oplogFetcher.shutdown(); + oplogFetcher.join(); + + ASSERT_OK(shutdownState.getStatus()); + ASSERT_EQUALS(OpTimeWithHash(fourthEntry["h"].numberLong(), + unittest::assertGet(OpTime::parseFromOplogEntry(fourthEntry))), + shutdownState.getLastFetched()); + + return request; +} + +TEST_F( + OplogFetcherTest, + NoDataAvailableAfterFirstTwoBatchesShouldCauseTheOplogFetcherToShutDownWithSuccessfulStatus) { + auto request = testTwoBatchHandling(true); + ASSERT_EQUALS(dataReplicatorExternalState->currentTerm, request.cmdObj["term"].numberLong()); + ASSERT_EQUALS(dataReplicatorExternalState->lastCommittedOpTime, + unittest::assertGet(OpTime::parseFromOplogEntry( + request.cmdObj["lastKnownCommittedOpTime"].Obj()))); +} + +TEST_F(OplogFetcherTest, + GetMoreRequestUnderProtocolVersionZeroDoesNotIncludeTermOrLastKnownCommittedOpTime) { + auto request = testTwoBatchHandling(false); + ASSERT_FALSE(request.cmdObj.hasField("term")); + ASSERT_FALSE(request.cmdObj.hasField("lastKnownCommittedOpTime")); +} + +TEST_F(OplogFetcherTest, ValidateDocumentsReturnsNoSuchKeyIfTimestampIsNotFoundInAnyDocument) { + auto firstEntry = makeNoopOplogEntry(Seconds(123), 100); + auto secondEntry = BSON("o" << BSON("msg" + << "oplog entry without optime")); + + ASSERT_EQUALS(ErrorCodes::NoSuchKey, + OplogFetcher::validateDocuments( + {firstEntry, secondEntry}, + true, + unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp()) + .getStatus()); +} + +TEST_F( + OplogFetcherTest, + ValidateDocumentsReturnsOutOfOrderIfTimestampInFirstEntryIsEqualToLastTimestampAndNotProcessingFirstBatch) { + auto firstEntry = makeNoopOplogEntry(Seconds(123), 100); + auto secondEntry = makeNoopOplogEntry(Seconds(456), 200); + + ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, + OplogFetcher::validateDocuments( + {firstEntry, secondEntry}, + false, + unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp()) + .getStatus()); +} + +TEST_F(OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInSecondEntryIsBeforeFirst) { + auto firstEntry = makeNoopOplogEntry(Seconds(456), 100); + auto secondEntry = makeNoopOplogEntry(Seconds(123), 200); + + ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, + OplogFetcher::validateDocuments( + {firstEntry, secondEntry}, + true, + unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp()) + .getStatus()); +} + +TEST_F(OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInThirdEntryIsBeforeSecond) { + auto firstEntry = makeNoopOplogEntry(Seconds(123), 100); + auto secondEntry = makeNoopOplogEntry(Seconds(789), 200); + auto thirdEntry = makeNoopOplogEntry(Seconds(456), 300); + + ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, + OplogFetcher::validateDocuments( + {firstEntry, secondEntry, thirdEntry}, + true, + unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp()) + .getStatus()); +} + +TEST_F(OplogFetcherTest, + ValidateDocumentsExcludesFirstDocumentInApplyCountAndBytesIfProcessingFirstBatch) { + auto firstEntry = makeNoopOplogEntry(Seconds(123), 100); + auto secondEntry = makeNoopOplogEntry(Seconds(456), 200); + auto thirdEntry = makeNoopOplogEntry(Seconds(789), 300); + + auto info = unittest::assertGet(OplogFetcher::validateDocuments( + {firstEntry, secondEntry, thirdEntry}, + true, + unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp())); + + ASSERT_EQUALS(3U, info.networkDocumentCount); + ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()), + info.networkDocumentBytes); + + ASSERT_EQUALS(300LL, info.lastDocument.value); + ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), + info.lastDocument.opTime); +} + +TEST_F(OplogFetcherTest, + ValidateDocumentsIncludesFirstDocumentInApplyCountAndBytesIfNotProcessingFirstBatch) { + auto firstEntry = makeNoopOplogEntry(Seconds(123), 100); + auto secondEntry = makeNoopOplogEntry(Seconds(456), 200); + auto thirdEntry = makeNoopOplogEntry(Seconds(789), 300); + + auto info = unittest::assertGet(OplogFetcher::validateDocuments( + {firstEntry, secondEntry, thirdEntry}, false, Timestamp(Seconds(100), 0))); + + ASSERT_EQUALS(3U, info.networkDocumentCount); + ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()), + info.networkDocumentBytes); + + ASSERT_EQUALS(info.networkDocumentCount, info.toApplyDocumentCount); + ASSERT_EQUALS(info.networkDocumentBytes, info.toApplyDocumentBytes); + + ASSERT_EQUALS(300LL, info.lastDocument.value); + ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), + info.lastDocument.opTime); +} + +TEST_F(OplogFetcherTest, + ValidateDocumentsReturnsDefaultLastDocumentHashAndOpTimeWhenThereAreNoDocumentsToApply) { + auto firstEntry = makeNoopOplogEntry(Seconds(123), 100); + + auto info = unittest::assertGet(OplogFetcher::validateDocuments( + {firstEntry}, + true, + unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp())); + + ASSERT_EQUALS(1U, info.networkDocumentCount); + ASSERT_EQUALS(size_t(firstEntry.objsize()), info.networkDocumentBytes); + + ASSERT_EQUALS(0U, info.toApplyDocumentCount); + ASSERT_EQUALS(0U, info.toApplyDocumentBytes); + + ASSERT_EQUALS(0LL, info.lastDocument.value); + ASSERT_EQUALS(OpTime(), info.lastDocument.opTime); +} + +TEST_F(OplogFetcherTest, + ValidateDocumentsReturnsOplogStartMissingWhenThereAreNoDocumentsWhenProcessingFirstBatch) { + ASSERT_EQUALS( + ErrorCodes::OplogStartMissing, + OplogFetcher::validateDocuments({}, true, Timestamp(Seconds(100), 0)).getStatus()); +} + +TEST_F(OplogFetcherTest, + ValidateDocumentsReturnsDefaultInfoWhenThereAreNoDocumentsWhenNotProcessingFirstBatch) { + auto info = + unittest::assertGet(OplogFetcher::validateDocuments({}, false, Timestamp(Seconds(100), 0))); + + ASSERT_EQUALS(0U, info.networkDocumentCount); + ASSERT_EQUALS(0U, info.networkDocumentBytes); + + ASSERT_EQUALS(0U, info.toApplyDocumentCount); + ASSERT_EQUALS(0U, info.toApplyDocumentBytes); + + ASSERT_EQUALS(0LL, info.lastDocument.value); + ASSERT_EQUALS(OpTime(), info.lastDocument.opTime); +} + +} // namespace diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 23fd498cb52..068bb49ef51 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -41,6 +41,7 @@ #include "mongo/db/index/index_descriptor.h" #include "mongo/db/operation_context_noop.h" #include "mongo/db/repl/check_quorum_for_config_change.h" +#include "mongo/db/repl/data_replicator_external_state_impl.h" #include "mongo/db/repl/elect_cmd_runner.h" #include "mongo/db/repl/election_winner_declarer.h" #include "mongo/db/repl/freshness_checker.h" @@ -253,7 +254,9 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( _sleptLastElection(false), _canAcceptNonLocalWrites(!(settings.usingReplSets() || settings.isSlave())), _canServeNonLocalReads(0U), - _dr(createDataReplicatorOptions(this), &_replExecutor), + _dr(createDataReplicatorOptions(this), + stdx::make_unique<DataReplicatorExternalStateImpl>(this), + &_replExecutor), _isDurableStorageEngine(isDurableStorageEngineFn ? *isDurableStorageEngineFn : []() -> bool { return getGlobalServiceContext()->getGlobalStorageEngine()->isDurable(); }) { diff --git a/src/mongo/unittest/unittest.h b/src/mongo/unittest/unittest.h index 5ed906b353c..48ed89695d3 100644 --- a/src/mongo/unittest/unittest.h +++ b/src/mongo/unittest/unittest.h @@ -143,6 +143,19 @@ ([](const EXCEPTION_TYPE& ex) { return (EXPECTED_CODE) == ex.getCode(); })) /** + * Behaves like ASSERT_THROWS, above, but also fails if calling getCode() on the thrown exception + * does not return an error code equal to EXPECTED_CODE or if calling what() on the thrown exception + * does not return a string equal to EXPECTED_WHAT. + */ +#define ASSERT_THROWS_CODE_AND_WHAT(STATEMENT, EXCEPTION_TYPE, EXPECTED_CODE, EXPECTED_WHAT) \ + ASSERT_THROWS_PRED(STATEMENT, \ + EXCEPTION_TYPE, \ + ([](const EXCEPTION_TYPE& ex) { \ + return (EXPECTED_CODE) == ex.getCode() && \ + ::mongo::StringData(ex.what()) == ::mongo::StringData(EXPECTED_WHAT); \ + })) + +/** * Behaves like ASSERT_THROWS, above, but also fails if PREDICATE(ex) for the throw exception, ex, * is false. */ diff --git a/src/mongo/util/queue.h b/src/mongo/util/queue.h index 9f0870e9a82..a052cd1346e 100644 --- a/src/mongo/util/queue.h +++ b/src/mongo/util/queue.h @@ -36,15 +36,11 @@ #include "mongo/base/disallow_copying.h" #include "mongo/stdx/chrono.h" #include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" namespace mongo { -template <typename T> -size_t _getSizeDefault(const T& t) { - return 1; -} - /** * Simple blocking queue with optional max size (by count or custom sizing function). * A custom sizing function can optionally be given. By default the getSize function @@ -56,13 +52,13 @@ size_t _getSizeDefault(const T& t) { template <typename T> class BlockingQueue { MONGO_DISALLOW_COPYING(BlockingQueue); - typedef size_t (*getSizeFunc)(const T& t); public: - BlockingQueue() - : _maxSize(std::numeric_limits<std::size_t>::max()), _getSize(&_getSizeDefault) {} - BlockingQueue(size_t size) : _maxSize(size), _getSize(&_getSizeDefault) {} - BlockingQueue(size_t size, getSizeFunc f) : _maxSize(size), _getSize(f) {} + using GetSizeFn = stdx::function<size_t(const T&)>; + + BlockingQueue() : BlockingQueue(std::numeric_limits<std::size_t>::max()) {} + BlockingQueue(size_t size) : BlockingQueue(size, [](const T&) { return 1; }) {} + BlockingQueue(size_t size, GetSizeFn f) : _maxSize(size), _getSize(f) {} void pushEvenIfFull(T const& t) { stdx::unique_lock<stdx::mutex> lk(_lock); @@ -82,21 +78,28 @@ public: * * NOTE: Should only be used in a single producer case. */ - void pushAllNonBlocking(std::vector<T>& objs) { - if (objs.empty()) { + template <typename Container> + void pushAllNonBlocking(const Container& objs) { + pushAllNonBlocking(std::begin(objs), std::end(objs)); + } + + template <typename Iterator> + void pushAllNonBlocking(Iterator begin, Iterator end) { + if (begin == end) { return; } stdx::unique_lock<stdx::mutex> lk(_lock); const auto startedEmpty = _queue.empty(); _clearing = false; - std::for_each(objs.begin(), - objs.end(), - [this](T& obj) { - size_t tSize = _getSize(obj); - _queue.push(obj); - _currentSize += tSize; - }); + + auto pushOne = [this](const T& obj) { + size_t tSize = _getSize(obj); + _queue.push(obj); + _currentSize += tSize; + }; + std::for_each(begin, end, pushOne); + if (startedEmpty) { _cvNoLongerEmpty.notify_one(); } @@ -269,7 +272,7 @@ private: std::queue<T> _queue; const size_t _maxSize; size_t _currentSize = 0; - getSizeFunc _getSize; + GetSizeFn _getSize; bool _clearing = false; stdx::condition_variable _cvNoLongerFull; |