summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/sync_tail.cpp
diff options
context:
space:
mode:
authorScott Hernandez <scotthernandez@gmail.com>2016-07-19 17:15:52 -0400
committerScott Hernandez <scotthernandez@gmail.com>2016-07-23 14:01:04 -0400
commit739f13872ceee7b5301b3699f962cd08bf7d6eb2 (patch)
tree989ca8c4ffcd053d50470551bd068d8fb5e60299 /src/mongo/db/repl/sync_tail.cpp
parent2f6616860247e070d5f2db54f448904546e137d2 (diff)
downloadmongo-739f13872ceee7b5301b3699f962cd08bf7d6eb2.tar.gz
SERVER-23476: break out rs_sync runSyncThread into its own class
Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r--src/mongo/db/repl/sync_tail.cpp19
1 files changed, 11 insertions, 8 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 52ebf077741..17dd237ee3c 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -669,7 +669,10 @@ class SyncTail::OpQueueBatcher {
MONGO_DISALLOW_COPYING(OpQueueBatcher);
public:
- explicit OpQueueBatcher(SyncTail* syncTail) : _syncTail(syncTail), _thread([&] { run(); }) {}
+ OpQueueBatcher(SyncTail* syncTail, stdx::function<bool()> shouldShutdown)
+ : _syncTail(syncTail), _thread([this, shouldShutdown] {
+ run([this, shouldShutdown] { return _inShutdown.load() || shouldShutdown(); });
+ }) {}
~OpQueueBatcher() {
_inShutdown.store(true);
_cv.notify_all();
@@ -692,14 +695,14 @@ public:
}
private:
- void run() {
+ void run(stdx::function<bool()> shouldShutdown) {
Client::initThread("ReplBatcher");
const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext();
OperationContext& txn = *txnPtr;
const auto replCoord = ReplicationCoordinator::get(&txn);
const auto fastClockSource = txn.getServiceContext()->getFastClockSource();
- while (!_inShutdown.load()) {
+ while (!shouldShutdown()) {
const auto batchStartTime = fastClockSource->now();
const int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs());
@@ -744,7 +747,7 @@ private:
stdx::unique_lock<stdx::mutex> lk(_mutex);
while (!_ops.empty()) {
// Block until the previous batch has been taken.
- if (_inShutdown.load())
+ if (shouldShutdown())
return;
_cv.wait(lk);
}
@@ -764,12 +767,12 @@ private:
};
/* tail an oplog. ok to return, will be re-called. */
-void SyncTail::oplogApplication() {
- OpQueueBatcher batcher(this);
+void SyncTail::oplogApplication(ReplicationCoordinator* replCoord,
+ stdx::function<bool()> shouldShutdown) {
+ OpQueueBatcher batcher(this, shouldShutdown);
const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext();
OperationContext& txn = *txnPtr;
- auto replCoord = ReplicationCoordinator::get(&txn);
std::unique_ptr<ApplyBatchFinalizer> finalizer{
getGlobalServiceContext()->getGlobalStorageEngine()->isDurable()
? new ApplyBatchFinalizerForJournal(replCoord)
@@ -778,7 +781,7 @@ void SyncTail::oplogApplication() {
auto minValidBoundaries = StorageInterface::get(&txn)->getMinValid(&txn);
OpTime originalEndOpTime(minValidBoundaries.end);
OpTime lastWriteOpTime{replCoord->getMyLastAppliedOpTime()};
- while (!inShutdown()) {
+ while (!shouldShutdown()) {
if (replCoord->getInitialSyncRequestedFlag()) {
// got a resync command
return;