diff options
author | Scott Hernandez <scotthernandez@gmail.com> | 2016-07-19 17:15:52 -0400 |
---|---|---|
committer | Scott Hernandez <scotthernandez@gmail.com> | 2016-07-23 14:01:04 -0400 |
commit | 739f13872ceee7b5301b3699f962cd08bf7d6eb2 (patch) | |
tree | 989ca8c4ffcd053d50470551bd068d8fb5e60299 /src/mongo/db/repl/sync_tail.cpp | |
parent | 2f6616860247e070d5f2db54f448904546e137d2 (diff) | |
download | mongo-739f13872ceee7b5301b3699f962cd08bf7d6eb2.tar.gz |
SERVER-23476: break out rs_sync runSyncThread into its own class
Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 19 |
1 files changed, 11 insertions, 8 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 52ebf077741..17dd237ee3c 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -669,7 +669,10 @@ class SyncTail::OpQueueBatcher { MONGO_DISALLOW_COPYING(OpQueueBatcher); public: - explicit OpQueueBatcher(SyncTail* syncTail) : _syncTail(syncTail), _thread([&] { run(); }) {} + OpQueueBatcher(SyncTail* syncTail, stdx::function<bool()> shouldShutdown) + : _syncTail(syncTail), _thread([this, shouldShutdown] { + run([this, shouldShutdown] { return _inShutdown.load() || shouldShutdown(); }); + }) {} ~OpQueueBatcher() { _inShutdown.store(true); _cv.notify_all(); @@ -692,14 +695,14 @@ public: } private: - void run() { + void run(stdx::function<bool()> shouldShutdown) { Client::initThread("ReplBatcher"); const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext(); OperationContext& txn = *txnPtr; const auto replCoord = ReplicationCoordinator::get(&txn); const auto fastClockSource = txn.getServiceContext()->getFastClockSource(); - while (!_inShutdown.load()) { + while (!shouldShutdown()) { const auto batchStartTime = fastClockSource->now(); const int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs()); @@ -744,7 +747,7 @@ private: stdx::unique_lock<stdx::mutex> lk(_mutex); while (!_ops.empty()) { // Block until the previous batch has been taken. - if (_inShutdown.load()) + if (shouldShutdown()) return; _cv.wait(lk); } @@ -764,12 +767,12 @@ private: }; /* tail an oplog. ok to return, will be re-called. */ -void SyncTail::oplogApplication() { - OpQueueBatcher batcher(this); +void SyncTail::oplogApplication(ReplicationCoordinator* replCoord, + stdx::function<bool()> shouldShutdown) { + OpQueueBatcher batcher(this, shouldShutdown); const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext(); OperationContext& txn = *txnPtr; - auto replCoord = ReplicationCoordinator::get(&txn); std::unique_ptr<ApplyBatchFinalizer> finalizer{ getGlobalServiceContext()->getGlobalStorageEngine()->isDurable() ? new ApplyBatchFinalizerForJournal(replCoord) @@ -778,7 +781,7 @@ void SyncTail::oplogApplication() { auto minValidBoundaries = StorageInterface::get(&txn)->getMinValid(&txn); OpTime originalEndOpTime(minValidBoundaries.end); OpTime lastWriteOpTime{replCoord->getMyLastAppliedOpTime()}; - while (!inShutdown()) { + while (!shouldShutdown()) { if (replCoord->getInitialSyncRequestedFlag()) { // got a resync command return; |