summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2017-03-16 15:07:17 -0400
committerBenety Goh <benety@mongodb.com>2017-03-20 14:31:53 -0400
commit1f607d93a8554442572337e02a85d8ec3df74b42 (patch)
tree448fe74cc78b661fc096a38153948f859952a8b7 /src/mongo/db/repl
parent66188320dc4f8d4c941160ecc11024690d88dd05 (diff)
downloadmongo-1f607d93a8554442572337e02a85d8ec3df74b42.tar.gz
SERVER-28204 migrate OplogFetcher to use AbstractAsyncComponent
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp89
-rw-r--r--src/mongo/db/repl/oplog_fetcher.h62
-rw-r--r--src/mongo/db/repl/oplog_fetcher_test.cpp2
4 files changed, 17 insertions, 137 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index bd4bfdbbfc6..54ae8e436c0 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -740,6 +740,7 @@ env.Library(
'oplog_fetcher.cpp',
],
LIBDEPS=[
+ 'abstract_async_component',
'repl_coordinator_interface',
'replica_set_messages',
'$BUILD_DIR/mongo/base',
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index b0db62af574..3ee340e147b 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -333,7 +333,7 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
DataReplicatorExternalState* dataReplicatorExternalState,
EnqueueDocumentsFn enqueueDocumentsFn,
OnShutdownCallbackFn onShutdownCallbackFn)
- : _executor(executor),
+ : AbstractAsyncComponent(executor, "oplog fetcher"),
_source(source),
_nss(nss),
_metadataObject(uassertStatusOK(makeMetadataObject(config.getProtocolVersion() == 1LL))),
@@ -367,34 +367,16 @@ std::string OplogFetcher::toString() const {
<< " fetcher: " << _fetcher->getDiagnosticString();
}
-bool OplogFetcher::isActive() const {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- return _isActive_inlock();
+Status OplogFetcher::_doStartup_inlock() noexcept {
+ return _scheduleFetcher_inlock();
}
-bool OplogFetcher::_isActive_inlock() const {
- return State::kRunning == _state || State::kShuttingDown == _state;
+void OplogFetcher::_doShutdown_inlock() noexcept {
+ _fetcher->shutdown();
}
-Status OplogFetcher::startup() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- switch (_state) {
- case State::kPreStart:
- _state = State::kRunning;
- break;
- case State::kRunning:
- return Status(ErrorCodes::InternalError, "oplog fetcher already started");
- case State::kShuttingDown:
- return Status(ErrorCodes::ShutdownInProgress, "oplog fetcher shutting down");
- case State::kComplete:
- return Status(ErrorCodes::ShutdownInProgress, "oplog fetcher completed");
- }
-
- auto status = _scheduleFetcher_inlock();
- if (!status.isOK()) {
- _state = State::kComplete;
- }
- return status;
+stdx::mutex* OplogFetcher::_getMutex() noexcept {
+ return &_mutex;
}
Status OplogFetcher::_scheduleFetcher_inlock() {
@@ -402,29 +384,6 @@ Status OplogFetcher::_scheduleFetcher_inlock() {
return _fetcher->schedule();
}
-void OplogFetcher::shutdown() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- switch (_state) {
- case State::kPreStart:
- // Transition directly from PreStart to Complete if not started yet.
- _state = State::kComplete;
- return;
- case State::kRunning:
- _state = State::kShuttingDown;
- break;
- case State::kShuttingDown:
- case State::kComplete:
- // Nothing to do if we are already in ShuttingDown or Complete state.
- return;
- }
- _fetcher->shutdown();
-}
-
-void OplogFetcher::join() {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
- _condition.wait(lock, [this]() { return !_isActive_inlock(); });
-}
-
OpTimeWithHash OplogFetcher::getLastOpTimeWithHashFetched() const {
stdx::lock_guard<stdx::mutex> lock(_mutex);
return _lastFetched;
@@ -443,11 +402,6 @@ Milliseconds OplogFetcher::getAwaitDataTimeout_forTest() const {
return _awaitDataTimeout;
}
-OplogFetcher::State OplogFetcher::getState_forTest() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _state;
-}
-
void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
BSONObjBuilder* getMoreBob) {
const auto& responseStatus = result.getStatus();
@@ -664,9 +618,7 @@ void OplogFetcher::_finishCallback(Status status, OpTimeWithHash opTimeWithHash)
decltype(_onShutdownCallbackFn) onShutdownCallbackFn;
stdx::lock_guard<stdx::mutex> lock(_mutex);
- invariant(State::kComplete != _state);
- _state = State::kComplete;
- _condition.notify_all();
+ _transitionToComplete_inlock();
// Release any resources that might be held by the '_onShutdownCallbackFn' function object.
// The function object will be destroyed outside the lock since the temporary variable
@@ -678,7 +630,7 @@ void OplogFetcher::_finishCallback(Status status, OpTimeWithHash opTimeWithHash)
std::unique_ptr<Fetcher> OplogFetcher::_makeFetcher(long long currentTerm,
OpTime lastFetchedOpTime) {
return stdx::make_unique<Fetcher>(
- _executor,
+ _getExecutor(),
_source,
_nss.db().toString(),
makeFindCommandObject(_nss, currentTerm, lastFetchedOpTime),
@@ -687,28 +639,5 @@ std::unique_ptr<Fetcher> OplogFetcher::_makeFetcher(long long currentTerm,
kOplogQueryNetworkTimeout);
}
-bool OplogFetcher::_isShuttingDown() const {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- return _isShuttingDown_inlock();
-}
-
-bool OplogFetcher::_isShuttingDown_inlock() const {
- return State::kShuttingDown == _state;
-}
-
-std::ostream& operator<<(std::ostream& os, const OplogFetcher::State& state) {
- switch (state) {
- case OplogFetcher::State::kPreStart:
- return os << "PreStart";
- case OplogFetcher::State::kRunning:
- return os << "Running";
- case OplogFetcher::State::kShuttingDown:
- return os << "ShuttingDown";
- case OplogFetcher::State::kComplete:
- return os << "Complete";
- }
- MONGO_UNREACHABLE;
-}
-
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index ad4fde296b1..d2067ab9c3f 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -29,7 +29,6 @@
#pragma once
#include <cstddef>
-#include <iosfwd>
#include <memory>
#include "mongo/base/disallow_copying.h"
@@ -37,6 +36,7 @@
#include "mongo/bson/timestamp.h"
#include "mongo/client/fetcher.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/abstract_async_component.h"
#include "mongo/db/repl/data_replicator_external_state.h"
#include "mongo/db/repl/optime_with.h"
#include "mongo/db/repl/repl_set_config.h"
@@ -76,7 +76,7 @@ using OpTimeWithHash = OpTimeWith<long long>;
* When there is an error or when it is not possible to issue another getMore request, calls
* "onShutdownCallbackFn" to signal the end of processing.
*/
-class OplogFetcher {
+class OplogFetcher : public AbstractAsyncComponent {
MONGO_DISALLOW_COPYING(OplogFetcher);
public:
@@ -147,29 +147,6 @@ public:
std::string toString() const;
/**
- * Returns true if we have scheduled the fetcher to read the oplog on the sync source.
- */
- bool isActive() const;
-
- /**
- * Starts fetcher so that we begin tailing the remote oplog on the sync source.
- */
- Status startup();
-
- /**
- * Cancels both scheduled and active remote command requests.
- * Returns immediately if the Oplog Fetcher is not active.
- * It is fine to call this multiple times.
- */
- void shutdown();
-
- /**
- * Waits until the oplog fetcher is inactive.
- * It is fine to call this multiple times.
- */
- void join();
-
- /**
* Returns optime and hash of the last oplog entry in the most recent oplog query result.
*/
OpTimeWithHash getLastOpTimeWithHashFetched() const;
@@ -196,22 +173,11 @@ public:
*/
Milliseconds getAwaitDataTimeout_forTest() const;
- // State transitions:
- // PreStart --> Running --> ShuttingDown --> Complete
- // It is possible to skip intermediate states. For example,
- // Calling shutdown() when the cloner has not started will transition from PreStart directly
- // to Complete.
- // This enum class is made public for testing.
- enum class State { kPreStart, kRunning, kShuttingDown, kComplete };
-
- /**
- * Returns current oplog fetcher state.
- * For testing only.
- */
- State getState_forTest() const;
-
private:
- bool _isActive_inlock() const;
+ // AbstractAsyncComponent overrides.
+ Status _doStartup_inlock() noexcept override;
+ void _doShutdown_inlock() noexcept override;
+ stdx::mutex* _getMutex() noexcept override;
/**
* Schedules fetcher and updates counters.
@@ -239,18 +205,11 @@ private:
*/
std::unique_ptr<Fetcher> _makeFetcher(long long currentTerm, OpTime lastFetchedOpTime);
- /**
- * Returns whether the oplog fetcher is in shutdown.
- */
- bool _isShuttingDown() const;
- bool _isShuttingDown_inlock() const;
-
// Protects member data of this OplogFetcher.
mutable stdx::mutex _mutex;
mutable stdx::condition_variable _condition;
- executor::TaskExecutor* const _executor;
const HostAndPort _source;
const NamespaceString _nss;
const BSONObj _metadataObject;
@@ -277,9 +236,6 @@ private:
// "_enqueueDocumentsFn".
OpTimeWithHash _lastFetched;
- // Current oplog fetcher state. See comments for State enum class for details.
- State _state = State::kPreStart;
-
// Fetcher restarts since the last successful oplog query response.
std::size_t _fetcherRestarts = 0;
@@ -287,11 +243,5 @@ private:
std::unique_ptr<Fetcher> _shuttingDownFetcher;
};
-/**
- * Insertion operator for OplogFetcher::State. Formats oplog fetcher state for output stream.
- * For testing only.
- */
-std::ostream& operator<<(std::ostream& os, const OplogFetcher::State& state);
-
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 827cbd86e26..4effda18af3 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -346,7 +346,7 @@ TEST_F(OplogFetcherTest, StartupWhenActiveReturnsIllegalOperation) {
ASSERT_TRUE(oplogFetcher.isActive());
auto status = oplogFetcher.startup();
getExecutor().shutdown();
- ASSERT_EQUALS(ErrorCodes::InternalError, status);
+ ASSERT_EQUALS(ErrorCodes::IllegalOperation, status);
ASSERT_STRING_CONTAINS(status.reason(), "oplog fetcher already started");
}