summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-09-21 11:47:18 -0400
committerBenety Goh <benety@mongodb.com>2016-09-22 12:04:55 -0400
commit1be7c1336f137d99042100c4c37cf6ae0e322813 (patch)
tree3f092b88764ad17dc8fdd995e2bc46ea1e93b13c /src/mongo
parent3a372a491db4c852dfb2258594a1b3b2846c3d5a (diff)
downloadmongo-1be7c1336f137d99042100c4c37cf6ae0e322813.tar.gz
SERVER-25702 added OplogFetcher constructor argument for max fetcher restarts
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/repl/bgsync.cpp1
-rw-r--r--src/mongo/db/repl/data_replicator.cpp2
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp2
-rw-r--r--src/mongo/db/repl/oplog_fetcher.h6
-rw-r--r--src/mongo/db/repl/oplog_fetcher_test.cpp15
5 files changed, 26 insertions, 0 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index ad7cb8a744c..8456d677f0d 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -369,6 +369,7 @@ void BackgroundSync::_produce(OperationContext* txn) {
source,
NamespaceString(rsOplogName),
config,
+ 0,
&dataReplicatorExternalState,
stdx::bind(&BackgroundSync::_enqueueDocuments,
this,
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 936390744d1..b1dee828b41 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -575,6 +575,7 @@ Status DataReplicator::_runInitialSyncAttempt_inlock(OperationContext* txn,
syncSource,
_opts.remoteOplogNS,
config,
+ 0,
_dataReplicatorExternalState.get(),
stdx::bind(&DataReplicator::_enqueueDocuments,
this,
@@ -1312,6 +1313,7 @@ Status DataReplicator::_scheduleFetch_inlock() {
_syncSource,
remoteOplogNS,
uassertStatusOK(_dataReplicatorExternalState->getCurrentConfig()),
+ 0,
_dataReplicatorExternalState.get(),
stdx::bind(&DataReplicator::_enqueueDocuments,
this,
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 81f39808c4d..9e5fd621cef 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -246,6 +246,7 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
HostAndPort source,
NamespaceString nss,
ReplicaSetConfig config,
+ std::size_t maxFetcherRestarts,
DataReplicatorExternalState* dataReplicatorExternalState,
EnqueueDocumentsFn enqueueDocumentsFn,
OnShutdownCallbackFn onShutdownCallbackFn)
@@ -254,6 +255,7 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
_nss(nss),
_metadataObject(uassertStatusOK(makeMetadataObject(config.getProtocolVersion() == 1LL))),
_remoteCommandTimeout(config.getElectionTimeoutPeriod()),
+ _maxFetcherRestarts(maxFetcherRestarts),
_dataReplicatorExternalState(dataReplicatorExternalState),
_enqueueDocumentsFn(enqueueDocumentsFn),
_awaitDataTimeout(calculateAwaitDataTimeout(config)),
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index 10d549471ba..352991319ef 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -28,6 +28,7 @@
#pragma once
+#include <cstddef>
#include <memory>
#include "mongo/base/disallow_copying.h"
@@ -128,6 +129,7 @@ public:
HostAndPort source,
NamespaceString nss,
ReplicaSetConfig config,
+ std::size_t maxFetcherRestarts,
DataReplicatorExternalState* dataReplicatorExternalState,
EnqueueDocumentsFn enqueueDocumentsFn,
OnShutdownCallbackFn onShutdownCallbackFn);
@@ -216,6 +218,10 @@ private:
const NamespaceString _nss;
const BSONObj _metadataObject;
const Milliseconds _remoteCommandTimeout;
+
+ // Maximum number of times to restart the fetcher consecutively on non-cancellation errors.
+ const std::size_t _maxFetcherRestarts;
+
DataReplicatorExternalState* const _dataReplicatorExternalState;
const EnqueueDocumentsFn _enqueueDocumentsFn;
const Milliseconds _awaitDataTimeout;
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 7c554005fa8..0399cc32dd3 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -198,6 +198,7 @@ std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(
source,
nss,
_createConfig(true),
+ 0,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
stdx::ref(*shutdownState));
@@ -228,6 +229,7 @@ TEST_F(OplogFetcherTest, InvalidConstruction) {
source,
nss,
_createConfig(true),
+ 0,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {}),
@@ -241,6 +243,7 @@ TEST_F(OplogFetcherTest, InvalidConstruction) {
source,
nss,
_createConfig(true),
+ 0,
dataReplicatorExternalState.get(),
OplogFetcher::EnqueueDocumentsFn(),
[](Status, OpTimeWithHash) {}),
@@ -254,6 +257,7 @@ TEST_F(OplogFetcherTest, InvalidConstruction) {
source,
nss,
ReplicaSetConfig(),
+ 0,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {}),
@@ -267,6 +271,7 @@ TEST_F(OplogFetcherTest, InvalidConstruction) {
source,
nss,
_createConfig(true),
+ 0,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
OplogFetcher::OnShutdownCallbackFn()),
@@ -291,6 +296,7 @@ TEST_F(
source,
nss,
_createConfig(true),
+ 0,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {})
@@ -311,6 +317,7 @@ TEST_F(
source,
nss,
_createConfig(true),
+ 0,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {})
@@ -328,6 +335,7 @@ TEST_F(OplogFetcherTest, MetadataObjectContainsReplSetMetadataFieldUnderProtocol
source,
nss,
_createConfig(true),
+ 0,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {})
@@ -342,6 +350,7 @@ TEST_F(OplogFetcherTest, MetadataObjectIsEmptyUnderProtocolVersion0) {
source,
nss,
_createConfig(false),
+ 0,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {})
@@ -358,6 +367,7 @@ TEST_F(OplogFetcherTest, RemoteCommandTimeoutShouldEqualElectionTimeout) {
source,
nss,
config,
+ 0,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {})
@@ -372,6 +382,7 @@ TEST_F(OplogFetcherTest, AwaitDataTimeoutShouldEqualHalfElectionTimeoutUnderProt
source,
nss,
config,
+ 0,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {})
@@ -385,6 +396,7 @@ TEST_F(OplogFetcherTest, AwaitDataTimeoutShouldBeAConstantUnderProtocolVersion0)
source,
nss,
_createConfig(false),
+ 0,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {})
@@ -400,6 +412,7 @@ TEST_F(OplogFetcherTest, ShuttingExecutorDownShouldPreventOplogFetcherFromStarti
source,
nss,
_createConfig(true),
+ 0,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {});
@@ -423,6 +436,7 @@ TEST_F(OplogFetcherTest, ShuttingExecutorDownAfterStartupStopsTheOplogFetcher) {
source,
nss,
_createConfig(true),
+ 0,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
stdx::ref(shutdownState));
@@ -665,6 +679,7 @@ RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling(bool isV1ElectionPro
source,
nss,
_createConfig(isV1ElectionProtocol),
+ 0,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
stdx::ref(shutdownState));