author     Samyukta Lanka <samy.lanka@mongodb.com>    2020-01-16 23:39:39 +0000
committer  evergreen <evergreen@mongodb.com>          2020-01-16 23:39:39 +0000
commit     b6f6ea652af457d4929978ff3442668183c527bd
tree       ed5f195046ed2083d7426be170603f63d2965fd7 /src/mongo/db/repl
parent     8003fc047c5c742829ba80b86ec2e6c5bb4e8453
SERVER-45466 Implement startup and shutdown logic for the new oplog fetcher
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--   src/mongo/db/repl/abstract_oplog_fetcher.cpp     5
-rw-r--r--   src/mongo/db/repl/oplog_fetcher.cpp            140
-rw-r--r--   src/mongo/db/repl/oplog_fetcher.h               21
-rw-r--r--   src/mongo/db/repl/oplog_fetcher_test.cpp       167
4 files changed, 318 insertions, 15 deletions
diff --git a/src/mongo/db/repl/abstract_oplog_fetcher.cpp b/src/mongo/db/repl/abstract_oplog_fetcher.cpp
index 72c8bd0b788..98b1c547b45 100644
--- a/src/mongo/db/repl/abstract_oplog_fetcher.cpp
+++ b/src/mongo/db/repl/abstract_oplog_fetcher.cpp
@@ -47,9 +47,10 @@
namespace mongo {
namespace repl {
-namespace {
-MONGO_FAIL_POINT_DEFINE(hangBeforeStartingOplogFetcher);
+// This failpoint is shared with oplog_fetcher.
+MONGO_FAIL_POINT_DEFINE(hangBeforeStartingOplogFetcher);
+namespace {
Counter64 readersCreatedStats;
ServerStatusMetricField<Counter64> displayReadersCreated("repl.network.readersCreated",
&readersCreatedStats);
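
For context on the hunk above: the fail point definition moves out of the anonymous namespace because names inside an anonymous namespace have internal linkage, so the `extern FailPoint hangBeforeStartingOplogFetcher;` declaration added to oplog_fetcher.cpp below could not refer to it. A minimal sketch of that linkage rule, using a plain int rather than mongo's FailPoint machinery:

```cpp
// shared_flag.cpp: a namespace-scope definition has external linkage, so other
// translation units can name it. Had it been wrapped in `namespace { ... }`, the
// extern declaration below would fail to link.
int sharedFlag = 0;

// consumer.cpp: refer to the object defined above without a shared header.
extern int sharedFlag;

bool isFlagSet() {
    return sharedFlag != 0;
}
```
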
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 47b7a8a84d7..610a718c1a1 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -53,6 +53,10 @@ Seconds OplogFetcher::kDefaultProtocolZeroAwaitDataTimeout(2);
MONGO_FAIL_POINT_DEFINE(stopReplProducer);
MONGO_FAIL_POINT_DEFINE(stopReplProducerOnDocument);
MONGO_FAIL_POINT_DEFINE(setSmallOplogGetMoreMaxTimeMS);
+MONGO_FAIL_POINT_DEFINE(hangAfterOplogFetcherCallbackScheduled);
+
+// TODO SERVER-45574: Define the failpoint in this file instead.
+extern FailPoint hangBeforeStartingOplogFetcher;
namespace {
@@ -571,5 +575,141 @@ StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryRespons
_getGetMoreMaxTime(),
_batchSize);
}
+
+NewOplogFetcher::NewOplogFetcher(
+ executor::TaskExecutor* executor,
+ OpTime lastFetched,
+ HostAndPort source,
+ ReplSetConfig config,
+ std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision,
+ int requiredRBID,
+ bool requireFresherSyncSource,
+ DataReplicatorExternalState* dataReplicatorExternalState,
+ EnqueueDocumentsFn enqueueDocumentsFn,
+ OnShutdownCallbackFn onShutdownCallbackFn,
+ const int batchSize,
+ StartingPoint startingPoint)
+ : AbstractAsyncComponent(executor, "oplog fetcher"),
+ _source(source),
+ _requiredRBID(requiredRBID),
+ _oplogFetcherRestartDecision(std::move(oplogFetcherRestartDecision)),
+ _onShutdownCallbackFn(onShutdownCallbackFn),
+ _lastFetched(lastFetched),
+ _metadataObj(makeMetadataObject()),
+ _requireFresherSyncSource(requireFresherSyncSource),
+ _dataReplicatorExternalState(dataReplicatorExternalState),
+ _enqueueDocumentsFn(enqueueDocumentsFn),
+ _awaitDataTimeout(calculateAwaitDataTimeout(config)),
+ _batchSize(batchSize),
+ _startingPoint(startingPoint) {
+
+ invariant(config.isInitialized());
+ invariant(!_lastFetched.isNull());
+ invariant(onShutdownCallbackFn);
+ invariant(enqueueDocumentsFn);
+}
+
+NewOplogFetcher::~NewOplogFetcher() {
+ shutdown();
+ join();
+}
+
+Status NewOplogFetcher::_doStartup_inlock() noexcept {
+ return _scheduleWorkAndSaveHandle_inlock(
+ [this](const executor::TaskExecutor::CallbackArgs& args) {
+ hangBeforeStartingOplogFetcher.pauseWhileSet();
+ _runQuery(args);
+ },
+ &_runQueryHandle,
+ "_runQuery");
+}
+
+void NewOplogFetcher::_doShutdown_inlock() noexcept {
+ _cancelHandle_inlock(_runQueryHandle);
+
+ // TODO SERVER-45468: Call shutdownAndDisallowReconnect on DBClientConnection
+}
+
+Mutex* NewOplogFetcher::_getMutex() noexcept {
+ return &_mutex;
+}
+
+OpTime NewOplogFetcher::getLastOpTimeFetched_forTest() const {
+ return _getLastOpTimeFetched();
+}
+
+OpTime NewOplogFetcher::_getLastOpTimeFetched() const {
+ stdx::lock_guard<Latch> lock(_mutex);
+ return _lastFetched;
+}
+
+void NewOplogFetcher::_finishCallback(Status status) {
+ invariant(isActive());
+
+ _onShutdownCallbackFn(status);
+
+ decltype(_onShutdownCallbackFn) onShutdownCallbackFn;
+ decltype(_oplogFetcherRestartDecision) oplogFetcherRestartDecision;
+ stdx::lock_guard<Latch> lock(_mutex);
+ _transitionToComplete_inlock();
+
+ // Release any resources that might be held by the '_onShutdownCallbackFn' function object.
+ // The function object will be destroyed outside the lock since the temporary variable
+ // 'onShutdownCallbackFn' is declared before 'lock'.
+ invariant(_onShutdownCallbackFn);
+ std::swap(_onShutdownCallbackFn, onShutdownCallbackFn);
+
+ // Release any resources held by the OplogFetcherRestartDecision.
+ invariant(_oplogFetcherRestartDecision);
+ std::swap(_oplogFetcherRestartDecision, oplogFetcherRestartDecision);
+}
+
+void NewOplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& callbackData) {
+ Status responseStatus =
+ _checkForShutdownAndConvertStatus(callbackData, "error running oplog fetcher");
+ if (!responseStatus.isOK()) {
+ _finishCallback(responseStatus);
+ return;
+ }
+
+ hangAfterOplogFetcherCallbackScheduled.pauseWhileSet();
+
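+    // This loop stands in for the cursor/exhaust logic that is not part of this
+    // startup/shutdown patch: it busy-waits and exits only once shutdown is requested.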
+ while (true) {
+ bool isShuttingDown;
+ {
+ // Both of these checks need to happen while holding the mutex since they could race
+ // with shutdown.
+ stdx::lock_guard<Latch> lock(_mutex);
+ isShuttingDown = _isShuttingDown_inlock();
+ invariant(isShuttingDown || !_runQueryHandle.isCanceled());
+ }
+ if (isShuttingDown) {
+ _finishCallback(Status(ErrorCodes::CallbackCanceled, "oplog fetcher shutting down"));
+ return;
+ }
+ }
+}
+
+bool NewOplogFetcher::OplogFetcherRestartDecisionDefault::shouldContinue(NewOplogFetcher* fetcher,
+ Status status) {
+ if (_numRestarts == _maxRestarts) {
+ log() << "Error returned from oplog query (no more query restarts left): "
+ << redact(status);
+ return false;
+ }
+ log() << "Restarting oplog query due to error: " << redact(status)
+ << ". Last fetched optime: " << fetcher->_getLastOpTimeFetched()
+ << ". Restarts remaining: " << (_maxRestarts - _numRestarts);
+ _numRestarts++;
+ return true;
+}
+
+void NewOplogFetcher::OplogFetcherRestartDecisionDefault::fetchSuccessful(
+ NewOplogFetcher* fetcher) {
+ _numRestarts = 0;
+}
+
+NewOplogFetcher::OplogFetcherRestartDecision::~OplogFetcherRestartDecision() {}
+
} // namespace repl
} // namespace mongo
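
The `_finishCallback` added above relies on the rule that local objects are destroyed in reverse order of declaration: the temporaries are declared before the lock guard, so whatever the swapped-out callables own is released only after the mutex is unlocked. A standalone sketch of that idiom, using hypothetical names and `std::mutex` in place of mongo's `Latch`:

```cpp
#include <functional>
#include <mutex>
#include <utility>

class Component {
public:
    void finish() {
        // Declared before 'lock', so it is destroyed after 'lock' (locals are destroyed
        // in reverse order of declaration), i.e. after the mutex has been released.
        std::function<void()> callback;
        std::lock_guard<std::mutex> lock(_mutex);
        std::swap(_callback, callback);  // clear the member while holding the lock
        // On return, 'lock' unlocks _mutex first, then 'callback' (and anything it
        // captured) is destroyed outside the critical section.
    }

private:
    std::mutex _mutex;
    std::function<void()> _callback = [] {};
};
```
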
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index a17ba35532d..5ba6a5a6227 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -414,13 +414,7 @@ private:
// =============== AbstractAsyncComponent overrides ================
/**
- * Initializes a DBClientConnection to the sync source and schedules the _runQuery function to
- * run in a separate thread.
- *
- * Initializes a DBClientConnection to the sync source and uses its query function to schedule
- * a `find` command on the source's oplog for entries with a timestamp greater than or equal to
- * this node's last fetched. This will create a DBClientCursor which will be used until the
- * cursor fails or OplogFetcher is shut down.
+ * Schedules the _runQuery function to run in a separate thread.
*/
Status _doStartup_inlock() noexcept override;
@@ -434,10 +428,11 @@ private:
// ============= End AbstractAsyncComponent overrides ==============
- /*
+ /**
* Creates a DBClientConnection and executes a query to retrieve oplog entries from this node's
- * sync source. This will create a tailable, awaitData, exhaust cursor. For each batch returned
- * by the upstream node, _onSuccessfulBatch will be called with the response.
+ * sync source. This will create a tailable, awaitData, exhaust cursor which will be used until
+ * the cursor fails or OplogFetcher is shut down. For each batch returned by the upstream node,
+ * _onSuccessfulBatch will be called with the response.
*
* In the case of any network or response errors, this method will close the cursor and restart
* a new one. If OplogFetcherRestartDecision's shouldContinue function indicates it should not
@@ -445,7 +440,7 @@ private:
*/
void _runQuery(const executor::TaskExecutor::CallbackArgs& callbackData);
- /*
+ /**
* Executes a `find` query on the sync source's oplog and establishes a tailable, awaitData,
* exhaust cursor. If it is not successful in creating a new cursor, it will retry based on the
* OplogFetcherRestartDecision's shouldContinue function.
@@ -462,7 +457,7 @@ private:
*/
BSONObj _makeFindQuery(OpTime lastOpTimeFetched, Milliseconds findMaxTime) const;
- /*
+ /**
* Gets the next batch from the exhaust cursor.
*
* If there was an error getting the next batch, checks _oplogFetcherRestartDecision's
@@ -479,7 +474,7 @@ private:
*/
Status _onSuccessfulBatch(const Documents& documents);
- /*
+ /**
* Notifies caller that the oplog fetcher has completed processing operations from the remote
* oplog using the "_onShutdownCallbackFn".
*/
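
The comment blocks above describe how fetch errors are routed through `OplogFetcherRestartDecision::shouldContinue` and how a successful batch resets the restart budget via `fetchSuccessful`. The sketch below is illustrative only, not the patch's `_runQuery`: it shows the consuming retry loop with hypothetical stand-in types (`FetchStatus`, `RestartPolicy`, `attemptFetch`) rather than the real mongo classes:

```cpp
#include <functional>
#include <string>

// Hypothetical stand-ins for this sketch only; the real interface is
// NewOplogFetcher::OplogFetcherRestartDecision declared in oplog_fetcher.h.
struct FetchStatus {
    bool ok;
    std::string reason;
};

class RestartPolicy {
public:
    explicit RestartPolicy(int maxRestarts) : _maxRestarts(maxRestarts) {}

    // Mirrors shouldContinue(): deny the retry once the restart budget is used up.
    bool shouldContinue(const FetchStatus&) {
        if (_numRestarts == _maxRestarts)
            return false;
        ++_numRestarts;
        return true;
    }

    // Mirrors fetchSuccessful(): a successful batch resets the budget.
    void fetchSuccessful() {
        _numRestarts = 0;
    }

private:
    int _numRestarts = 0;
    const int _maxRestarts;
};

// How such a policy is consumed: retry with a new "cursor" on error until the policy
// says stop; a successful fetch resets the counter so later errors get a fresh budget.
FetchStatus fetchUntilExhausted(RestartPolicy& policy,
                                const std::function<FetchStatus()>& attemptFetch) {
    while (true) {
        FetchStatus status = attemptFetch();
        if (status.ok) {
            policy.fetchSuccessful();
            continue;  // the real fetcher also exits here when shutdown is requested
        }
        if (!policy.shouldContinue(status)) {
            return status;  // out of restarts: surface the error to the shutdown callback
        }
        // Otherwise loop around and retry.
    }
}
```
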
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 1a5e231044f..b6f7a41c876 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/repl/abstract_oplog_fetcher_test_fixture.h"
#include "mongo/db/repl/data_replicator_external_state_mock.h"
#include "mongo/db/repl/oplog_fetcher.h"
+#include "mongo/db/repl/task_executor_mock.h"
#include "mongo/rpc/metadata.h"
#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
@@ -41,6 +42,7 @@
#include "mongo/unittest/ensure_fcv.h"
#include "mongo/unittest/task_executor_proxy.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/fail_point.h"
#include "mongo/util/scopeguard.h"
namespace {
@@ -1023,4 +1025,169 @@ TEST_F(OplogFetcherTest,
ASSERT_EQUALS(OpTime(), info.lastDocument);
}
+
+class NewOplogFetcherTest : public executor::ThreadPoolExecutorTest {
+public:
+ void setUp() override;
+
+ // 16MB max batch size / 12 byte min doc size * 10 (for good measure) = defaultBatchSize to use.
+ const int defaultBatchSize = (16 * 1024 * 1024) / 12 * 10;
+
+ std::unique_ptr<NewOplogFetcher> makeOplogFetcher();
+ std::unique_ptr<NewOplogFetcher> makeOplogFetcherWithDifferentExecutor(
+ executor::TaskExecutor* executor, NewOplogFetcher::OnShutdownCallbackFn fn);
+
+ std::unique_ptr<DataReplicatorExternalStateMock> dataReplicatorExternalState;
+
+ NewOplogFetcher::Documents lastEnqueuedDocuments;
+ NewOplogFetcher::DocumentsInfo lastEnqueuedDocumentsInfo;
+ NewOplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn;
+
+ // The last OpTime fetched by the oplog fetcher.
+ OpTime lastFetched;
+};
+
+void NewOplogFetcherTest::setUp() {
+ executor::ThreadPoolExecutorTest::setUp();
+ launchExecutorThread();
+
+ lastFetched = {{123, 0}, 1};
+
+ dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateMock>();
+ dataReplicatorExternalState->currentTerm = lastFetched.getTerm();
+ dataReplicatorExternalState->lastCommittedOpTime = {{9999, 0}, lastFetched.getTerm()};
+
+ enqueueDocumentsFn = [this](NewOplogFetcher::Documents::const_iterator begin,
+ NewOplogFetcher::Documents::const_iterator end,
+ const NewOplogFetcher::DocumentsInfo& info) -> Status {
+ lastEnqueuedDocuments = {begin, end};
+ lastEnqueuedDocumentsInfo = info;
+ return Status::OK();
+ };
+}
+
+std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::makeOplogFetcher() {
+ return makeOplogFetcherWithDifferentExecutor(&getExecutor(), [](Status) {});
+}
+
+std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::makeOplogFetcherWithDifferentExecutor(
+ executor::TaskExecutor* executor, NewOplogFetcher::OnShutdownCallbackFn fn) {
+ return std::make_unique<NewOplogFetcher>(
+ executor,
+ lastFetched,
+ source,
+ _createConfig(),
+ std::make_unique<NewOplogFetcher::OplogFetcherRestartDecisionDefault>(1),
+ -1,
+ true,
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ fn,
+ defaultBatchSize,
+ NewOplogFetcher::StartingPoint::kSkipFirstDoc);
+}
+
+TEST_F(NewOplogFetcherTest, ShuttingExecutorDownShouldPreventOplogFetcherFromStarting) {
+ getExecutor().shutdown();
+
+ auto oplogFetcher = makeOplogFetcher();
+
+ // Last optime fetched should match values passed to constructor.
+ ASSERT_EQUALS(lastFetched, oplogFetcher->getLastOpTimeFetched_forTest());
+
+ ASSERT_FALSE(oplogFetcher->isActive());
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, oplogFetcher->startup());
+ ASSERT_FALSE(oplogFetcher->isActive());
+
+ // Last optime fetched should not change.
+ ASSERT_EQUALS(lastFetched, oplogFetcher->getLastOpTimeFetched_forTest());
+}
+
+TEST_F(NewOplogFetcherTest, OplogFetcherReturnsOperationFailedIfExecutorFailsToScheduleRunQuery) {
+ TaskExecutorMock taskExecutorMock(&getExecutor());
+ taskExecutorMock.shouldFailScheduleWorkRequest = []() { return true; };
+
+ // The onShutdownFn should not be called because the oplog fetcher should fail during startup.
+ auto oplogFetcher =
+ makeOplogFetcherWithDifferentExecutor(&taskExecutorMock, [](Status) { MONGO_UNREACHABLE; });
+
+ // Last optime fetched should match values passed to constructor.
+ ASSERT_EQUALS(lastFetched, oplogFetcher->getLastOpTimeFetched_forTest());
+
+ ASSERT_FALSE(oplogFetcher->isActive());
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, oplogFetcher->startup());
+ ASSERT_FALSE(oplogFetcher->isActive());
+
+ // Last optime fetched should not change.
+ ASSERT_EQUALS(lastFetched, oplogFetcher->getLastOpTimeFetched_forTest());
+}
+
+TEST_F(NewOplogFetcherTest, ShuttingExecutorDownAfterStartupButBeforeRunQueryScheduled) {
+ ShutdownState shutdownState;
+
+ // Defer scheduling work so that the executor's shutdown happens before startup's work is
+ // scheduled.
+ TaskExecutorMock taskExecutorMock(&getExecutor());
+ taskExecutorMock.shouldDeferScheduleWorkRequestByOneSecond = []() { return true; };
+
+ auto oplogFetcher =
+ makeOplogFetcherWithDifferentExecutor(&taskExecutorMock, std::ref(shutdownState));
+
+ ASSERT_FALSE(oplogFetcher->isActive());
+ ASSERT_OK(oplogFetcher->startup());
+ ASSERT_TRUE(oplogFetcher->isActive());
+
+ getExecutor().shutdown();
+
+ oplogFetcher->join();
+
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus());
+}
+
+TEST_F(NewOplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownBeforeRunQueryScheduled) {
+ ShutdownState shutdownState;
+
+ // Defer scheduling work so that the oplog fetcher's shutdown happens before startup's work is
+ // scheduled.
+ TaskExecutorMock taskExecutorMock(&getExecutor());
+ taskExecutorMock.shouldDeferScheduleWorkRequestByOneSecond = []() { return true; };
+
+ auto oplogFetcher =
+ makeOplogFetcherWithDifferentExecutor(&taskExecutorMock, std::ref(shutdownState));
+
+ ASSERT_FALSE(oplogFetcher->isActive());
+ ASSERT_OK(oplogFetcher->startup());
+ ASSERT_TRUE(oplogFetcher->isActive());
+
+ oplogFetcher->shutdown();
+
+ oplogFetcher->join();
+
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus());
+}
+
+TEST_F(NewOplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownAfterRunQueryScheduled) {
+ ShutdownState shutdownState;
+
+ auto oplogFetcher =
+ makeOplogFetcherWithDifferentExecutor(&getExecutor(), std::ref(shutdownState));
+
+ auto waitForCallbackScheduledFailPoint =
+ globalFailPointRegistry().find("hangAfterOplogFetcherCallbackScheduled");
+ auto timesEnteredFailPoint = waitForCallbackScheduledFailPoint->setMode(FailPoint::alwaysOn, 0);
+
+ ASSERT_FALSE(oplogFetcher->isActive());
+ ASSERT_OK(oplogFetcher->startup());
+ ASSERT_TRUE(oplogFetcher->isActive());
+
+ waitForCallbackScheduledFailPoint->waitForTimesEntered(timesEnteredFailPoint + 1);
+ waitForCallbackScheduledFailPoint->setMode(FailPoint::off);
+
+ // Only call shutdown once we have confirmed that the callback is paused at the fail point.
+ oplogFetcher->shutdown();
+
+ oplogFetcher->join();
+
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus());
+}
} // namespace
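
Taken together, these tests drive the AbstractAsyncComponent lifecycle of the new fetcher. The sketch below is illustrative only, not code from this patch; it assumes the `NewOplogFetcher` declaration and mongo's `invariant()`/`uassertStatusOK()` helpers are in scope, and shows that lifecycle from an owner's point of view:

```cpp
// Illustrative only: mirrors the startup/shutdown/join sequence the tests above exercise.
void driveFetcherLifecycle(NewOplogFetcher* oplogFetcher) {
    invariant(!oplogFetcher->isActive());
    uassertStatusOK(oplogFetcher->startup());  // schedules _runQuery on the task executor
    invariant(oplogFetcher->isActive());

    // ... the fetcher runs; fetched batches are handed to the enqueueDocumentsFn callback ...

    oplogFetcher->shutdown();  // cancels the scheduled callback or asks the running loop to stop
    oplogFetcher->join();      // blocks until _finishCallback has run the on-shutdown callback
}
```
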