diff options
author | Scott Hernandez <scotthernandez@gmail.com> | 2016-07-19 17:15:52 -0400 |
---|---|---|
committer | Scott Hernandez <scotthernandez@gmail.com> | 2016-07-23 14:01:04 -0400 |
commit | 739f13872ceee7b5301b3699f962cd08bf7d6eb2 (patch) | |
tree | 989ca8c4ffcd053d50470551bd068d8fb5e60299 /src/mongo | |
parent | 2f6616860247e070d5f2db54f448904546e137d2 (diff) | |
download | mongo-739f13872ceee7b5301b3699f962cd08bf7d6eb2.tar.gz |
SERVER-23476: break out rs_sync runSyncThread into its own class
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_sync.cpp | 64 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_sync.h | 44 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 2 |
6 files changed, 104 insertions, 33 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 640e81a39af..d6c49eeedf6 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -182,7 +182,8 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication(Operat log() << "Starting replication applier threads"; invariant(!_applierThread); - _applierThread.reset(new stdx::thread(stdx::bind(&runSyncThread, _bgSync.get()))); + _applierThread.reset(new RSDataSync{_bgSync.get(), ReplicationCoordinator::get(txn)}); + _applierThread->startup(); log() << "Starting replication reporter thread"; invariant(!_syncSourceFeedbackThread); _syncSourceFeedbackThread.reset(new stdx::thread(stdx::bind( @@ -224,7 +225,9 @@ void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* txn) { _syncSourceFeedbackThread->join(); } if (_applierThread) { + _applierThread->shutdown(); _applierThread->join(); + _applierThread.reset(); } if (_bgSync) { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 60349f49a3c..cfff6225fc4 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -34,6 +34,7 @@ #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/replication_coordinator_external_state.h" +#include "mongo/db/repl/rs_sync.h" #include "mongo/db/repl/sync_source_feedback.h" #include "mongo/db/storage/journal_listener.h" #include "mongo/db/storage/snapshot_manager.h" @@ -136,7 +137,7 @@ private: std::unique_ptr<stdx::thread> _syncSourceFeedbackThread; // Thread running runSyncThread(). - std::unique_ptr<stdx::thread> _applierThread; + std::unique_ptr<RSDataSync> _applierThread; // Mutex guarding the _nextThreadId value to prevent concurrent incrementing. stdx::mutex _nextThreadIdMutex; diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp index 1fe21e6d659..066bdbc0498 100644 --- a/src/mongo/db/repl/rs_sync.cpp +++ b/src/mongo/db/repl/rs_sync.cpp @@ -55,32 +55,69 @@ #include "mongo/util/exit.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" namespace mongo { namespace repl { -void runSyncThread(BackgroundSync* bgsync) { +namespace { +using LockGuard = stdx::lock_guard<stdx::mutex>; +using UniqueLock = stdx::unique_lock<stdx::mutex>; + +} // namespace + +RSDataSync::RSDataSync(BackgroundSync* bgsync, ReplicationCoordinator* replCoord) + : _bgsync(bgsync), _replCoord(replCoord) {} + +void RSDataSync::startup() { + LockGuard lk(_mutex); + _runThread = stdx::make_unique<stdx::thread>(&RSDataSync::_run, this); + _stopped = false; + _inShutdown = _stopped; +} + +void RSDataSync::shutdown() { + LockGuard lk(_mutex); + _inShutdown = true; +} + +bool RSDataSync::_isInShutdown() const { + LockGuard lk(_mutex); + return _inShutdown; +} + +void RSDataSync::join() { + UniqueLock lk(_mutex); + if (_stopped) { + return; + } + if (_runThread) { + lk.unlock(); + _runThread->join(); + } +} + +void RSDataSync::_run() { Client::initThread("rsSync"); AuthorizationSession::get(cc())->grantInternalAuthorization(); - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); // Overwrite prefetch index mode in BackgroundSync if ReplSettings has a mode set. - ReplSettings replSettings = replCoord->getSettings(); + auto&& replSettings = _replCoord->getSettings(); if (replSettings.isPrefetchIndexModeSet()) - replCoord->setIndexPrefetchConfig(replSettings.getPrefetchIndexMode()); + _replCoord->setIndexPrefetchConfig(replSettings.getPrefetchIndexMode()); - while (!inShutdown()) { + while (!_isInShutdown()) { // After a reconfig, we may not be in the replica set anymore, so // check that we are in the set (and not an arbiter) before // trying to sync with other replicas. // TODO(spencer): Use a condition variable to await loading a config - if (replCoord->getMemberState().startup()) { + if (_replCoord->getMemberState().startup()) { warning() << "did not receive a valid config yet"; sleepsecs(1); continue; } - const MemberState memberState = replCoord->getMemberState(); + const MemberState memberState = _replCoord->getMemberState(); // An arbiter can never transition to any other state, and doesn't replicate, ever if (memberState.arbiter()) { @@ -94,22 +131,25 @@ void runSyncThread(BackgroundSync* bgsync) { } try { - if (memberState.primary() && !replCoord->isWaitingForApplierToDrain()) { + if (memberState.primary() && !_replCoord->isWaitingForApplierToDrain()) { sleepsecs(1); continue; } - if (!replCoord->setFollowerMode(MemberState::RS_RECOVERING)) { + if (!_replCoord->setFollowerMode(MemberState::RS_RECOVERING)) { continue; } - /* we have some data. continue tailing. */ - SyncTail tail(bgsync, multiSyncApply); - tail.oplogApplication(); + SyncTail tail(_bgsync, multiSyncApply); + tail.oplogApplication(_replCoord, [this]() { return _isInShutdown(); }); } catch (...) { std::terminate(); } } + + LockGuard lk(_mutex); + _inShutdown = false; + _stopped = true; } } // namespace repl diff --git a/src/mongo/db/repl/rs_sync.h b/src/mongo/db/repl/rs_sync.h index 047fdc3d9a6..55fa6182576 100644 --- a/src/mongo/db/repl/rs_sync.h +++ b/src/mongo/db/repl/rs_sync.h @@ -28,22 +28,46 @@ #pragma once -#include <deque> -#include <vector> +#include <memory> + +#include "mongo/platform/atomic_word.h" +#include "mongo/stdx/mutex.h" +#include "mongo/stdx/thread.h" -#include "mongo/db/client.h" -#include "mongo/db/jsobj.h" -#include "mongo/db/repl/initial_sync.h" -#include "mongo/db/repl/sync_tail.h" -#include "mongo/db/storage/mmap_v1/dur.h" -#include "mongo/util/concurrency/old_thread_pool.h" namespace mongo { namespace repl { class BackgroundSync; +class ReplicationCoordinator; + +/** + * This class is used to apply + **/ +class RSDataSync { +public: + RSDataSync(BackgroundSync* bgsync, ReplicationCoordinator* replCoord); + void startup(); + void shutdown(); + void join(); + +private: + // Runs in a loop apply oplog entries from the buffer until this class cancels, or an error. + void _run(); + bool _isInShutdown() const; -// Body of the thread that will do the background sync. -void runSyncThread(BackgroundSync* bgsync); + // _mutex protects all of the class variables declared below. + mutable stdx::mutex _mutex; + // Thread doing the work. + std::unique_ptr<stdx::thread> _runThread; + // Set to true if shutdown() has been called. + bool _inShutdown = false; + // If the thread should not be running. + bool _stopped = true; + // BackgroundSync instance that is paired to this instance. + BackgroundSync* _bgsync; + // ReplicationCordinator instance. + ReplicationCoordinator* _replCoord; +}; } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 52ebf077741..17dd237ee3c 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -669,7 +669,10 @@ class SyncTail::OpQueueBatcher { MONGO_DISALLOW_COPYING(OpQueueBatcher); public: - explicit OpQueueBatcher(SyncTail* syncTail) : _syncTail(syncTail), _thread([&] { run(); }) {} + OpQueueBatcher(SyncTail* syncTail, stdx::function<bool()> shouldShutdown) + : _syncTail(syncTail), _thread([this, shouldShutdown] { + run([this, shouldShutdown] { return _inShutdown.load() || shouldShutdown(); }); + }) {} ~OpQueueBatcher() { _inShutdown.store(true); _cv.notify_all(); @@ -692,14 +695,14 @@ public: } private: - void run() { + void run(stdx::function<bool()> shouldShutdown) { Client::initThread("ReplBatcher"); const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext(); OperationContext& txn = *txnPtr; const auto replCoord = ReplicationCoordinator::get(&txn); const auto fastClockSource = txn.getServiceContext()->getFastClockSource(); - while (!_inShutdown.load()) { + while (!shouldShutdown()) { const auto batchStartTime = fastClockSource->now(); const int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs()); @@ -744,7 +747,7 @@ private: stdx::unique_lock<stdx::mutex> lk(_mutex); while (!_ops.empty()) { // Block until the previous batch has been taken. - if (_inShutdown.load()) + if (shouldShutdown()) return; _cv.wait(lk); } @@ -764,12 +767,12 @@ private: }; /* tail an oplog. ok to return, will be re-called. */ -void SyncTail::oplogApplication() { - OpQueueBatcher batcher(this); +void SyncTail::oplogApplication(ReplicationCoordinator* replCoord, + stdx::function<bool()> shouldShutdown) { + OpQueueBatcher batcher(this, shouldShutdown); const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext(); OperationContext& txn = *txnPtr; - auto replCoord = ReplicationCoordinator::get(&txn); std::unique_ptr<ApplyBatchFinalizer> finalizer{ getGlobalServiceContext()->getGlobalStorageEngine()->isDurable() ? new ApplyBatchFinalizerForJournal(replCoord) @@ -778,7 +781,7 @@ void SyncTail::oplogApplication() { auto minValidBoundaries = StorageInterface::get(&txn)->getMinValid(&txn); OpTime originalEndOpTime(minValidBoundaries.end); OpTime lastWriteOpTime{replCoord->getMyLastAppliedOpTime()}; - while (!inShutdown()) { + while (!shouldShutdown()) { if (replCoord->getInitialSyncRequestedFlag()) { // got a resync command return; diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index c6cd128be95..e9fe7067029 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -106,7 +106,7 @@ public: static Status syncApply(OperationContext* txn, const BSONObj& o, bool convertUpdateToUpsert); - void oplogApplication(); + void oplogApplication(ReplicationCoordinator* replCoord, stdx::function<bool()> shouldShutdown); bool peek(OperationContext* txn, BSONObj* obj); class OpQueue { |