diff options
author | Samyukta Lanka <samy.lanka@mongodb.com> | 2020-01-16 23:39:39 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2020-01-16 23:39:39 +0000 |
commit | b6f6ea652af457d4929978ff3442668183c527bd (patch) | |
tree | ed5f195046ed2083d7426be170603f63d2965fd7 /src/mongo/db/repl | |
parent | 8003fc047c5c742829ba80b86ec2e6c5bb4e8453 (diff) | |
download | mongo-b6f6ea652af457d4929978ff3442668183c527bd.tar.gz |
SERVER-45466 Implement startup and shutdown logic for the new oplog fetcher
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/abstract_oplog_fetcher.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.cpp | 140 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.h | 21 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_test.cpp | 167 |
4 files changed, 318 insertions, 15 deletions
diff --git a/src/mongo/db/repl/abstract_oplog_fetcher.cpp b/src/mongo/db/repl/abstract_oplog_fetcher.cpp index 72c8bd0b788..98b1c547b45 100644 --- a/src/mongo/db/repl/abstract_oplog_fetcher.cpp +++ b/src/mongo/db/repl/abstract_oplog_fetcher.cpp @@ -47,9 +47,10 @@ namespace mongo { namespace repl { -namespace { -MONGO_FAIL_POINT_DEFINE(hangBeforeStartingOplogFetcher); +// This failpoint is shared with oplog_fetcher. +MONGO_FAIL_POINT_DEFINE(hangBeforeStartingOplogFetcher) +namespace { Counter64 readersCreatedStats; ServerStatusMetricField<Counter64> displayReadersCreated("repl.network.readersCreated", &readersCreatedStats); diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index 47b7a8a84d7..610a718c1a1 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -53,6 +53,10 @@ Seconds OplogFetcher::kDefaultProtocolZeroAwaitDataTimeout(2); MONGO_FAIL_POINT_DEFINE(stopReplProducer); MONGO_FAIL_POINT_DEFINE(stopReplProducerOnDocument); MONGO_FAIL_POINT_DEFINE(setSmallOplogGetMoreMaxTimeMS); +MONGO_FAIL_POINT_DEFINE(hangAfterOplogFetcherCallbackScheduled); + +// TODO SERVER-45574: Define the failpoint in this file instead. +extern FailPoint hangBeforeStartingOplogFetcher; namespace { @@ -571,5 +575,141 @@ StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryRespons _getGetMoreMaxTime(), _batchSize); } + +NewOplogFetcher::NewOplogFetcher( + executor::TaskExecutor* executor, + OpTime lastFetched, + HostAndPort source, + ReplSetConfig config, + std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision, + int requiredRBID, + bool requireFresherSyncSource, + DataReplicatorExternalState* dataReplicatorExternalState, + EnqueueDocumentsFn enqueueDocumentsFn, + OnShutdownCallbackFn onShutdownCallbackFn, + const int batchSize, + StartingPoint startingPoint) + : AbstractAsyncComponent(executor, "oplog fetcher"), + _source(source), + _requiredRBID(requiredRBID), + _oplogFetcherRestartDecision(std::move(oplogFetcherRestartDecision)), + _onShutdownCallbackFn(onShutdownCallbackFn), + _lastFetched(lastFetched), + _metadataObj(makeMetadataObject()), + _requireFresherSyncSource(requireFresherSyncSource), + _dataReplicatorExternalState(dataReplicatorExternalState), + _enqueueDocumentsFn(enqueueDocumentsFn), + _awaitDataTimeout(calculateAwaitDataTimeout(config)), + _batchSize(batchSize), + _startingPoint(startingPoint) { + + invariant(config.isInitialized()); + invariant(!_lastFetched.isNull()); + invariant(onShutdownCallbackFn); + invariant(enqueueDocumentsFn); +} + +NewOplogFetcher::~NewOplogFetcher() { + shutdown(); + join(); +} + +Status NewOplogFetcher::_doStartup_inlock() noexcept { + return _scheduleWorkAndSaveHandle_inlock( + [this](const executor::TaskExecutor::CallbackArgs& args) { + hangBeforeStartingOplogFetcher.pauseWhileSet(); + _runQuery(args); + }, + &_runQueryHandle, + "_runQuery"); +} + +void NewOplogFetcher::_doShutdown_inlock() noexcept { + _cancelHandle_inlock(_runQueryHandle); + + // TODO SERVER-45468: Call shutdownAndDisallowReconnect on DBClientConnection +} + +Mutex* NewOplogFetcher::_getMutex() noexcept { + return &_mutex; +} + +OpTime NewOplogFetcher::getLastOpTimeFetched_forTest() const { + return _getLastOpTimeFetched(); +} + +OpTime NewOplogFetcher::_getLastOpTimeFetched() const { + stdx::lock_guard<Latch> lock(_mutex); + return _lastFetched; +} + +void NewOplogFetcher::_finishCallback(Status status) { + invariant(isActive()); + + _onShutdownCallbackFn(status); + + decltype(_onShutdownCallbackFn) onShutdownCallbackFn; + decltype(_oplogFetcherRestartDecision) oplogFetcherRestartDecision; + stdx::lock_guard<Latch> lock(_mutex); + _transitionToComplete_inlock(); + + // Release any resources that might be held by the '_onShutdownCallbackFn' function object. + // The function object will be destroyed outside the lock since the temporary variable + // 'onShutdownCallbackFn' is declared before 'lock'. + invariant(_onShutdownCallbackFn); + std::swap(_onShutdownCallbackFn, onShutdownCallbackFn); + + // Release any resources held by the OplogFetcherRestartDecision. + invariant(_oplogFetcherRestartDecision); + std::swap(_oplogFetcherRestartDecision, oplogFetcherRestartDecision); +} + +void NewOplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& callbackData) { + Status responseStatus = + _checkForShutdownAndConvertStatus(callbackData, "error running oplog fetcher"); + if (!responseStatus.isOK()) { + _finishCallback(responseStatus); + return; + } + + hangAfterOplogFetcherCallbackScheduled.pauseWhileSet(); + + while (true) { + bool isShuttingDown; + { + // Both of these checks need to happen while holding the mutex since they could race + // with shutdown. + stdx::lock_guard<Latch> lock(_mutex); + isShuttingDown = _isShuttingDown_inlock(); + invariant(isShuttingDown || !_runQueryHandle.isCanceled()); + } + if (isShuttingDown) { + _finishCallback(Status(ErrorCodes::CallbackCanceled, "oplog fetcher shutting down")); + return; + } + } +} + +bool NewOplogFetcher::OplogFetcherRestartDecisionDefault::shouldContinue(NewOplogFetcher* fetcher, + Status status) { + if (_numRestarts == _maxRestarts) { + log() << "Error returned from oplog query (no more query restarts left): " + << redact(status); + return false; + } + log() << "Restarting oplog query due to error: " << redact(status) + << ". Last fetched optime: " << fetcher->_getLastOpTimeFetched() + << ". Restarts remaining: " << (_maxRestarts - _numRestarts); + _numRestarts++; + return true; +} + +void NewOplogFetcher::OplogFetcherRestartDecisionDefault::fetchSuccessful( + NewOplogFetcher* fetcher) { + _numRestarts = 0; +}; + +NewOplogFetcher::OplogFetcherRestartDecision::~OplogFetcherRestartDecision(){}; + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h index a17ba35532d..5ba6a5a6227 100644 --- a/src/mongo/db/repl/oplog_fetcher.h +++ b/src/mongo/db/repl/oplog_fetcher.h @@ -414,13 +414,7 @@ private: // =============== AbstractAsyncComponent overrides ================ /** - * Initializes a DBClientConnection to the sync source and schedules the _runQuery function to - * run in a separate thread. - * - * Initializes a DBClientConnection to the sync source and uses its query function to schedule - * a `find` command on the source's oplog for entries with a timestamp greater than or equal to - * this node's last fetched. This will create a DBClientCursor which will be used until the - * cursor fails or OplogFetcher is shut down. + * Schedules the _runQuery function to run in a separate thread. */ Status _doStartup_inlock() noexcept override; @@ -434,10 +428,11 @@ private: // ============= End AbstractAsyncComponent overrides ============== - /* + /** * Creates a DBClientConnection and executes a query to retrieve oplog entries from this node's - * sync source. This will create a tailable, awaitData, exhaust cursor. For each batch returned - * by the upstream node, _onSuccessfulBatch will be called with the response. + * sync source. This will create a tailable, awaitData, exhaust cursor which will be used until + * the cursor fails or OplogFetcher is shut down. For each batch returned by the upstream node, + * _onSuccessfulBatch will be called with the response. * * In the case of any network or response errors, this method will close the cursor and restart * a new one. If OplogFetcherRestartDecision's shouldContinue function indicates it should not @@ -445,7 +440,7 @@ private: */ void _runQuery(const executor::TaskExecutor::CallbackArgs& callbackData); - /* + /** * Executes a `find` query on the sync source's oplog and establishes a tailable, awaitData, * exhaust cursor. If it is not successful in creating a new cursor, it will retry based on the * OplogFetcherRestartDecision's shouldContinue function. @@ -462,7 +457,7 @@ private: */ BSONObj _makeFindQuery(OpTime lastOpTimeFetched, Milliseconds findMaxTime) const; - /* + /** * Gets the next batch from the exhaust cursor. * * If there was an error getting the next batch, checks _oplogFetcherRestartDecision's @@ -479,7 +474,7 @@ private: */ Status _onSuccessfulBatch(const Documents& documents); - /* + /** * Notifies caller that the oplog fetcher has completed processing operations from the remote * oplog using the "_onShutdownCallbackFn". */ diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index 1a5e231044f..b6f7a41c876 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -34,6 +34,7 @@ #include "mongo/db/repl/abstract_oplog_fetcher_test_fixture.h" #include "mongo/db/repl/data_replicator_external_state_mock.h" #include "mongo/db/repl/oplog_fetcher.h" +#include "mongo/db/repl/task_executor_mock.h" #include "mongo/rpc/metadata.h" #include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" @@ -41,6 +42,7 @@ #include "mongo/unittest/ensure_fcv.h" #include "mongo/unittest/task_executor_proxy.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/fail_point.h" #include "mongo/util/scopeguard.h" namespace { @@ -1023,4 +1025,169 @@ TEST_F(OplogFetcherTest, ASSERT_EQUALS(OpTime(), info.lastDocument); } + +class NewOplogFetcherTest : public executor::ThreadPoolExecutorTest { +public: + void setUp() override; + + // 16MB max batch size / 12 byte min doc size * 10 (for good measure) = defaultBatchSize to use. + const int defaultBatchSize = (16 * 1024 * 1024) / 12 * 10; + + std::unique_ptr<NewOplogFetcher> makeOplogFetcher(); + std::unique_ptr<NewOplogFetcher> makeOplogFetcherWithDifferentExecutor( + executor::TaskExecutor* executor, NewOplogFetcher::OnShutdownCallbackFn fn); + + std::unique_ptr<DataReplicatorExternalStateMock> dataReplicatorExternalState; + + NewOplogFetcher::Documents lastEnqueuedDocuments; + NewOplogFetcher::DocumentsInfo lastEnqueuedDocumentsInfo; + NewOplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn; + + // The last OpTime fetched by the oplog fetcher. + OpTime lastFetched; +}; + +void NewOplogFetcherTest::setUp() { + executor::ThreadPoolExecutorTest::setUp(); + launchExecutorThread(); + + lastFetched = {{123, 0}, 1}; + + dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateMock>(); + dataReplicatorExternalState->currentTerm = lastFetched.getTerm(); + dataReplicatorExternalState->lastCommittedOpTime = {{9999, 0}, lastFetched.getTerm()}; + + enqueueDocumentsFn = [this](NewOplogFetcher::Documents::const_iterator begin, + NewOplogFetcher::Documents::const_iterator end, + const NewOplogFetcher::DocumentsInfo& info) -> Status { + lastEnqueuedDocuments = {begin, end}; + lastEnqueuedDocumentsInfo = info; + return Status::OK(); + }; +} + +std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::makeOplogFetcher() { + return makeOplogFetcherWithDifferentExecutor(&getExecutor(), [](Status) {}); +} + +std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::makeOplogFetcherWithDifferentExecutor( + executor::TaskExecutor* executor, NewOplogFetcher::OnShutdownCallbackFn fn) { + return std::make_unique<NewOplogFetcher>( + executor, + lastFetched, + source, + _createConfig(), + std::make_unique<NewOplogFetcher::OplogFetcherRestartDecisionDefault>(1), + -1, + true, + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + fn, + defaultBatchSize, + NewOplogFetcher::StartingPoint::kSkipFirstDoc); +} + +TEST_F(NewOplogFetcherTest, ShuttingExecutorDownShouldPreventOplogFetcherFromStarting) { + getExecutor().shutdown(); + + auto oplogFetcher = makeOplogFetcher(); + + // Last optime fetched should match values passed to constructor. + ASSERT_EQUALS(lastFetched, oplogFetcher->getLastOpTimeFetched_forTest()); + + ASSERT_FALSE(oplogFetcher->isActive()); + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, oplogFetcher->startup()); + ASSERT_FALSE(oplogFetcher->isActive()); + + // Last optime fetched should not change. + ASSERT_EQUALS(lastFetched, oplogFetcher->getLastOpTimeFetched_forTest()); +} + +TEST_F(NewOplogFetcherTest, OplogFetcherReturnsOperationFailedIfExecutorFailsToScheduleRunQuery) { + TaskExecutorMock taskExecutorMock(&getExecutor()); + taskExecutorMock.shouldFailScheduleWorkRequest = []() { return true; }; + + // The onShutdownFn should not be called because the oplog fetcher should fail during startup. + auto oplogFetcher = + makeOplogFetcherWithDifferentExecutor(&taskExecutorMock, [](Status) { MONGO_UNREACHABLE; }); + + // Last optime fetched should match values passed to constructor. + ASSERT_EQUALS(lastFetched, oplogFetcher->getLastOpTimeFetched_forTest()); + + ASSERT_FALSE(oplogFetcher->isActive()); + ASSERT_EQUALS(ErrorCodes::OperationFailed, oplogFetcher->startup()); + ASSERT_FALSE(oplogFetcher->isActive()); + + // Last optime fetched should not change. + ASSERT_EQUALS(lastFetched, oplogFetcher->getLastOpTimeFetched_forTest()); +} + +TEST_F(NewOplogFetcherTest, ShuttingExecutorDownAfterStartupButBeforeRunQueryScheduled) { + ShutdownState shutdownState; + + // Defer scheduling work so that the executor's shutdown happens before startup's work is + // scheduled. + TaskExecutorMock taskExecutorMock(&getExecutor()); + taskExecutorMock.shouldDeferScheduleWorkRequestByOneSecond = []() { return true; }; + + auto oplogFetcher = + makeOplogFetcherWithDifferentExecutor(&taskExecutorMock, std::ref(shutdownState)); + + ASSERT_FALSE(oplogFetcher->isActive()); + ASSERT_OK(oplogFetcher->startup()); + ASSERT_TRUE(oplogFetcher->isActive()); + + getExecutor().shutdown(); + + oplogFetcher->join(); + + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus()); +} + +TEST_F(NewOplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownBeforeRunQueryScheduled) { + ShutdownState shutdownState; + + // Defer scheduling work so that the oplog fetcher's shutdown happens before startup's work is + // scheduled. + TaskExecutorMock taskExecutorMock(&getExecutor()); + taskExecutorMock.shouldDeferScheduleWorkRequestByOneSecond = []() { return true; }; + + auto oplogFetcher = + makeOplogFetcherWithDifferentExecutor(&taskExecutorMock, std::ref(shutdownState)); + + ASSERT_FALSE(oplogFetcher->isActive()); + ASSERT_OK(oplogFetcher->startup()); + ASSERT_TRUE(oplogFetcher->isActive()); + + oplogFetcher->shutdown(); + + oplogFetcher->join(); + + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus()); +} + +TEST_F(NewOplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownAfterRunQueryScheduled) { + ShutdownState shutdownState; + + auto oplogFetcher = + makeOplogFetcherWithDifferentExecutor(&getExecutor(), std::ref(shutdownState)); + + auto waitForCallbackScheduledFailPoint = + globalFailPointRegistry().find("hangAfterOplogFetcherCallbackScheduled"); + auto timesEnteredFailPoint = waitForCallbackScheduledFailPoint->setMode(FailPoint::alwaysOn, 0); + + ASSERT_FALSE(oplogFetcher->isActive()); + ASSERT_OK(oplogFetcher->startup()); + ASSERT_TRUE(oplogFetcher->isActive()); + + waitForCallbackScheduledFailPoint->waitForTimesEntered(timesEnteredFailPoint + 1); + waitForCallbackScheduledFailPoint->setMode(FailPoint::off); + + // Only call shutdown once we have confirmed that the callback is paused at the fail point. + oplogFetcher->shutdown(); + + oplogFetcher->join(); + + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus()); +} } // namespace |