From 1f607d93a8554442572337e02a85d8ec3df74b42 Mon Sep 17 00:00:00 2001 From: Benety Goh Date: Thu, 16 Mar 2017 15:07:17 -0400 Subject: SERVER-28204 migrate OplogFetcher to use AbstractAsyncComponent --- src/mongo/db/repl/SConscript | 1 + src/mongo/db/repl/oplog_fetcher.cpp | 89 ++++---------------------------- src/mongo/db/repl/oplog_fetcher.h | 62 +++------------------- src/mongo/db/repl/oplog_fetcher_test.cpp | 2 +- 4 files changed, 17 insertions(+), 137 deletions(-) (limited to 'src/mongo/db') diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index bd4bfdbbfc6..54ae8e436c0 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -740,6 +740,7 @@ env.Library( 'oplog_fetcher.cpp', ], LIBDEPS=[ + 'abstract_async_component', 'repl_coordinator_interface', 'replica_set_messages', '$BUILD_DIR/mongo/base', diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index b0db62af574..3ee340e147b 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -333,7 +333,7 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor, DataReplicatorExternalState* dataReplicatorExternalState, EnqueueDocumentsFn enqueueDocumentsFn, OnShutdownCallbackFn onShutdownCallbackFn) - : _executor(executor), + : AbstractAsyncComponent(executor, "oplog fetcher"), _source(source), _nss(nss), _metadataObject(uassertStatusOK(makeMetadataObject(config.getProtocolVersion() == 1LL))), @@ -367,34 +367,16 @@ std::string OplogFetcher::toString() const { << " fetcher: " << _fetcher->getDiagnosticString(); } -bool OplogFetcher::isActive() const { - stdx::lock_guard lock(_mutex); - return _isActive_inlock(); +Status OplogFetcher::_doStartup_inlock() noexcept { + return _scheduleFetcher_inlock(); } -bool OplogFetcher::_isActive_inlock() const { - return State::kRunning == _state || State::kShuttingDown == _state; +void OplogFetcher::_doShutdown_inlock() noexcept { + _fetcher->shutdown(); } -Status OplogFetcher::startup() { - stdx::lock_guard lock(_mutex); - switch (_state) { - case State::kPreStart: - _state = State::kRunning; - break; - case State::kRunning: - return Status(ErrorCodes::InternalError, "oplog fetcher already started"); - case State::kShuttingDown: - return Status(ErrorCodes::ShutdownInProgress, "oplog fetcher shutting down"); - case State::kComplete: - return Status(ErrorCodes::ShutdownInProgress, "oplog fetcher completed"); - } - - auto status = _scheduleFetcher_inlock(); - if (!status.isOK()) { - _state = State::kComplete; - } - return status; +stdx::mutex* OplogFetcher::_getMutex() noexcept { + return &_mutex; } Status OplogFetcher::_scheduleFetcher_inlock() { @@ -402,29 +384,6 @@ Status OplogFetcher::_scheduleFetcher_inlock() { return _fetcher->schedule(); } -void OplogFetcher::shutdown() { - stdx::lock_guard lock(_mutex); - switch (_state) { - case State::kPreStart: - // Transition directly from PreStart to Complete if not started yet. - _state = State::kComplete; - return; - case State::kRunning: - _state = State::kShuttingDown; - break; - case State::kShuttingDown: - case State::kComplete: - // Nothing to do if we are already in ShuttingDown or Complete state. - return; - } - _fetcher->shutdown(); -} - -void OplogFetcher::join() { - stdx::unique_lock lock(_mutex); - _condition.wait(lock, [this]() { return !_isActive_inlock(); }); -} - OpTimeWithHash OplogFetcher::getLastOpTimeWithHashFetched() const { stdx::lock_guard lock(_mutex); return _lastFetched; @@ -443,11 +402,6 @@ Milliseconds OplogFetcher::getAwaitDataTimeout_forTest() const { return _awaitDataTimeout; } -OplogFetcher::State OplogFetcher::getState_forTest() const { - stdx::lock_guard lk(_mutex); - return _state; -} - void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, BSONObjBuilder* getMoreBob) { const auto& responseStatus = result.getStatus(); @@ -664,9 +618,7 @@ void OplogFetcher::_finishCallback(Status status, OpTimeWithHash opTimeWithHash) decltype(_onShutdownCallbackFn) onShutdownCallbackFn; stdx::lock_guard lock(_mutex); - invariant(State::kComplete != _state); - _state = State::kComplete; - _condition.notify_all(); + _transitionToComplete_inlock(); // Release any resources that might be held by the '_onShutdownCallbackFn' function object. // The function object will be destroyed outside the lock since the temporary variable @@ -678,7 +630,7 @@ void OplogFetcher::_finishCallback(Status status, OpTimeWithHash opTimeWithHash) std::unique_ptr OplogFetcher::_makeFetcher(long long currentTerm, OpTime lastFetchedOpTime) { return stdx::make_unique( - _executor, + _getExecutor(), _source, _nss.db().toString(), makeFindCommandObject(_nss, currentTerm, lastFetchedOpTime), @@ -687,28 +639,5 @@ std::unique_ptr OplogFetcher::_makeFetcher(long long currentTerm, kOplogQueryNetworkTimeout); } -bool OplogFetcher::_isShuttingDown() const { - stdx::lock_guard lock(_mutex); - return _isShuttingDown_inlock(); -} - -bool OplogFetcher::_isShuttingDown_inlock() const { - return State::kShuttingDown == _state; -} - -std::ostream& operator<<(std::ostream& os, const OplogFetcher::State& state) { - switch (state) { - case OplogFetcher::State::kPreStart: - return os << "PreStart"; - case OplogFetcher::State::kRunning: - return os << "Running"; - case OplogFetcher::State::kShuttingDown: - return os << "ShuttingDown"; - case OplogFetcher::State::kComplete: - return os << "Complete"; - } - MONGO_UNREACHABLE; -} - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h index ad4fde296b1..d2067ab9c3f 100644 --- a/src/mongo/db/repl/oplog_fetcher.h +++ b/src/mongo/db/repl/oplog_fetcher.h @@ -29,7 +29,6 @@ #pragma once #include -#include #include #include "mongo/base/disallow_copying.h" @@ -37,6 +36,7 @@ #include "mongo/bson/timestamp.h" #include "mongo/client/fetcher.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/repl/abstract_async_component.h" #include "mongo/db/repl/data_replicator_external_state.h" #include "mongo/db/repl/optime_with.h" #include "mongo/db/repl/repl_set_config.h" @@ -76,7 +76,7 @@ using OpTimeWithHash = OpTimeWith; * When there is an error or when it is not possible to issue another getMore request, calls * "onShutdownCallbackFn" to signal the end of processing. */ -class OplogFetcher { +class OplogFetcher : public AbstractAsyncComponent { MONGO_DISALLOW_COPYING(OplogFetcher); public: @@ -146,29 +146,6 @@ public: std::string toString() const; - /** - * Returns true if we have scheduled the fetcher to read the oplog on the sync source. - */ - bool isActive() const; - - /** - * Starts fetcher so that we begin tailing the remote oplog on the sync source. - */ - Status startup(); - - /** - * Cancels both scheduled and active remote command requests. - * Returns immediately if the Oplog Fetcher is not active. - * It is fine to call this multiple times. - */ - void shutdown(); - - /** - * Waits until the oplog fetcher is inactive. - * It is fine to call this multiple times. - */ - void join(); - /** * Returns optime and hash of the last oplog entry in the most recent oplog query result. */ @@ -196,22 +173,11 @@ public: */ Milliseconds getAwaitDataTimeout_forTest() const; - // State transitions: - // PreStart --> Running --> ShuttingDown --> Complete - // It is possible to skip intermediate states. For example, - // Calling shutdown() when the cloner has not started will transition from PreStart directly - // to Complete. - // This enum class is made public for testing. - enum class State { kPreStart, kRunning, kShuttingDown, kComplete }; - - /** - * Returns current oplog fetcher state. - * For testing only. - */ - State getState_forTest() const; - private: - bool _isActive_inlock() const; + // AbstractAsyncComponent overrides. + Status _doStartup_inlock() noexcept override; + void _doShutdown_inlock() noexcept override; + stdx::mutex* _getMutex() noexcept override; /** * Schedules fetcher and updates counters. @@ -239,18 +205,11 @@ private: */ std::unique_ptr _makeFetcher(long long currentTerm, OpTime lastFetchedOpTime); - /** - * Returns whether the oplog fetcher is in shutdown. - */ - bool _isShuttingDown() const; - bool _isShuttingDown_inlock() const; - // Protects member data of this OplogFetcher. mutable stdx::mutex _mutex; mutable stdx::condition_variable _condition; - executor::TaskExecutor* const _executor; const HostAndPort _source; const NamespaceString _nss; const BSONObj _metadataObject; @@ -277,9 +236,6 @@ private: // "_enqueueDocumentsFn". OpTimeWithHash _lastFetched; - // Current oplog fetcher state. See comments for State enum class for details. - State _state = State::kPreStart; - // Fetcher restarts since the last successful oplog query response. std::size_t _fetcherRestarts = 0; @@ -287,11 +243,5 @@ private: std::unique_ptr _shuttingDownFetcher; }; -/** - * Insertion operator for OplogFetcher::State. Formats oplog fetcher state for output stream. - * For testing only. - */ -std::ostream& operator<<(std::ostream& os, const OplogFetcher::State& state); - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index 827cbd86e26..4effda18af3 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -346,7 +346,7 @@ TEST_F(OplogFetcherTest, StartupWhenActiveReturnsIllegalOperation) { ASSERT_TRUE(oplogFetcher.isActive()); auto status = oplogFetcher.startup(); getExecutor().shutdown(); - ASSERT_EQUALS(ErrorCodes::InternalError, status); + ASSERT_EQUALS(ErrorCodes::IllegalOperation, status); ASSERT_STRING_CONTAINS(status.reason(), "oplog fetcher already started"); } -- cgit v1.2.1