summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-03-30 12:04:23 -0400
committerBenety Goh <benety@mongodb.com>2016-04-08 10:27:51 -0400
commit11132f69c93a279ebe7f0ccb71929d4cd2b8675d (patch)
tree21eebd96010805a9cacffb0e02b7ac8befba90a6 /src/mongo/db
parent12d251318b76936c9655f317fd29ce46cb5e862b (diff)
downloadmongo-11132f69c93a279ebe7f0ccb71929d4cd2b8675d.tar.gz
SERVER-22774 Copied BackgroundSync::_fetcherCallback logic to OplogFetcher
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/SConscript50
-rw-r--r--src/mongo/db/repl/data_replicator.cpp235
-rw-r--r--src/mongo/db/repl/data_replicator.h25
-rw-r--r--src/mongo/db/repl/data_replicator_external_state.h82
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.cpp78
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.h64
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.cpp54
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.h70
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp65
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp384
-rw-r--r--src/mongo/db/repl/oplog_fetcher.h221
-rw-r--r--src/mongo/db/repl/oplog_fetcher_test.cpp875
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp5
13 files changed, 2044 insertions, 164 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 11a30e6852d..27fd72ad763 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -324,6 +324,7 @@ env.Library('repl_coordinator_impl',
'$BUILD_DIR/mongo/rpc/metadata',
'$BUILD_DIR/mongo/util/fail_point',
'data_replicator',
+ 'data_replicator_external_state_impl',
'repl_coordinator_global',
'repl_coordinator_interface',
'repl_settings',
@@ -558,6 +559,31 @@ env.Library(
)
env.Library(
+ target='oplog_fetcher',
+ source=[
+ 'oplog_fetcher.cpp',
+ ],
+ LIBDEPS=[
+ 'repl_coordinator_interface',
+ 'replica_set_messages',
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/client/fetcher',
+ '$BUILD_DIR/mongo/db/namespace_string',
+ '$BUILD_DIR/mongo/executor/task_executor_interface',
+ ],
+)
+
+env.CppUnitTest(
+ target='oplog_fetcher_test',
+ source='oplog_fetcher_test.cpp',
+ LIBDEPS=[
+ 'oplog_fetcher',
+ 'data_replicator_external_state_mock',
+ '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
+ ],
+)
+
+env.Library(
target='reporter',
source=[
'reporter.cpp',
@@ -763,6 +789,28 @@ env.CppUnitTest(
)
env.Library(
+ target='data_replicator_external_state_impl',
+ source=[
+ 'data_replicator_external_state_impl.cpp',
+ ],
+ LIBDEPS=[
+ 'optime',
+ 'repl_coordinator_interface',
+ '$BUILD_DIR/mongo/base',
+ ],
+)
+
+env.Library(
+ target='data_replicator_external_state_mock',
+ source=[
+ 'data_replicator_external_state_mock.cpp',
+ ],
+ LIBDEPS=[
+ 'optime',
+ ],
+)
+
+env.Library(
target='data_replicator',
source=[
'data_replicator.cpp',
@@ -772,6 +820,7 @@ env.Library(
'collection_cloner',
'database_cloner',
'multiapplier',
+ 'oplog_fetcher',
'optime',
'reporter',
'$BUILD_DIR/mongo/client/fetcher',
@@ -786,6 +835,7 @@ env.CppUnitTest(
LIBDEPS=[
'base_cloner_test_fixture',
'data_replicator',
+ 'data_replicator_external_state_mock',
'replication_executor_test_fixture',
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
'$BUILD_DIR/mongo/unittest/concurrency',
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 660d53e1567..734d8362235 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -42,11 +42,13 @@
#include "mongo/db/repl/collection_cloner.h"
#include "mongo/db/repl/database_cloner.h"
#include "mongo/db/repl/member_state.h"
+#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
#include "mongo/stdx/functional.h"
+#include "mongo/stdx/memory.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point_service.h"
@@ -94,102 +96,6 @@ std::string toString(DataReplicatorState s) {
MONGO_UNREACHABLE;
}
-/**
- * Follows the fetcher pattern for a find+getmore on an oplog
- * Returns additional errors if the start oplog entry cannot be found.
- */
-class OplogFetcher : public QueryFetcher {
- MONGO_DISALLOW_COPYING(OplogFetcher);
-
-public:
- OplogFetcher(ReplicationExecutor* exec,
- const Timestamp& startTS,
- const HostAndPort& src,
- const NamespaceString& nss,
- const QueryFetcher::CallbackFn& work);
-
- virtual ~OplogFetcher() = default;
- std::string toString() const;
-
- const Timestamp getStartTimestamp() const {
- return _startTS;
- }
-
-protected:
- void _delegateCallback(const Fetcher::QueryResponseStatus& fetchResult, NextAction* nextAction);
-
- const Timestamp _startTS;
-};
-
-// OplogFetcher
-OplogFetcher::OplogFetcher(ReplicationExecutor* exec,
- const Timestamp& startTS,
- const HostAndPort& src,
- const NamespaceString& oplogNSS,
- const QueryFetcher::CallbackFn& work)
- // TODO: add query options await_data, oplog_replay
- : QueryFetcher(exec,
- src,
- oplogNSS,
- BSON("find" << oplogNSS.coll() << "filter"
- << BSON("ts" << BSON("$gte" << startTS))),
- work,
- BSON(rpc::kReplSetMetadataFieldName << 1)),
- _startTS(startTS) {}
-
-std::string OplogFetcher::toString() const {
- return str::stream() << "OplogReader -"
- << " startTS: " << _startTS.toString()
- << " fetcher: " << QueryFetcher::getDiagnosticString();
-}
-
-void OplogFetcher::_delegateCallback(const Fetcher::QueryResponseStatus& fetchResult,
- Fetcher::NextAction* nextAction) {
- if (fetchResult.isOK()) {
- Fetcher::Documents::const_iterator firstDoc = fetchResult.getValue().documents.begin();
- auto hasDoc = firstDoc != fetchResult.getValue().documents.end();
-
- if (fetchResult.getValue().first) {
- if (!hasDoc) {
- // Set next action to none.
- *nextAction = Fetcher::NextAction::kNoAction;
- _onQueryResponse(
- Status(ErrorCodes::OplogStartMissing,
- str::stream()
- << "No operations on sync source with op time starting at: "
- << _startTS.toString()),
- nextAction);
- return;
- } else if ((*firstDoc)["ts"].eoo()) {
- // Set next action to none.
- *nextAction = Fetcher::NextAction::kNoAction;
- _onQueryResponse(Status(ErrorCodes::OplogStartMissing,
- str::stream() << "Missing 'ts' field in first returned "
- << (*firstDoc)["ts"] << " starting at "
- << _startTS.toString()),
- nextAction);
- return;
- } else if ((*firstDoc)["ts"].timestamp() != _startTS) {
- // Set next action to none.
- *nextAction = Fetcher::NextAction::kNoAction;
- _onQueryResponse(Status(ErrorCodes::OplogStartMissing,
- str::stream() << "First returned " << (*firstDoc)["ts"]
- << " is not where we wanted to start: "
- << _startTS.toString()),
- nextAction);
- return;
- }
- }
-
- if (hasDoc) {
- _onQueryResponse(fetchResult, nextAction);
- } else {
- }
- } else {
- _onQueryResponse(fetchResult, nextAction);
- }
-};
-
class DatabasesCloner {
public:
DatabasesCloner(ReplicationExecutor* exec,
@@ -511,8 +417,12 @@ void DatabasesCloner::_failed() {
}
// Data Replicator
-DataReplicator::DataReplicator(DataReplicatorOptions opts, ReplicationExecutor* exec)
+DataReplicator::DataReplicator(
+ DataReplicatorOptions opts,
+ std::unique_ptr<DataReplicatorExternalState> dataReplicatorExternalState,
+ ReplicationExecutor* exec)
: _opts(opts),
+ _dataReplicatorExternalState(std::move(dataReplicatorExternalState)),
_exec(exec),
_state(DataReplicatorState::Uninitialized),
_fetcherPaused(false),
@@ -533,7 +443,7 @@ DataReplicator::DataReplicator(DataReplicatorOptions opts, ReplicationExecutor*
}
DataReplicator::~DataReplicator() {
- DESTRUCTOR_GUARD(_cancelAllHandles_inlock(); _waitOnAll_inlock(););
+ DESTRUCTOR_GUARD(_cancelAllHandles_inlock(); _oplogBuffer.clear(); _waitOnAll_inlock(););
}
Status DataReplicator::start() {
@@ -767,14 +677,27 @@ TimestampStatus DataReplicator::initialSync() {
attemptErrorStatus = tsStatus.getStatus();
if (attemptErrorStatus.isOK()) {
_initialSyncState->beginTimestamp = tsStatus.getValue();
- _fetcher.reset(new OplogFetcher(_exec,
- _initialSyncState->beginTimestamp,
- _syncSource,
- _opts.remoteOplogNS,
- stdx::bind(&DataReplicator::_onOplogFetchFinish,
- this,
- stdx::placeholders::_1,
- stdx::placeholders::_2)));
+ long long term = OpTime::kUninitializedTerm;
+ // TODO: Read last fetched hash from storage.
+ long long lastHashFetched = 1LL;
+ OpTime lastOpTimeFetched(_initialSyncState->beginTimestamp, term);
+ _fetcher = stdx::make_unique<OplogFetcher>(
+ _exec,
+ OpTimeWithHash(lastHashFetched, lastOpTimeFetched),
+ _syncSource,
+ _opts.remoteOplogNS,
+ _opts.getReplSetConfig(),
+ _dataReplicatorExternalState.get(),
+ stdx::bind(&DataReplicator::_enqueueDocuments,
+ this,
+ stdx::placeholders::_1,
+ stdx::placeholders::_2,
+ stdx::placeholders::_3,
+ stdx::placeholders::_4),
+ stdx::bind(&DataReplicator::_onOplogFetchFinish,
+ this,
+ stdx::placeholders::_1,
+ stdx::placeholders::_2));
_scheduleFetch_inlock();
lk.unlock();
_initialSyncState->dbsCloner.start(); // When the cloner is done applier starts.
@@ -888,7 +811,7 @@ bool DataReplicator::_anyActiveHandles_inlock() const {
void DataReplicator::_cancelAllHandles_inlock() {
if (_fetcher)
- _fetcher->cancel();
+ _fetcher->shutdown();
if (_applier)
_applier->cancel();
if (_reporter)
@@ -899,7 +822,7 @@ void DataReplicator::_cancelAllHandles_inlock() {
void DataReplicator::_waitOnAll_inlock() {
if (_fetcher)
- _fetcher->wait();
+ _fetcher->join();
if (_applier)
_applier->wait();
if (_reporter)
@@ -1102,6 +1025,10 @@ void DataReplicator::_onApplyBatchFinish(const CallbackArgs& cbData,
const TimestampStatus& ts,
const Operations& ops,
const size_t numApplied) {
+ if (ErrorCodes::CallbackCanceled == cbData.status) {
+ return;
+ }
+
invariant(cbData.status.isOK());
UniqueLock lk(_mutex);
if (_initialSyncState) {
@@ -1293,21 +1220,31 @@ Status DataReplicator::_scheduleFetch_inlock() {
}
}
- const auto startOptime = _opts.getMyLastOptime().getTimestamp();
+ const auto startOptime = _opts.getMyLastOptime();
+ // TODO: Read last applied hash from storage. See
+ // BackgroundSync::_readLastAppliedHash(OperationContex*).
+ long long startHash = 0LL;
const auto remoteOplogNS = _opts.remoteOplogNS;
- // TODO: add query options await_data, oplog_replay
- _fetcher.reset(new OplogFetcher(_exec,
- startOptime,
- _syncSource,
- remoteOplogNS,
- stdx::bind(&DataReplicator::_onOplogFetchFinish,
- this,
- stdx::placeholders::_1,
- stdx::placeholders::_2)));
+ _fetcher = stdx::make_unique<OplogFetcher>(_exec,
+ OpTimeWithHash(startHash, startOptime),
+ _syncSource,
+ remoteOplogNS,
+ _opts.getReplSetConfig(),
+ _dataReplicatorExternalState.get(),
+ stdx::bind(&DataReplicator::_enqueueDocuments,
+ this,
+ stdx::placeholders::_1,
+ stdx::placeholders::_2,
+ stdx::placeholders::_3,
+ stdx::placeholders::_4),
+ stdx::bind(&DataReplicator::_onOplogFetchFinish,
+ this,
+ stdx::placeholders::_1,
+ stdx::placeholders::_2));
}
if (!_fetcher->isActive()) {
- Status status = _fetcher->schedule();
+ Status status = _fetcher->startup();
if (!status.isOK()) {
return status;
}
@@ -1335,6 +1272,7 @@ Status DataReplicator::scheduleShutdown() {
invariant(!_onShutdown.isValid());
_onShutdown = eventStatus.getValue();
_cancelAllHandles_inlock();
+ _oplogBuffer.clear();
}
// Schedule _doNextActions in case nothing is active to trigger the _onShutdown event.
@@ -1369,38 +1307,45 @@ Status DataReplicator::_shutdown() {
return status;
}
-void DataReplicator::_onOplogFetchFinish(const StatusWith<Fetcher::QueryResponse>& fetchResult,
- Fetcher::NextAction* nextAction) {
- const Status status = fetchResult.getStatus();
- if (status.code() == ErrorCodes::CallbackCanceled)
+void DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
+ Fetcher::Documents::const_iterator end,
+ const OplogFetcher::DocumentsInfo& info,
+ Milliseconds getMoreElapsed) {
+ if (info.toApplyDocumentCount == 0) {
return;
- if (status.isOK()) {
- const auto& docs = fetchResult.getValue().documents;
- if (docs.begin() != docs.end()) {
- LockGuard lk(_mutex);
- std::for_each(
- docs.cbegin(), docs.cend(), [&](const BSONObj& doc) { _oplogBuffer.push(doc); });
- auto doc = docs.rbegin();
- BSONElement tsElem(doc->getField("ts"));
- while (tsElem.eoo() && doc != docs.rend()) {
- tsElem = (doc++)->getField("ts");
- }
+ }
- if (!tsElem.eoo()) {
- _lastTimestampFetched = tsElem.timestamp();
- } else {
- warning() << "Did not find a 'ts' timestamp field in any of the fetched documents";
- }
- }
- if (*nextAction == Fetcher::NextAction::kNoAction) {
- // TODO: create new fetcher?, with new query from where we left off -- d'tor fetcher
- }
+ // Wait for enough space.
+ // Gets unblocked on shutdown.
+ _oplogBuffer.waitForSpace(info.toApplyDocumentBytes);
+
+ OCCASIONALLY {
+ LOG(2) << "bgsync buffer has " << _oplogBuffer.size() << " bytes";
}
- if (!status.isOK()) {
+ // Buffer docs for later application.
+ _oplogBuffer.pushAllNonBlocking(begin, end);
+
+ _lastTimestampFetched = info.lastDocument.opTime.getTimestamp();
+
+ // TODO: updates metrics with "info" and "getMoreElapsed".
+
+ _doNextActions();
+}
+
+void DataReplicator::_onOplogFetchFinish(const Status& status, const OpTimeWithHash& lastFetched) {
+ if (status.code() == ErrorCodes::CallbackCanceled) {
+ return;
+ } else if (status.isOK()) {
+ _lastTimestampFetched = lastFetched.opTime.getTimestamp();
+
+ // TODO: create new fetcher?, with new query from where we left off -- d'tor fetcher
+ } else {
+ invariant(!status.isOK());
// Got an error, now decide what to do...
switch (status.code()) {
- case ErrorCodes::OplogStartMissing: {
+ case ErrorCodes::OplogStartMissing:
+ case ErrorCodes::RemoteOplogStale: {
_setState(DataReplicatorState::Rollback);
// possible rollback
auto scheduleResult = _exec->scheduleDBWork(
diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h
index 53dda5f96c5..6382cf9f9a4 100644
--- a/src/mongo/db/repl/data_replicator.h
+++ b/src/mongo/db/repl/data_replicator.h
@@ -40,6 +40,8 @@
#include "mongo/db/repl/multiapplier.h"
#include "mongo/db/repl/collection_cloner.h"
#include "mongo/db/repl/database_cloner.h"
+#include "mongo/db/repl/data_replicator_external_state.h"
+#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/reporter.h"
@@ -70,7 +72,6 @@ using Response = executor::RemoteCommandResponse;
using TimestampStatus = StatusWith<Timestamp>;
using UniqueLock = stdx::unique_lock<stdx::mutex>;
-class OplogFetcher;
struct InitialSyncState;
struct MemberState;
class ReplicationProgressManager;
@@ -113,6 +114,9 @@ struct DataReplicatorOptions {
/** Function to get this node's slaveDelay. */
using GetSlaveDelayFn = stdx::function<Seconds()>;
+ /** Function to get current replica set configuration */
+ using GetReplSetConfigFn = stdx::function<ReplicaSetConfig()>;
+
// Error and retry values
Milliseconds syncSourceRetryWait{1000};
Milliseconds initialSyncRetryWait{1000};
@@ -140,6 +144,8 @@ struct DataReplicatorOptions {
SetMyLastOptimeFn setMyLastOptime;
SetFollowerModeFn setFollowerMode;
GetSlaveDelayFn getSlaveDelay;
+ GetReplSetConfigFn getReplSetConfig;
+
SyncSourceSelector* syncSourceSelector = nullptr;
std::string toString() const {
@@ -158,7 +164,9 @@ struct DataReplicatorOptions {
*/
class DataReplicator {
public:
- DataReplicator(DataReplicatorOptions opts, ReplicationExecutor* exec);
+ DataReplicator(DataReplicatorOptions opts,
+ std::unique_ptr<DataReplicatorExternalState> dataReplicatorExternalState,
+ ReplicationExecutor* exec);
virtual ~DataReplicator();
@@ -226,8 +234,16 @@ private:
// Only executed via executor
void _resumeFinish(CallbackArgs cbData);
- void _onOplogFetchFinish(const QueryResponseStatus& fetchResult,
- Fetcher::NextAction* nextAction);
+
+ /**
+ * Pushes documents from oplog fetcher to blocking queue for
+ * applier to consume.
+ */
+ void _enqueueDocuments(Fetcher::Documents::const_iterator begin,
+ Fetcher::Documents::const_iterator end,
+ const OplogFetcher::DocumentsInfo& info,
+ Milliseconds elapsed);
+ void _onOplogFetchFinish(const Status& status, const OpTimeWithHash& lastFetched);
void _rollbackOperations(const CallbackArgs& cbData);
void _doNextActions();
void _doNextActions_InitialSync_inlock();
@@ -273,6 +289,7 @@ private:
// Set during construction
const DataReplicatorOptions _opts;
+ std::unique_ptr<DataReplicatorExternalState> _dataReplicatorExternalState;
ReplicationExecutor* _exec;
//
diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h
new file mode 100644
index 00000000000..983290a2148
--- /dev/null
+++ b/src/mongo/db/repl/data_replicator_external_state.h
@@ -0,0 +1,82 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/optime_with.h"
+#include "mongo/rpc/metadata/repl_set_metadata.h"
+#include "mongo/util/net/hostandport.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+namespace repl {
+
+/**
+ * Holds current term and last committed optime necessary to populate find/getMore command requests.
+ */
+using OpTimeWithTerm = OpTimeWith<long long>;
+
+/**
+ * This class represents the interface the DataReplicator uses to interact with the
+ * rest of the system. All functionality of the DataReplicator that would introduce
+ * dependencies on large sections of the server code and thus break the unit testability of
+ * DataReplicator should be moved here.
+ */
+class DataReplicatorExternalState {
+ MONGO_DISALLOW_COPYING(DataReplicatorExternalState);
+
+public:
+ DataReplicatorExternalState() = default;
+
+ virtual ~DataReplicatorExternalState() = default;
+
+ /**
+ * Returns the current term and last committed optime.
+ * Returns (OpTime::kUninitializedTerm, OpTime()) if not available.
+ */
+ virtual OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() = 0;
+
+ /**
+ * Forwards the parsed metadata in the query results to the replication system.
+ */
+ virtual void processMetadata(const rpc::ReplSetMetadata& metadata) = 0;
+
+ /**
+ * Evaluates quality of sync source. Accepts the current sync source; the last optime on this
+ * sync source (from metadata); and whether this sync source has a sync source (also from
+ * metadata).
+ */
+ virtual bool shouldStopFetching(const HostAndPort& source,
+ const OpTime& sourceOpTime,
+ bool sourceHasSyncSource) = 0;
+};
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
new file mode 100644
index 00000000000..828a2aa51a5
--- /dev/null
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
@@ -0,0 +1,78 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/data_replicator_external_state_impl.h"
+
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace repl {
+
+DataReplicatorExternalStateImpl::DataReplicatorExternalStateImpl(
+ ReplicationCoordinator* replicationCoordinator)
+ : _replicationCoordinator(replicationCoordinator) {}
+
+OpTimeWithTerm DataReplicatorExternalStateImpl::getCurrentTermAndLastCommittedOpTime() {
+ if (!_replicationCoordinator->isV1ElectionProtocol()) {
+ return {OpTime::kUninitializedTerm, OpTime()};
+ }
+ return {_replicationCoordinator->getTerm(), _replicationCoordinator->getLastCommittedOpTime()};
+}
+
+void DataReplicatorExternalStateImpl::processMetadata(const rpc::ReplSetMetadata& metadata) {
+ _replicationCoordinator->processReplSetMetadata(metadata);
+ if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) {
+ _replicationCoordinator->cancelAndRescheduleElectionTimeout();
+ }
+}
+
+bool DataReplicatorExternalStateImpl::shouldStopFetching(const HostAndPort& source,
+ const OpTime& sourceOpTime,
+ bool sourceHasSyncSource) {
+ // Re-evaluate quality of sync target.
+ if (_replicationCoordinator->shouldChangeSyncSource(
+ source, sourceOpTime, sourceHasSyncSource)) {
+ LOG(1) << "Canceling oplog query because we have to choose a sync source. Current source: "
+ << source << ", OpTime " << sourceOpTime
+ << ", hasSyncSource:" << sourceHasSyncSource;
+ return true;
+ }
+ return false;
+}
+
+ReplicationCoordinator* DataReplicatorExternalStateImpl::getReplicationCoordinator() const {
+ return _replicationCoordinator;
+}
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h
new file mode 100644
index 00000000000..8fc84ff218c
--- /dev/null
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.h
@@ -0,0 +1,64 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/repl/data_replicator_external_state.h"
+
+namespace mongo {
+namespace repl {
+
+class ReplicationCoordinator;
+
+/**
+ * Data replicator external state implementation using a replication coordinator.
+ */
+
+class DataReplicatorExternalStateImpl : public DataReplicatorExternalState {
+public:
+ DataReplicatorExternalStateImpl(ReplicationCoordinator* replicationCoordinator);
+
+ OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override;
+
+ void processMetadata(const rpc::ReplSetMetadata& metadata) override;
+
+ bool shouldStopFetching(const HostAndPort& source,
+ const OpTime& sourceOpTime,
+ bool sourceHasSyncSource) override;
+
+protected:
+ ReplicationCoordinator* getReplicationCoordinator() const;
+
+private:
+ // Not owned by us.
+ ReplicationCoordinator* _replicationCoordinator;
+};
+
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
new file mode 100644
index 00000000000..501d9ae70c3
--- /dev/null
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
@@ -0,0 +1,54 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/data_replicator_external_state_mock.h"
+
+namespace mongo {
+namespace repl {
+
+OpTimeWithTerm DataReplicatorExternalStateMock::getCurrentTermAndLastCommittedOpTime() {
+ return {currentTerm, lastCommittedOpTime};
+}
+
+void DataReplicatorExternalStateMock::processMetadata(const rpc::ReplSetMetadata& metadata) {
+ metadataProcessed = metadata;
+}
+
+bool DataReplicatorExternalStateMock::shouldStopFetching(const HostAndPort& source,
+ const OpTime& sourceOpTime,
+ bool sourceHasSyncSource) {
+ lastSyncSourceChecked = source;
+ syncSourceLastOpTime = sourceOpTime;
+ syncSourceHasSyncSource = sourceHasSyncSource;
+ return shouldStopFetchingResult;
+}
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h
new file mode 100644
index 00000000000..ef78c691157
--- /dev/null
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.h
@@ -0,0 +1,70 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/repl/data_replicator_external_state.h"
+
+namespace mongo {
+namespace repl {
+
+class ReplicationCoordinator;
+
+/**
+ * Data replicator external state implementation for testing.
+ */
+
+class DataReplicatorExternalStateMock : public DataReplicatorExternalState {
+public:
+ OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override;
+
+ void processMetadata(const rpc::ReplSetMetadata& metadata) override;
+
+ bool shouldStopFetching(const HostAndPort& source,
+ const OpTime& sourceOpTime,
+ bool sourceHasSyncSource) override;
+
+ // Returned by getCurrentTermAndLastCommittedOpTime.
+ long long currentTerm = OpTime::kUninitializedTerm;
+ OpTime lastCommittedOpTime;
+
+ // Set by processMetadata.
+ rpc::ReplSetMetadata metadataProcessed;
+
+ // Set by shouldStopFetching.
+ HostAndPort lastSyncSourceChecked;
+ OpTime syncSourceLastOpTime;
+ bool syncSourceHasSyncSource = false;
+
+ // Returned by shouldStopFetching.
+ bool shouldStopFetchingResult = false;
+};
+
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 0373b14dfea..6aa10156dba 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/json.h"
#include "mongo/db/repl/base_cloner_test_fixture.h"
#include "mongo/db/repl/data_replicator.h"
+#include "mongo/db/repl/data_replicator_external_state_mock.h"
#include "mongo/db/repl/member_state.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/update_position_args.h"
@@ -183,6 +184,9 @@ protected:
reset();
launchExecutorThread();
+
+ _myLastOpTime = OpTime({3, 0}, 1);
+
DataReplicatorOptions options;
options.initialSyncRetryWait = Milliseconds(0);
options.applierFn = [this](const MultiApplier::Operations& ops) { return _applierFn(ops); };
@@ -207,8 +211,25 @@ protected:
};
options.getSlaveDelay = [this]() { return Seconds(0); };
options.syncSourceSelector = this;
+ options.getReplSetConfig = []() {
+ ReplicaSetConfig config;
+ ASSERT_OK(
+ config.initialize(BSON("_id"
+ << "myset"
+ << "version" << 1 << "protocolVersion" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "localhost:12345")) << "settings"
+ << BSON("electionTimeoutMillis" << 10000))));
+ return config;
+ };
+
+ auto dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>();
+ dataReplicatorExternalState->currentTerm = 1LL;
+ dataReplicatorExternalState->lastCommittedOpTime = _myLastOpTime;
+
try {
- _dr.reset(new DataReplicator(options, &(getReplExecutor())));
+ _dr.reset(new DataReplicator(
+ options, std::move(dataReplicatorExternalState), &(getReplExecutor())));
} catch (...) {
ASSERT_OK(exceptionToStatus());
}
@@ -325,7 +346,7 @@ protected:
const int expectedResponses(_responses.size());
// counter for oplog entries
- int c(0);
+ int c(1);
while (true) {
net->enterNetwork();
if (!net->hasReadyRequests() && processedRequests < expectedResponses) {
@@ -871,10 +892,16 @@ TEST_F(SteadyStateTest, RollbackTwoSyncSourcesSecondRollbackSucceeds) {
}
TEST_F(SteadyStateTest, PauseDataReplicator) {
+ auto lastOperationApplied = BSON("op"
+ << "a"
+ << "v" << OplogEntry::kOplogVersion << "ts"
+ << Timestamp(Seconds(123), 0));
+
auto operationToApply = BSON("op"
<< "a"
<< "v" << OplogEntry::kOplogVersion << "ts"
- << Timestamp(Seconds(123), 0));
+ << Timestamp(Seconds(456), 0));
+
stdx::mutex mutex;
unittest::Barrier barrier(2U);
Timestamp lastTimestampApplied;
@@ -896,7 +923,7 @@ TEST_F(SteadyStateTest, PauseDataReplicator) {
};
auto& dr = getDR();
- _myLastOpTime = OpTime(operationToApply["ts"].timestamp(), OpTime::kInitialTerm);
+ _myLastOpTime = OpTime(lastOperationApplied["ts"].timestamp(), OpTime::kInitialTerm);
_memberState = MemberState::RS_SECONDARY;
auto net = getNet();
@@ -907,10 +934,12 @@ TEST_F(SteadyStateTest, PauseDataReplicator) {
ASSERT_TRUE(net->hasReadyRequests());
{
auto networkRequest = net->getNextReadyRequest();
- auto commandResponse = BSON(
- "ok" << 1 << "cursor" << BSON("id" << 0LL << "ns"
- << "local.oplog.rs"
- << "firstBatch" << BSON_ARRAY(operationToApply)));
+ auto commandResponse =
+ BSON("ok" << 1 << "cursor"
+ << BSON("id" << 1LL << "ns"
+ << "local.oplog.rs"
+ << "firstBatch"
+ << BSON_ARRAY(lastOperationApplied << operationToApply)));
scheduleNetworkResponse(networkRequest, commandResponse);
}
@@ -955,10 +984,16 @@ TEST_F(SteadyStateTest, PauseDataReplicator) {
}
TEST_F(SteadyStateTest, ApplyOneOperation) {
+ auto lastOperationApplied = BSON("op"
+ << "a"
+ << "v" << OplogEntry::kOplogVersion << "ts"
+ << Timestamp(Seconds(123), 0));
+
auto operationToApply = BSON("op"
<< "a"
<< "v" << OplogEntry::kOplogVersion << "ts"
- << Timestamp(Seconds(123), 0));
+ << Timestamp(Seconds(456), 0));
+
stdx::mutex mutex;
unittest::Barrier barrier(2U);
Timestamp lastTimestampApplied;
@@ -979,7 +1014,7 @@ TEST_F(SteadyStateTest, ApplyOneOperation) {
barrier.countDownAndWait();
};
- _myLastOpTime = OpTime(operationToApply["ts"].timestamp(), OpTime::kInitialTerm);
+ _myLastOpTime = OpTime(lastOperationApplied["ts"].timestamp(), OpTime::kInitialTerm);
_memberState = MemberState::RS_SECONDARY;
auto net = getNet();
@@ -991,10 +1026,12 @@ TEST_F(SteadyStateTest, ApplyOneOperation) {
ASSERT_TRUE(net->hasReadyRequests());
{
auto networkRequest = net->getNextReadyRequest();
- auto commandResponse = BSON(
- "ok" << 1 << "cursor" << BSON("id" << 0LL << "ns"
- << "local.oplog.rs"
- << "firstBatch" << BSON_ARRAY(operationToApply)));
+ auto commandResponse =
+ BSON("ok" << 1 << "cursor"
+ << BSON("id" << 1LL << "ns"
+ << "local.oplog.rs"
+ << "firstBatch"
+ << BSON_ARRAY(lastOperationApplied << operationToApply)));
scheduleNetworkResponse(networkRequest, commandResponse);
}
ASSERT_EQUALS(0U, dr.getOplogBufferCount());
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
new file mode 100644
index 00000000000..fcb41bdca51
--- /dev/null
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -0,0 +1,384 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/oplog_fetcher.h"
+
+#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/log.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+namespace repl {
+
+Seconds OplogFetcher::kDefaultProtocolZeroAwaitDataTimeout(2);
+
+namespace {
+
+/**
+ * Calculates await data timeout based on the current replica set configuration.
+ */
+Milliseconds calculateAwaitDataTimeout(const ReplicaSetConfig& config) {
+ // Under protocol version 1, make the awaitData timeout (maxTimeMS) dependent on the election
+ // timeout. This enables the sync source to communicate liveness of the primary to secondaries.
+ // Under protocol version 0, use a default timeout of 2 seconds for awaitData.
+ return config.getProtocolVersion() == 1LL ? config.getElectionTimeoutPeriod() / 2
+ : OplogFetcher::kDefaultProtocolZeroAwaitDataTimeout;
+}
+
+/**
+ * Returns find command object suitable for tailing remote oplog.
+ */
+BSONObj makeFindCommandObject(DataReplicatorExternalState* dataReplicatorExternalState,
+ const NamespaceString& nss,
+ OpTime lastOpTimeFetched) {
+ invariant(dataReplicatorExternalState);
+ BSONObjBuilder cmdBob;
+ cmdBob.append("find", nss.coll());
+ cmdBob.append("filter", BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp())));
+ cmdBob.append("tailable", true);
+ cmdBob.append("oplogReplay", true);
+ cmdBob.append("awaitData", true);
+ cmdBob.append("maxTimeMS", durationCount<Milliseconds>(Minutes(1))); // 1 min initial find.
+ auto opTimeWithTerm = dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime();
+ if (opTimeWithTerm.value != OpTime::kUninitializedTerm) {
+ cmdBob.append("term", opTimeWithTerm.value);
+ }
+ return cmdBob.obj();
+}
+
+/**
+ * Returns getMore command object suitable for tailing remote oplog.
+ */
+BSONObj makeGetMoreCommandObject(DataReplicatorExternalState* dataReplicatorExternalState,
+ const NamespaceString& nss,
+ CursorId cursorId,
+ Milliseconds fetcherMaxTimeMS) {
+ BSONObjBuilder cmdBob;
+ cmdBob.append("getMore", cursorId);
+ cmdBob.append("collection", nss.coll());
+ cmdBob.append("maxTimeMS", durationCount<Milliseconds>(fetcherMaxTimeMS));
+ auto opTimeWithTerm = dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime();
+ if (opTimeWithTerm.value != OpTime::kUninitializedTerm) {
+ cmdBob.append("term", opTimeWithTerm.value);
+ opTimeWithTerm.opTime.append(&cmdBob, "lastKnownCommittedOpTime");
+ }
+ return cmdBob.obj();
+}
+
+/**
+ * Returns command metadata object suitable for tailing remote oplog.
+ */
+StatusWith<BSONObj> makeMetadataObject(bool isV1ElectionProtocol) {
+ return isV1ElectionProtocol ? BSON(rpc::kReplSetMetadataFieldName << 1)
+ : rpc::makeEmptyMetadata();
+}
+
+/**
+ * Checks the first batch of results from query.
+ * 'documents' are the first batch of results returned from tailing the remote oplog.
+ * 'lastFetched' optime and hash should be consistent with the predicate in the query.
+ * Returns RemoteOplogStale if the oplog query has no results.
+ * Returns OplogStartMissing if we cannot find the optime of the last fetched operation in
+ * the remote oplog.
+ */
+Status checkRemoteOplogStart(const Fetcher::Documents& documents, OpTimeWithHash lastFetched) {
+ if (documents.empty()) {
+ // The GTE query from upstream returns nothing, so we're ahead of the upstream.
+ return Status(ErrorCodes::RemoteOplogStale,
+ str::stream() << "We are ahead of the sync source. Our last op time fetched: "
+ << lastFetched.opTime.toString());
+ }
+ const auto& o = documents.front();
+ auto opTimeResult = OpTime::parseFromOplogEntry(o);
+ if (!opTimeResult.isOK()) {
+ return Status(ErrorCodes::OplogStartMissing,
+ str::stream() << "our last op time fetched: " << lastFetched.opTime.toString()
+ << " (hash: " << lastFetched.value << ")"
+ << ". failed to parse optime from first oplog on source: "
+ << o.toString() << ": " << opTimeResult.getStatus().toString());
+ }
+ auto opTime = opTimeResult.getValue();
+ long long hash = o["h"].numberLong();
+ if (opTime != lastFetched.opTime || hash != lastFetched.value) {
+ return Status(ErrorCodes::OplogStartMissing,
+ str::stream() << "our last op time fetched: " << lastFetched.opTime.toString()
+ << ". source's GTE: " << opTime.toString() << " hashes: ("
+ << lastFetched.value << "/" << hash << ")");
+ }
+ return Status::OK();
+}
+
+} // namespace
+
+StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments(
+ const Fetcher::Documents& documents, bool first, Timestamp lastTS) {
+ if (first && documents.empty()) {
+ return Status(ErrorCodes::OplogStartMissing,
+ str::stream() << "The first batch of oplog entries is empty, but expected at "
+ "least 1 document matching ts: " << lastTS.toString());
+ }
+
+ DocumentsInfo info;
+ // The count of the bytes of the documents read off the network.
+ info.networkDocumentBytes = 0;
+ info.networkDocumentCount = 0;
+ for (auto&& doc : documents) {
+ info.networkDocumentBytes += doc.objsize();
+ ++info.networkDocumentCount;
+
+ // If this is the first response (to the $gte query) then we already applied the first doc.
+ if (first && info.networkDocumentCount == 1U) {
+ continue;
+ }
+
+ // Check to see if the oplog entry goes back in time for this document.
+ const auto docOpTime = OpTime::parseFromOplogEntry(doc);
+ // entries must have a "ts" field.
+ if (!docOpTime.isOK()) {
+ return docOpTime.getStatus();
+ }
+
+ info.lastDocument = {doc["h"].numberLong(), docOpTime.getValue()};
+
+ const auto docTS = info.lastDocument.opTime.getTimestamp();
+ if (lastTS >= docTS) {
+ return Status(ErrorCodes::OplogOutOfOrder,
+ str::stream() << "Out of order entries in oplog. lastTS: "
+ << lastTS.toString() << " outOfOrderTS:" << docTS.toString()
+ << " at count:" << info.networkDocumentCount);
+ }
+ lastTS = docTS;
+ }
+
+ // These numbers are for the documents we will apply.
+ info.toApplyDocumentCount = documents.size();
+ info.toApplyDocumentBytes = info.networkDocumentBytes;
+ if (first) {
+ // The count is one less since the first document found was already applied ($gte $ts query)
+ // and we will not apply it again.
+ --info.toApplyDocumentCount;
+ auto alreadyAppliedDocument = documents.cbegin();
+ info.toApplyDocumentBytes -= alreadyAppliedDocument->objsize();
+ }
+ return info;
+}
+
+OplogFetcher::OplogFetcher(executor::TaskExecutor* exec,
+ OpTimeWithHash lastFetched,
+ HostAndPort source,
+ NamespaceString oplogNSS,
+ ReplicaSetConfig config,
+ DataReplicatorExternalState* dataReplicatorExternalState,
+ EnqueueDocumentsFn enqueueDocumentsFn,
+ OnShutdownCallbackFn onShutdownCallbackFn)
+ : _dataReplicatorExternalState(dataReplicatorExternalState),
+ _fetcher(exec,
+ source,
+ oplogNSS.db().toString(),
+ makeFindCommandObject(dataReplicatorExternalState, oplogNSS, lastFetched.opTime),
+ stdx::bind(
+ &OplogFetcher::_callback, this, stdx::placeholders::_1, stdx::placeholders::_3),
+ uassertStatusOK(makeMetadataObject(config.getProtocolVersion() == 1LL)),
+ config.getElectionTimeoutPeriod()),
+ _enqueueDocumentsFn(enqueueDocumentsFn),
+ _awaitDataTimeout(calculateAwaitDataTimeout(config)),
+ _onShutdownCallbackFn(onShutdownCallbackFn),
+ _lastFetched(lastFetched) {
+ uassert(ErrorCodes::BadValue, "null last optime fetched", !lastFetched.opTime.isNull());
+ uassert(ErrorCodes::InvalidReplicaSetConfig,
+ "uninitialized replica set configuration",
+ config.isInitialized());
+ uassert(ErrorCodes::BadValue, "null enqueueDocuments function", enqueueDocumentsFn);
+ uassert(ErrorCodes::BadValue, "null onShutdownCallback function", onShutdownCallbackFn);
+}
+
+std::string OplogFetcher::toString() const {
+ return str::stream() << "OplogReader -"
+ << " last optime fetched: " << _lastFetched.opTime.toString()
+ << " last hash fetched: " << _lastFetched.value
+ << " fetcher: " << _fetcher.getDiagnosticString();
+}
+
+bool OplogFetcher::isActive() const {
+ return _fetcher.isActive();
+}
+
+Status OplogFetcher::startup() {
+ return _fetcher.schedule();
+}
+
+void OplogFetcher::shutdown() {
+ _fetcher.cancel();
+}
+
+void OplogFetcher::join() {
+ _fetcher.wait();
+}
+
+OpTimeWithHash OplogFetcher::getLastOpTimeWithHashFetched() const {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ return _lastFetched;
+}
+
+BSONObj OplogFetcher::getCommandObject_forTest() const {
+ return _fetcher.getCommandObject();
+}
+
+BSONObj OplogFetcher::getMetadataObject_forTest() const {
+ return _fetcher.getMetadataObject();
+}
+
+Milliseconds OplogFetcher::getRemoteCommandTimeout_forTest() const {
+ return _fetcher.getTimeout();
+}
+
+Milliseconds OplogFetcher::getAwaitDataTimeout_forTest() const {
+ return _awaitDataTimeout;
+}
+
+void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
+ BSONObjBuilder* getMoreBob) {
+ // if target cut connections between connecting and querying (for
+ // example, because it stepped down) we might not have a cursor
+ if (!result.isOK()) {
+ LOG(2) << "Error returned from oplog query: " << result.getStatus();
+ _onShutdown(result.getStatus());
+ return;
+ }
+
+ const auto& queryResponse = result.getValue();
+ OpTime sourcesLastOpTime;
+ bool syncSourceHasSyncSource = false;
+
+ // Forward metadata (containing liveness information) to data replicator external state.
+ bool receivedMetadata =
+ queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName);
+ if (receivedMetadata) {
+ const auto& metadataObj = queryResponse.otherFields.metadata;
+ auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(metadataObj);
+ if (!metadataResult.isOK()) {
+ error() << "invalid replication metadata from sync source " << _fetcher.getSource()
+ << ": " << metadataResult.getStatus() << ": " << metadataObj;
+ _onShutdown(metadataResult.getStatus());
+ return;
+ }
+ auto metadata = metadataResult.getValue();
+ _dataReplicatorExternalState->processMetadata(metadata);
+ sourcesLastOpTime = metadata.getLastOpVisible();
+ syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1;
+ }
+
+ const auto& documents = queryResponse.documents;
+ auto firstDocToApply = documents.cbegin();
+
+ if (!documents.empty()) {
+ LOG(2) << "oplog fetcher read " << documents.size()
+ << " operations from remote oplog starting at " << documents.front()["ts"]
+ << " and ending at " << documents.back()["ts"];
+ } else {
+ LOG(2) << "oplog fetcher read 0 operations from remote oplog";
+ }
+
+ auto opTimeWithHash = getLastOpTimeWithHashFetched();
+
+ // Check start of remote oplog and, if necessary, stop fetcher to execute rollback.
+ if (queryResponse.first) {
+ auto status = checkRemoteOplogStart(documents, opTimeWithHash);
+ if (!status.isOK()) {
+ // Stop oplog fetcher and execute rollback.
+ _onShutdown(status, opTimeWithHash);
+ return;
+ }
+
+ // If this is the first batch and no rollback is needed, skip the first document.
+ firstDocToApply++;
+ }
+
+ auto validateResult = OplogFetcher::validateDocuments(
+ documents, queryResponse.first, opTimeWithHash.opTime.getTimestamp());
+ if (!validateResult.isOK()) {
+ _onShutdown(validateResult.getStatus(), opTimeWithHash);
+ return;
+ }
+ auto info = validateResult.getValue();
+
+ // TODO: back pressure handling will be added in SERVER-23499.
+ _enqueueDocumentsFn(firstDocToApply, documents.cend(), info, queryResponse.elapsedMillis);
+
+ // Update last fetched info.
+ if (firstDocToApply != documents.cend()) {
+ opTimeWithHash = info.lastDocument;
+ LOG(3) << "batch resetting last fetched optime: " << opTimeWithHash.opTime
+ << "; hash: " << opTimeWithHash.value;
+
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ _lastFetched = opTimeWithHash;
+ }
+
+ if (_dataReplicatorExternalState->shouldStopFetching(
+ _fetcher.getSource(), sourcesLastOpTime, syncSourceHasSyncSource)) {
+ _onShutdown(Status(ErrorCodes::InvalidSyncSource,
+ str::stream() << "sync source " << _fetcher.getSource().toString()
+ << " (last optime: " << sourcesLastOpTime.toString()
+ << "; has sync source: " << syncSourceHasSyncSource
+ << ") is no longer valid"),
+ opTimeWithHash);
+ return;
+ }
+
+ // No more data. Stop processing and return Status::OK along with last
+ // fetch info.
+ if (!getMoreBob) {
+ _onShutdown(Status::OK(), opTimeWithHash);
+ return;
+ }
+
+ getMoreBob->appendElements(makeGetMoreCommandObject(_dataReplicatorExternalState,
+ queryResponse.nss,
+ queryResponse.cursorId,
+ _awaitDataTimeout));
+}
+
+void OplogFetcher::_onShutdown(Status status) {
+ _onShutdown(status, getLastOpTimeWithHashFetched());
+}
+
+void OplogFetcher::_onShutdown(Status status, OpTimeWithHash opTimeWithHash) {
+ _onShutdownCallbackFn(status, opTimeWithHash);
+}
+
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
new file mode 100644
index 00000000000..c6209461161
--- /dev/null
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -0,0 +1,221 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status_with.h"
+#include "mongo/bson/timestamp.h"
+#include "mongo/client/query_fetcher.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/data_replicator_external_state.h"
+#include "mongo/db/repl/optime_with.h"
+#include "mongo/db/repl/replica_set_config.h"
+#include "mongo/stdx/functional.h"
+
+namespace mongo {
+namespace repl {
+
+/**
+ * Used to keep track of the optime and hash of the last fetched operation.
+ */
+using OpTimeWithHash = OpTimeWith<long long>;
+
+/**
+ * The oplog fetcher, once started, reads operations from a remote oplog using a tailable cursor.
+ *
+ * The initial find command is generated from last fetched optime and hash and may contain the
+ * current term depending on the replica set config provided.
+ *
+ * Forwards metadata in each find/getMore response to the data replicator external state.
+ *
+ * Performs additional validation on first batch of operations returned from the query to ensure we
+ * are able to continue from our last known fetched operation.
+ *
+ * Validates each batch of operations.
+ *
+ * Pushes operations from each batch of operations onto a buffer using the "enqueueDocumentsFn"
+ * function.
+ *
+ * Issues a getMore command after successfully processing each batch of operations.
+ *
+ * When there is an error or when it is not possible to issue another getMore request, calls
+ * "onShutdownCallbackFn" to signal the end of processing.
+ */
+class OplogFetcher {
+ MONGO_DISALLOW_COPYING(OplogFetcher);
+
+public:
+ static Seconds kDefaultProtocolZeroAwaitDataTimeout;
+
+ /**
+ * Type of function called by the oplog fetcher on shutdown with
+ * the final oplog fetcher status, last optime fetched and last hash fetched.
+ *
+ * The status will be Status::OK() if we have processed the last batch of operations
+ * from the tailable cursor ("bob" is null in the fetcher callback).
+ */
+ using OnShutdownCallbackFn =
+ stdx::function<void(const Status& shutdownStatus, const OpTimeWithHash& lastFetched)>;
+
+ /**
+ * Statistics on current batch of operations returned by the fetcher.
+ */
+ struct DocumentsInfo {
+ size_t networkDocumentCount = 0;
+ size_t networkDocumentBytes = 0;
+ size_t toApplyDocumentCount = 0;
+ size_t toApplyDocumentBytes = 0;
+ OpTimeWithHash lastDocument = {0, OpTime()};
+ };
+
+ /**
+ * Type of function that accepts a pair of iterators into a range of operations
+ * within the current batch of results and copies the operations into
+ * a buffer to be consumed by the next stage of the replication process.
+ *
+ * Additional information on the operations is provided in a DocumentsInfo
+ * struct and duration for how long the last remote command took to complete.
+ */
+ using EnqueueDocumentsFn = stdx::function<void(Fetcher::Documents::const_iterator begin,
+ Fetcher::Documents::const_iterator end,
+ const DocumentsInfo& info,
+ Milliseconds remoteCommandProcessingTime)>;
+
+ /**
+ * Validates documents in current batch of results returned from tailing the remote oplog.
+ * 'first' should be set to true if this set of documents is the first batch returned from the
+ * query.
+ * On success, returns statistics on operations.
+ */
+ static StatusWith<DocumentsInfo> validateDocuments(const Fetcher::Documents& documents,
+ bool first,
+ Timestamp lastTS);
+
+ /**
+ * Initializes fetcher with command to tail remote oplog.
+ *
+ * Throws a UserException if validation fails on any of the provided arguments.
+ */
+ OplogFetcher(executor::TaskExecutor* exec,
+ OpTimeWithHash lastFetched,
+ HostAndPort source,
+ NamespaceString nss,
+ ReplicaSetConfig config,
+ DataReplicatorExternalState* dataReplicatorExternalState,
+ EnqueueDocumentsFn enqueueDocumentsFn,
+ OnShutdownCallbackFn onShutdownCallbackFn);
+
+ virtual ~OplogFetcher() = default;
+
+ std::string toString() const;
+
+ /**
+ * Returns true if we have scheduled the fetcher to read the oplog on the sync source.
+ */
+ bool isActive() const;
+
+ /**
+ * Starts fetcher so that we begin tailing the remote oplog on the sync source.
+ */
+ Status startup();
+
+ /**
+ * Cancels both scheduled and active remote command requests.
+ * Returns immediately if the Oplog Fetcher is not active.
+ * It is fine to call this multiple times.
+ */
+ void shutdown();
+
+ /**
+ * Waits until the oplog fetcher is inactive.
+ * It is fine to call this multiple times.
+ */
+ void join();
+
+ /**
+ * Returns optime and hash of the last oplog entry in the most recent oplog query result.
+ */
+ OpTimeWithHash getLastOpTimeWithHashFetched() const;
+
+ // ================== Test support API ===================
+
+ /**
+ * Returns command object sent in first remote command.
+ */
+ BSONObj getCommandObject_forTest() const;
+
+ /**
+ * Returns metadata object sent in remote commands.
+ */
+ BSONObj getMetadataObject_forTest() const;
+
+ /**
+ * Returns timeout for remote commands to complete.
+ */
+ Milliseconds getRemoteCommandTimeout_forTest() const;
+
+ /**
+ * Returns the await data timeout used for the "maxTimeMS" field in getMore command requests.
+ */
+ Milliseconds getAwaitDataTimeout_forTest() const;
+
+private:
+ /**
+ * Processes each batch of results from the tailable cursor started by the fetcher on the sync
+ * source.
+ *
+ * Calls "onShutdownCallbackFn" if there is an error or if there are no further results to
+ * request from the sync source.
+ */
+ void _callback(const Fetcher::QueryResponseStatus& result, BSONObjBuilder* getMoreBob);
+
+ /**
+ * Notifies caller that the oplog fetcher has completed processing operations from
+ * the remote oplog.
+ */
+ void _onShutdown(Status status);
+ void _onShutdown(Status status, OpTimeWithHash opTimeWithHash);
+
+ DataReplicatorExternalState* _dataReplicatorExternalState;
+ Fetcher _fetcher;
+ const EnqueueDocumentsFn _enqueueDocumentsFn;
+ const Milliseconds _awaitDataTimeout;
+ const OnShutdownCallbackFn _onShutdownCallbackFn;
+
+ // Protects member data of this Fetcher.
+ mutable stdx::mutex _mutex;
+
+ // Used to validate start of first batch of results from the remote oplog
+ // tailing query and to keep track of the last known operation consumed via
+ // "_enqueueDocumentsFn".
+ OpTimeWithHash _lastFetched;
+};
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
new file mode 100644
index 00000000000..378f212054a
--- /dev/null
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -0,0 +1,875 @@
+/**
+ * Copyright 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <memory>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/db/repl/data_replicator_external_state_mock.h"
+#include "mongo/db/repl/oplog_fetcher.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/rpc/metadata.h"
+#include "mongo/rpc/metadata/repl_set_metadata.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/unittest/unittest.h"
+
+namespace {
+
+using namespace mongo;
+using namespace mongo::repl;
+
+using executor::RemoteCommandRequest;
+using executor::RemoteCommandResponse;
+
+class ShutdownState {
+ MONGO_DISALLOW_COPYING(ShutdownState);
+
+public:
+ ShutdownState();
+
+ Status getStatus() const;
+ OpTimeWithHash getLastFetched() const;
+
+ /**
+ * Use this for oplog fetcher shutdown callback.
+ */
+ void operator()(const Status& status, const OpTimeWithHash& lastFetched);
+
+private:
+ Status _status = executor::TaskExecutorTest::getDetectableErrorStatus();
+ OpTimeWithHash _lastFetched = {0, OpTime()};
+};
+
+class OplogFetcherTest : public executor::ThreadPoolExecutorTest {
+protected:
+ void setUp() override;
+ void tearDown() override;
+
+ /**
+ * Schedules response to the current network request.
+ * Returns remote command request in network request.
+ */
+ RemoteCommandRequest scheduleNetworkResponse(RemoteCommandResponse response);
+
+ /**
+ * Schedules network response and instructs network interface to process response.
+ * Returns remote command request in network request.
+ */
+ RemoteCommandRequest processNetworkResponse(RemoteCommandResponse response,
+ bool expectReadyRequestsAfterProcessing = false);
+ RemoteCommandRequest processNetworkResponse(BSONObj obj,
+ bool expectReadyRequestsAfterProcessing = false);
+
+ /**
+ * Starts an oplog fetcher. Processes a single batch of results from
+ * the oplog query and shuts down.
+ * Returns shutdown state.
+ */
+ std::unique_ptr<ShutdownState> processSingleBatch(RemoteCommandResponse response);
+ std::unique_ptr<ShutdownState> processSingleBatch(BSONObj obj);
+
+ /**
+ * Tests checkSyncSource result handling.
+ */
+ void testSyncSourceChecking(rpc::ReplSetMetadata* metadata);
+
+ /**
+ * Tests handling of two batches of operations returned from query.
+ * Returns getMore request.
+ */
+ RemoteCommandRequest testTwoBatchHandling(bool isV1ElectionProtocol);
+
+ OpTimeWithHash lastFetched;
+
+ std::unique_ptr<DataReplicatorExternalStateMock> dataReplicatorExternalState;
+
+ Fetcher::Documents lastEnqueuedDocuments;
+ OplogFetcher::DocumentsInfo lastEnqueuedDocumentsInfo;
+ Milliseconds lastEnqueuedElapsed;
+ OplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn;
+};
+
+ShutdownState::ShutdownState() = default;
+
+Status ShutdownState::getStatus() const {
+ return _status;
+}
+
+OpTimeWithHash ShutdownState::getLastFetched() const {
+ return _lastFetched;
+}
+
+void ShutdownState::operator()(const Status& status, const OpTimeWithHash& lastFetched) {
+ _status = status;
+ _lastFetched = lastFetched;
+}
+
+void OplogFetcherTest::setUp() {
+ executor::ThreadPoolExecutorTest::setUp();
+ launchExecutorThread();
+
+ lastFetched = {456LL, {{123, 0}, 1}};
+
+ dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>();
+ dataReplicatorExternalState->currentTerm = lastFetched.opTime.getTerm();
+ dataReplicatorExternalState->lastCommittedOpTime = {{9999, 0}, lastFetched.opTime.getTerm()};
+
+ enqueueDocumentsFn = [this](Fetcher::Documents::const_iterator begin,
+ Fetcher::Documents::const_iterator end,
+ const OplogFetcher::DocumentsInfo& info,
+ Milliseconds elapsed) {
+ lastEnqueuedDocuments = {begin, end};
+ lastEnqueuedDocumentsInfo = info;
+ lastEnqueuedElapsed = elapsed;
+ };
+}
+
+void OplogFetcherTest::tearDown() {
+ executor::ThreadPoolExecutorTest::tearDown();
+}
+
+RemoteCommandRequest OplogFetcherTest::scheduleNetworkResponse(RemoteCommandResponse response) {
+ auto net = getNet();
+ ASSERT_TRUE(net->hasReadyRequests());
+ Milliseconds millis(0);
+ executor::TaskExecutor::ResponseStatus responseStatus(response);
+ auto noi = net->getNextReadyRequest();
+ net->scheduleResponse(noi, net->now(), responseStatus);
+ return noi->getRequest();
+}
+
+RemoteCommandRequest OplogFetcherTest::processNetworkResponse(
+ RemoteCommandResponse response, bool expectReadyRequestsAfterProcessing) {
+ auto net = getNet();
+ net->enterNetwork();
+ auto request = scheduleNetworkResponse(response);
+ net->runReadyNetworkOperations();
+ ASSERT_EQUALS(expectReadyRequestsAfterProcessing, net->hasReadyRequests());
+ net->exitNetwork();
+ return request;
+}
+
+RemoteCommandRequest OplogFetcherTest::processNetworkResponse(
+ BSONObj obj, bool expectReadyRequestsAfterProcessing) {
+ auto net = getNet();
+ net->enterNetwork();
+ auto request = scheduleNetworkResponse({obj, rpc::makeEmptyMetadata(), Milliseconds(0)});
+ net->runReadyNetworkOperations();
+ ASSERT_EQUALS(expectReadyRequestsAfterProcessing, net->hasReadyRequests());
+ net->exitNetwork();
+ return request;
+}
+
+HostAndPort source("localhost:12345");
+NamespaceString nss("local.oplog.rs");
+
+ReplicaSetConfig _createConfig(bool isV1ElectionProtocol) {
+ BSONObjBuilder bob;
+ bob.append("_id", "myset");
+ bob.append("version", 1);
+ if (isV1ElectionProtocol) {
+ bob.append("protocolVersion", 1);
+ }
+ {
+ BSONArrayBuilder membersBob(bob.subarrayStart("members"));
+ BSONObjBuilder(membersBob.subobjStart())
+ .appendElements(BSON("_id" << 0 << "host" << source.toString()));
+ }
+ {
+ BSONObjBuilder settingsBob(bob.subobjStart("settings"));
+ settingsBob.append("electionTimeoutMillis", 10000);
+ }
+ auto configObj = bob.obj();
+
+ ReplicaSetConfig config;
+ ASSERT_OK(config.initialize(configObj));
+ return config;
+}
+
+std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(
+ RemoteCommandResponse response) {
+ auto shutdownState = stdx::make_unique<ShutdownState>();
+
+ OplogFetcher oplogFetcher(&getExecutor(),
+ lastFetched,
+ source,
+ nss,
+ _createConfig(true),
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ stdx::ref(*shutdownState));
+
+ ASSERT_FALSE(oplogFetcher.isActive());
+ ASSERT_OK(oplogFetcher.startup());
+ ASSERT_TRUE(oplogFetcher.isActive());
+
+ auto request = processNetworkResponse(response);
+
+ ASSERT_EQUALS(oplogFetcher.getCommandObject_forTest(), request.cmdObj);
+ ASSERT_EQUALS(oplogFetcher.getMetadataObject_forTest(), request.metadata);
+
+ oplogFetcher.shutdown();
+ oplogFetcher.join();
+
+ return std::move(shutdownState);
+}
+
+std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(BSONObj obj) {
+ return processSingleBatch({obj, rpc::makeEmptyMetadata(), Milliseconds(0)});
+}
+
+TEST_F(OplogFetcherTest, InvalidConstruction) {
+ // Null start timestamp.
+ ASSERT_THROWS_CODE_AND_WHAT(OplogFetcher(&getExecutor(),
+ OpTimeWithHash(),
+ source,
+ nss,
+ _createConfig(true),
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ [](Status, OpTimeWithHash) {}),
+ UserException,
+ ErrorCodes::BadValue,
+ "null last optime fetched");
+
+ // Null EnqueueDocumentsFn.
+ ASSERT_THROWS_CODE_AND_WHAT(OplogFetcher(&getExecutor(),
+ lastFetched,
+ source,
+ nss,
+ _createConfig(true),
+ dataReplicatorExternalState.get(),
+ OplogFetcher::EnqueueDocumentsFn(),
+ [](Status, OpTimeWithHash) {}),
+ UserException,
+ ErrorCodes::BadValue,
+ "null enqueueDocuments function");
+
+ // Uninitialized replica set configuration.
+ ASSERT_THROWS_CODE_AND_WHAT(OplogFetcher(&getExecutor(),
+ lastFetched,
+ source,
+ nss,
+ ReplicaSetConfig(),
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ [](Status, OpTimeWithHash) {}),
+ UserException,
+ ErrorCodes::InvalidReplicaSetConfig,
+ "uninitialized replica set configuration");
+
+ // Null OnShutdownCallbackFn.
+ ASSERT_THROWS_CODE_AND_WHAT(OplogFetcher(&getExecutor(),
+ lastFetched,
+ source,
+ nss,
+ _createConfig(true),
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ OplogFetcher::OnShutdownCallbackFn()),
+ UserException,
+ ErrorCodes::BadValue,
+ "null onShutdownCallback function");
+}
+
+void _checkDefaultCommandObjectFields(BSONObj cmdObj) {
+ ASSERT_EQUALS(std::string("find"), cmdObj.firstElementFieldName());
+ ASSERT_TRUE(cmdObj.getBoolField("tailable"));
+ ASSERT_TRUE(cmdObj.getBoolField("oplogReplay"));
+ ASSERT_TRUE(cmdObj.getBoolField("awaitData"));
+ ASSERT_EQUALS(60000, cmdObj.getIntField("maxTimeMS"));
+}
+
+TEST_F(
+ OplogFetcherTest,
+ CommandObjectContainsTermAndStartTimestampIfGetCurrentTermAndLastCommittedOpTimeReturnsValidTerm) {
+ auto cmdObj = OplogFetcher(&getExecutor(),
+ lastFetched,
+ source,
+ nss,
+ _createConfig(true),
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ [](Status, OpTimeWithHash) {}).getCommandObject_forTest();
+ ASSERT_EQUALS(mongo::BSONType::Object, cmdObj["filter"].type());
+ ASSERT_EQUALS(BSON("ts" << BSON("$gte" << lastFetched.opTime.getTimestamp())),
+ cmdObj["filter"].Obj());
+ ASSERT_EQUALS(dataReplicatorExternalState->currentTerm, cmdObj["term"].numberLong());
+ _checkDefaultCommandObjectFields(cmdObj);
+}
+
+TEST_F(
+ OplogFetcherTest,
+ CommandObjectContainsDoesNotContainTermIfGetCurrentTermAndLastCommittedOpTimeReturnsUninitializedTerm) {
+ dataReplicatorExternalState->currentTerm = OpTime::kUninitializedTerm;
+ auto cmdObj = OplogFetcher(&getExecutor(),
+ lastFetched,
+ source,
+ nss,
+ _createConfig(true),
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ [](Status, OpTimeWithHash) {}).getCommandObject_forTest();
+ ASSERT_EQUALS(mongo::BSONType::Object, cmdObj["filter"].type());
+ ASSERT_EQUALS(BSON("ts" << BSON("$gte" << lastFetched.opTime.getTimestamp())),
+ cmdObj["filter"].Obj());
+ ASSERT_FALSE(cmdObj.hasField("term"));
+ _checkDefaultCommandObjectFields(cmdObj);
+}
+
+TEST_F(OplogFetcherTest, MetadataObjectContainsReplSetMetadataFieldUnderProtocolVersion1) {
+ auto metadataObj = OplogFetcher(&getExecutor(),
+ lastFetched,
+ source,
+ nss,
+ _createConfig(true),
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ [](Status, OpTimeWithHash) {}).getMetadataObject_forTest();
+ ASSERT_EQUALS(1, metadataObj.nFields());
+ ASSERT_EQUALS(1, metadataObj[rpc::kReplSetMetadataFieldName].numberInt());
+}
+
+TEST_F(OplogFetcherTest, MetadataObjectIsEmptyUnderProtocolVersion0) {
+ auto metadataObj = OplogFetcher(&getExecutor(),
+ lastFetched,
+ source,
+ nss,
+ _createConfig(false),
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ [](Status, OpTimeWithHash) {}).getMetadataObject_forTest();
+ ASSERT_EQUALS(BSONObj(), metadataObj);
+}
+
+TEST_F(OplogFetcherTest, RemoteCommandTimeoutShouldEqualElectionTimeout) {
+ auto config = _createConfig(true);
+ auto timeout = OplogFetcher(&getExecutor(),
+ lastFetched,
+ source,
+ nss,
+ config,
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ [](Status, OpTimeWithHash) {}).getRemoteCommandTimeout_forTest();
+ ASSERT_EQUALS(config.getElectionTimeoutPeriod(), timeout);
+}
+
+TEST_F(OplogFetcherTest, AwaitDataTimeoutShouldEqualHalfElectionTimeoutUnderProtocolVersion1) {
+ auto config = _createConfig(true);
+ auto timeout = OplogFetcher(&getExecutor(),
+ lastFetched,
+ source,
+ nss,
+ config,
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ [](Status, OpTimeWithHash) {}).getAwaitDataTimeout_forTest();
+ ASSERT_EQUALS(config.getElectionTimeoutPeriod() / 2, timeout);
+}
+
+TEST_F(OplogFetcherTest, AwaitDataTimeoutShouldBeAConstantUnderProtocolVersion0) {
+ auto timeout = OplogFetcher(&getExecutor(),
+ lastFetched,
+ source,
+ nss,
+ _createConfig(false),
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ [](Status, OpTimeWithHash) {}).getAwaitDataTimeout_forTest();
+ ASSERT_EQUALS(OplogFetcher::kDefaultProtocolZeroAwaitDataTimeout, timeout);
+}
+
+TEST_F(OplogFetcherTest, ShuttingExecutorDownShouldPreventOplogFetcherFromStarting) {
+ getExecutor().shutdown();
+
+ OplogFetcher oplogFetcher(&getExecutor(),
+ lastFetched,
+ source,
+ nss,
+ _createConfig(true),
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ [](Status, OpTimeWithHash) {});
+
+ // Last optime and hash fetched should match values passed to constructor.
+ ASSERT_EQUALS(lastFetched, oplogFetcher.getLastOpTimeWithHashFetched());
+
+ ASSERT_FALSE(oplogFetcher.isActive());
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, oplogFetcher.startup());
+ ASSERT_FALSE(oplogFetcher.isActive());
+
+ // Last optime and hash fetched should not change.
+ ASSERT_EQUALS(lastFetched, oplogFetcher.getLastOpTimeWithHashFetched());
+}
+
+TEST_F(OplogFetcherTest, ShuttingExecutorDownAfterStartupStopsTheOplogFetcher) {
+ ShutdownState shutdownState;
+
+ OplogFetcher oplogFetcher(&getExecutor(),
+ lastFetched,
+ source,
+ nss,
+ _createConfig(true),
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ stdx::ref(shutdownState));
+
+ ASSERT_FALSE(oplogFetcher.isActive());
+ ASSERT_OK(oplogFetcher.startup());
+ ASSERT_TRUE(oplogFetcher.isActive());
+
+ getExecutor().shutdown();
+
+ oplogFetcher.join();
+
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus());
+ ASSERT_EQUALS(lastFetched, shutdownState.getLastFetched());
+}
+
+BSONObj makeNoopOplogEntry(OpTimeWithHash opTimeWithHash) {
+ BSONObjBuilder bob;
+ bob.appendElements(opTimeWithHash.opTime.toBSON());
+ bob.append("h", opTimeWithHash.value);
+ bob.append("op", "c");
+ bob.append("ns", "test.t");
+ return bob.obj();
+}
+
+BSONObj makeNoopOplogEntry(OpTime opTime, long long hash) {
+ return makeNoopOplogEntry({hash, opTime});
+}
+
+BSONObj makeNoopOplogEntry(Seconds seconds, long long hash) {
+ return makeNoopOplogEntry({{seconds, 0}, 1LL}, hash);
+}
+
+BSONObj makeCursorResponse(CursorId cursorId,
+ Fetcher::Documents oplogEntries,
+ bool isFirstBatch = true) {
+ BSONObjBuilder bob;
+ {
+ BSONObjBuilder cursorBob(bob.subobjStart("cursor"));
+ cursorBob.append("id", cursorId);
+ cursorBob.append("ns", nss.toString());
+ {
+ BSONArrayBuilder batchBob(
+ cursorBob.subarrayStart(isFirstBatch ? "firstBatch" : "nextBatch"));
+ for (auto oplogEntry : oplogEntries) {
+ batchBob.append(oplogEntry);
+ }
+ }
+ }
+ bob.append("ok", 1);
+ return bob.obj();
+}
+
+TEST_F(OplogFetcherTest, InvalidMetadataInResponseStopsTheOplogFetcher) {
+ auto shutdownState = processSingleBatch(
+ {makeCursorResponse(0, {}),
+ BSON(rpc::kReplSetMetadataFieldName << BSON("invalid_repl_metadata_field" << 1)),
+ Milliseconds(0)});
+
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey, shutdownState->getStatus());
+}
+
+TEST_F(OplogFetcherTest, VaidMetadataInResponseShouldBeForwardedToProcessMetadataFn) {
+ rpc::ReplSetMetadata metadata(1, lastFetched.opTime, lastFetched.opTime, 1, OID::gen(), 2, 2);
+ BSONObjBuilder bob;
+ ASSERT_OK(metadata.writeToMetadata(&bob));
+ auto metadataObj = bob.obj();
+ processSingleBatch(
+ {makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj, Milliseconds(0)});
+ ASSERT_EQUALS(metadata.getPrimaryIndex(),
+ dataReplicatorExternalState->metadataProcessed.getPrimaryIndex());
+}
+
+TEST_F(OplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithRemoteOplogStaleError) {
+ ASSERT_EQUALS(ErrorCodes::RemoteOplogStale,
+ processSingleBatch(makeCursorResponse(0, {}))->getStatus());
+}
+
+TEST_F(OplogFetcherTest,
+ MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) {
+ ASSERT_EQUALS(ErrorCodes::OplogStartMissing,
+ processSingleBatch(makeCursorResponse(0, {BSONObj()}))->getStatus());
+}
+
+TEST_F(
+ OplogFetcherTest,
+ LastOpTimeFetchedDoesNotMatchFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) {
+ ASSERT_EQUALS(ErrorCodes::OplogStartMissing,
+ processSingleBatch(
+ makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}))
+ ->getStatus());
+}
+
+TEST_F(OplogFetcherTest,
+ LastHashFetchedDoesNotMatchFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) {
+ ASSERT_EQUALS(
+ ErrorCodes::OplogStartMissing,
+ processSingleBatch(
+ makeCursorResponse(0, {makeNoopOplogEntry(lastFetched.opTime, lastFetched.value + 1)}))
+ ->getStatus());
+}
+
+TEST_F(OplogFetcherTest,
+ MissingOpTimeInSecondDocumentOfFirstBatchCausesOplogFetcherToStopWithNoSuchKey) {
+ ASSERT_EQUALS(
+ ErrorCodes::NoSuchKey,
+ processSingleBatch(makeCursorResponse(0,
+ {makeNoopOplogEntry(lastFetched),
+ BSON("o" << BSON("msg"
+ << "oplog entry without optime"))}))
+ ->getStatus());
+}
+
+TEST_F(OplogFetcherTest, TimestampsNotAdvancingInBatchCausesOplogFetcherStopWithOplogOutOfOrder) {
+ ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder,
+ processSingleBatch(makeCursorResponse(0,
+ {makeNoopOplogEntry(lastFetched),
+ makeNoopOplogEntry(Seconds(1000), 1),
+ makeNoopOplogEntry(Seconds(2000), 1),
+ makeNoopOplogEntry(Seconds(1500), 1)}))
+ ->getStatus());
+}
+
+TEST_F(OplogFetcherTest, OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenEnqueuingDocuments) {
+ auto firstEntry = makeNoopOplogEntry(lastFetched);
+ auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200);
+ auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300);
+ Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry};
+
+ Milliseconds elapsed(600);
+ auto shutdownState =
+ processSingleBatch({makeCursorResponse(0, documents), rpc::makeEmptyMetadata(), elapsed});
+
+ ASSERT_EQUALS(2U, lastEnqueuedDocuments.size());
+ ASSERT_EQUALS(secondEntry, lastEnqueuedDocuments[0]);
+ ASSERT_EQUALS(thirdEntry, lastEnqueuedDocuments[1]);
+
+ ASSERT_EQUALS(3U, lastEnqueuedDocumentsInfo.networkDocumentCount);
+ ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()),
+ lastEnqueuedDocumentsInfo.networkDocumentBytes);
+
+ ASSERT_EQUALS(2U, lastEnqueuedDocumentsInfo.toApplyDocumentCount);
+ ASSERT_EQUALS(size_t(secondEntry.objsize() + thirdEntry.objsize()),
+ lastEnqueuedDocumentsInfo.toApplyDocumentBytes);
+
+ ASSERT_EQUALS(thirdEntry["h"].numberLong(), lastEnqueuedDocumentsInfo.lastDocument.value);
+ ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)),
+ lastEnqueuedDocumentsInfo.lastDocument.opTime);
+
+ ASSERT_EQUALS(elapsed, lastEnqueuedElapsed);
+
+ // The last fetched optime and hash should be updated after pushing the operations into the
+ // buffer and reflected in the shutdown callback arguments.
+ ASSERT_OK(shutdownState->getStatus());
+ ASSERT_EQUALS(OpTimeWithHash(thirdEntry["h"].numberLong(),
+ unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry))),
+ shutdownState->getLastFetched());
+}
+
+void OplogFetcherTest::testSyncSourceChecking(rpc::ReplSetMetadata* metadata) {
+ auto firstEntry = makeNoopOplogEntry(lastFetched);
+ auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200);
+ auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300);
+ Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry};
+
+ BSONObj metadataObj;
+ if (metadata) {
+ BSONObjBuilder bob;
+ ASSERT_OK(metadata->writeToMetadata(&bob));
+ metadataObj = bob.obj();
+ }
+
+ dataReplicatorExternalState->shouldStopFetchingResult = true;
+
+ auto shutdownState =
+ processSingleBatch({makeCursorResponse(0, documents), metadataObj, Milliseconds(0)});
+
+ // Sync source checking happens after we have successfully pushed the operations into
+ // the buffer for the next replication phase (eg. applier).
+ // The last fetched optime and hash should be reflected in the shutdown callback
+ // arguments.
+ ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, shutdownState->getStatus());
+ ASSERT_EQUALS(OpTimeWithHash(thirdEntry["h"].numberLong(),
+ unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry))),
+ shutdownState->getLastFetched());
+}
+
+TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFetcher) {
+ testSyncSourceChecking(nullptr);
+
+ // Sync source optime and "hasSyncSource" are not available if the respone does not
+ // contain metadata.
+ ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked);
+ ASSERT_EQUALS(OpTime(), dataReplicatorExternalState->syncSourceLastOpTime);
+ ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource);
+}
+
+TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithMetadataStopsTheOplogFetcher) {
+ rpc::ReplSetMetadata metadata(lastFetched.opTime.getTerm(),
+ {{Seconds(10000), 0}, 1},
+ {{Seconds(20000), 0}, 1},
+ 1,
+ OID::gen(),
+ 2,
+ 2);
+
+ testSyncSourceChecking(&metadata);
+
+ // Sync source optime and "hasSyncSource" can be set if the respone contains metadata.
+ ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked);
+ ASSERT_EQUALS(metadata.getLastOpVisible(), dataReplicatorExternalState->syncSourceLastOpTime);
+ ASSERT_TRUE(dataReplicatorExternalState->syncSourceHasSyncSource);
+}
+
+TEST_F(OplogFetcherTest,
+ FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceStopsTheOplogFetcher) {
+ rpc::ReplSetMetadata metadata(lastFetched.opTime.getTerm(),
+ {{Seconds(10000), 0}, 1},
+ {{Seconds(20000), 0}, 1},
+ 1,
+ OID::gen(),
+ 2,
+ -1);
+
+ testSyncSourceChecking(&metadata);
+
+ // Sync source "hasSyncSource" is derived from metadata.
+ ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked);
+ ASSERT_EQUALS(metadata.getLastOpVisible(), dataReplicatorExternalState->syncSourceLastOpTime);
+ ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource);
+}
+
+
+RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling(bool isV1ElectionProtocol) {
+ ShutdownState shutdownState;
+
+ if (!isV1ElectionProtocol) {
+ dataReplicatorExternalState->currentTerm = OpTime::kUninitializedTerm;
+ }
+
+ OplogFetcher oplogFetcher(&getExecutor(),
+ lastFetched,
+ source,
+ nss,
+ _createConfig(isV1ElectionProtocol),
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ stdx::ref(shutdownState));
+
+ ASSERT_OK(oplogFetcher.startup());
+
+ CursorId cursorId = 22LL;
+ auto firstEntry = makeNoopOplogEntry(lastFetched);
+ auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200);
+ processNetworkResponse(makeCursorResponse(cursorId, {firstEntry, secondEntry}), true);
+
+ ASSERT_EQUALS(1U, lastEnqueuedDocuments.size());
+ ASSERT_EQUALS(secondEntry, lastEnqueuedDocuments[0]);
+
+ // Set cursor ID to 0 in getMore response to indicate no more data available.
+ auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300);
+ auto fourthEntry = makeNoopOplogEntry({{Seconds(1200), 0}, lastFetched.opTime.getTerm()}, 300);
+ auto request = processNetworkResponse(makeCursorResponse(0, {thirdEntry, fourthEntry}, false));
+
+ ASSERT_EQUALS(std::string("getMore"), request.cmdObj.firstElementFieldName());
+ ASSERT_EQUALS(nss.coll(), request.cmdObj["collection"].String());
+ ASSERT_EQUALS(int(durationCount<Milliseconds>(oplogFetcher.getAwaitDataTimeout_forTest())),
+ request.cmdObj.getIntField("maxTimeMS"));
+
+ ASSERT_EQUALS(2U, lastEnqueuedDocuments.size());
+ ASSERT_EQUALS(thirdEntry, lastEnqueuedDocuments[0]);
+ ASSERT_EQUALS(fourthEntry, lastEnqueuedDocuments[1]);
+
+ oplogFetcher.shutdown();
+ oplogFetcher.join();
+
+ ASSERT_OK(shutdownState.getStatus());
+ ASSERT_EQUALS(OpTimeWithHash(fourthEntry["h"].numberLong(),
+ unittest::assertGet(OpTime::parseFromOplogEntry(fourthEntry))),
+ shutdownState.getLastFetched());
+
+ return request;
+}
+
+TEST_F(
+ OplogFetcherTest,
+ NoDataAvailableAfterFirstTwoBatchesShouldCauseTheOplogFetcherToShutDownWithSuccessfulStatus) {
+ auto request = testTwoBatchHandling(true);
+ ASSERT_EQUALS(dataReplicatorExternalState->currentTerm, request.cmdObj["term"].numberLong());
+ ASSERT_EQUALS(dataReplicatorExternalState->lastCommittedOpTime,
+ unittest::assertGet(OpTime::parseFromOplogEntry(
+ request.cmdObj["lastKnownCommittedOpTime"].Obj())));
+}
+
+TEST_F(OplogFetcherTest,
+ GetMoreRequestUnderProtocolVersionZeroDoesNotIncludeTermOrLastKnownCommittedOpTime) {
+ auto request = testTwoBatchHandling(false);
+ ASSERT_FALSE(request.cmdObj.hasField("term"));
+ ASSERT_FALSE(request.cmdObj.hasField("lastKnownCommittedOpTime"));
+}
+
+TEST_F(OplogFetcherTest, ValidateDocumentsReturnsNoSuchKeyIfTimestampIsNotFoundInAnyDocument) {
+ auto firstEntry = makeNoopOplogEntry(Seconds(123), 100);
+ auto secondEntry = BSON("o" << BSON("msg"
+ << "oplog entry without optime"));
+
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey,
+ OplogFetcher::validateDocuments(
+ {firstEntry, secondEntry},
+ true,
+ unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp())
+ .getStatus());
+}
+
+TEST_F(
+ OplogFetcherTest,
+ ValidateDocumentsReturnsOutOfOrderIfTimestampInFirstEntryIsEqualToLastTimestampAndNotProcessingFirstBatch) {
+ auto firstEntry = makeNoopOplogEntry(Seconds(123), 100);
+ auto secondEntry = makeNoopOplogEntry(Seconds(456), 200);
+
+ ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder,
+ OplogFetcher::validateDocuments(
+ {firstEntry, secondEntry},
+ false,
+ unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp())
+ .getStatus());
+}
+
+TEST_F(OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInSecondEntryIsBeforeFirst) {
+ auto firstEntry = makeNoopOplogEntry(Seconds(456), 100);
+ auto secondEntry = makeNoopOplogEntry(Seconds(123), 200);
+
+ ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder,
+ OplogFetcher::validateDocuments(
+ {firstEntry, secondEntry},
+ true,
+ unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp())
+ .getStatus());
+}
+
+TEST_F(OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInThirdEntryIsBeforeSecond) {
+ auto firstEntry = makeNoopOplogEntry(Seconds(123), 100);
+ auto secondEntry = makeNoopOplogEntry(Seconds(789), 200);
+ auto thirdEntry = makeNoopOplogEntry(Seconds(456), 300);
+
+ ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder,
+ OplogFetcher::validateDocuments(
+ {firstEntry, secondEntry, thirdEntry},
+ true,
+ unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp())
+ .getStatus());
+}
+
+TEST_F(OplogFetcherTest,
+ ValidateDocumentsExcludesFirstDocumentInApplyCountAndBytesIfProcessingFirstBatch) {
+ auto firstEntry = makeNoopOplogEntry(Seconds(123), 100);
+ auto secondEntry = makeNoopOplogEntry(Seconds(456), 200);
+ auto thirdEntry = makeNoopOplogEntry(Seconds(789), 300);
+
+ auto info = unittest::assertGet(OplogFetcher::validateDocuments(
+ {firstEntry, secondEntry, thirdEntry},
+ true,
+ unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp()));
+
+ ASSERT_EQUALS(3U, info.networkDocumentCount);
+ ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()),
+ info.networkDocumentBytes);
+
+ ASSERT_EQUALS(300LL, info.lastDocument.value);
+ ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)),
+ info.lastDocument.opTime);
+}
+
+TEST_F(OplogFetcherTest,
+ ValidateDocumentsIncludesFirstDocumentInApplyCountAndBytesIfNotProcessingFirstBatch) {
+ auto firstEntry = makeNoopOplogEntry(Seconds(123), 100);
+ auto secondEntry = makeNoopOplogEntry(Seconds(456), 200);
+ auto thirdEntry = makeNoopOplogEntry(Seconds(789), 300);
+
+ auto info = unittest::assertGet(OplogFetcher::validateDocuments(
+ {firstEntry, secondEntry, thirdEntry}, false, Timestamp(Seconds(100), 0)));
+
+ ASSERT_EQUALS(3U, info.networkDocumentCount);
+ ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()),
+ info.networkDocumentBytes);
+
+ ASSERT_EQUALS(info.networkDocumentCount, info.toApplyDocumentCount);
+ ASSERT_EQUALS(info.networkDocumentBytes, info.toApplyDocumentBytes);
+
+ ASSERT_EQUALS(300LL, info.lastDocument.value);
+ ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)),
+ info.lastDocument.opTime);
+}
+
+TEST_F(OplogFetcherTest,
+ ValidateDocumentsReturnsDefaultLastDocumentHashAndOpTimeWhenThereAreNoDocumentsToApply) {
+ auto firstEntry = makeNoopOplogEntry(Seconds(123), 100);
+
+ auto info = unittest::assertGet(OplogFetcher::validateDocuments(
+ {firstEntry},
+ true,
+ unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp()));
+
+ ASSERT_EQUALS(1U, info.networkDocumentCount);
+ ASSERT_EQUALS(size_t(firstEntry.objsize()), info.networkDocumentBytes);
+
+ ASSERT_EQUALS(0U, info.toApplyDocumentCount);
+ ASSERT_EQUALS(0U, info.toApplyDocumentBytes);
+
+ ASSERT_EQUALS(0LL, info.lastDocument.value);
+ ASSERT_EQUALS(OpTime(), info.lastDocument.opTime);
+}
+
+TEST_F(OplogFetcherTest,
+ ValidateDocumentsReturnsOplogStartMissingWhenThereAreNoDocumentsWhenProcessingFirstBatch) {
+ ASSERT_EQUALS(
+ ErrorCodes::OplogStartMissing,
+ OplogFetcher::validateDocuments({}, true, Timestamp(Seconds(100), 0)).getStatus());
+}
+
+TEST_F(OplogFetcherTest,
+ ValidateDocumentsReturnsDefaultInfoWhenThereAreNoDocumentsWhenNotProcessingFirstBatch) {
+ auto info =
+ unittest::assertGet(OplogFetcher::validateDocuments({}, false, Timestamp(Seconds(100), 0)));
+
+ ASSERT_EQUALS(0U, info.networkDocumentCount);
+ ASSERT_EQUALS(0U, info.networkDocumentBytes);
+
+ ASSERT_EQUALS(0U, info.toApplyDocumentCount);
+ ASSERT_EQUALS(0U, info.toApplyDocumentBytes);
+
+ ASSERT_EQUALS(0LL, info.lastDocument.value);
+ ASSERT_EQUALS(OpTime(), info.lastDocument.opTime);
+}
+
+} // namespace
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 23fd498cb52..068bb49ef51 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/operation_context_noop.h"
#include "mongo/db/repl/check_quorum_for_config_change.h"
+#include "mongo/db/repl/data_replicator_external_state_impl.h"
#include "mongo/db/repl/elect_cmd_runner.h"
#include "mongo/db/repl/election_winner_declarer.h"
#include "mongo/db/repl/freshness_checker.h"
@@ -253,7 +254,9 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
_sleptLastElection(false),
_canAcceptNonLocalWrites(!(settings.usingReplSets() || settings.isSlave())),
_canServeNonLocalReads(0U),
- _dr(createDataReplicatorOptions(this), &_replExecutor),
+ _dr(createDataReplicatorOptions(this),
+ stdx::make_unique<DataReplicatorExternalStateImpl>(this),
+ &_replExecutor),
_isDurableStorageEngine(isDurableStorageEngineFn ? *isDurableStorageEngineFn : []() -> bool {
return getGlobalServiceContext()->getGlobalStorageEngine()->isDurable();
}) {