diff options
author | Spencer T Brody <spencer@mongodb.com> | 2015-02-02 14:27:01 -0500 |
---|---|---|
committer | Ramon Fernandez <ramon.fernandez@mongodb.com> | 2015-02-04 13:47:48 -0500 |
commit | dbc52f024a2e679038f85bc818bb9b5c7c8c9acd (patch) | |
tree | b498155097edb60a37935a180673b89384f82a36 | |
parent | 97591316b10a0c41c8449d15d23dc562401998b2 (diff) | |
download | mongo-dbc52f024a2e679038f85bc818bb9b5c7c8c9acd.tar.gz |
SERVER-17123 Wait for the BackgroundSync thread to stop before enabling writes when becoming primary
(cherry picked from commit 3e411144a9bfb4b08d98cd26a778453e04e92a0e)
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 21 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 11 |
3 files changed, 30 insertions, 8 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 9a8a32618b7..54f33578a32 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -127,9 +127,11 @@ namespace { // Clear the buffer in case the producerThread is waiting in push() due to a full queue. invariant(inShutdown()); _buffer.clear(); + _pause = true; // Wake up producerThread so it notices that we're in shutdown - _condvar.notify_all(); + _appliedBufferCondition.notify_all(); + _pausedCondition.notify_all(); } void BackgroundSync::notify(OperationContext* txn) { @@ -138,8 +140,7 @@ namespace { // If all ops in the buffer have been applied, unblock waitForRepl (if it's waiting) if (_buffer.empty()) { _appliedBuffer = true; - _replCoord->signalDrainComplete(txn); - _condvar.notify_all(); + _appliedBufferCondition.notify_all(); } } @@ -214,7 +215,7 @@ namespace { // Wait until we've applied the ops we have before we choose a sync target while (!_appliedBuffer && !inShutdownStrict()) { - _condvar.wait(lock); + _appliedBufferCondition.wait(lock); } if (inShutdownStrict()) { return; @@ -442,13 +443,14 @@ namespace { } void BackgroundSync::stop() { - boost::unique_lock<boost::mutex> lock(_mutex); + boost::lock_guard<boost::mutex> lock(_mutex); _pause = true; _syncSourceHost = HostAndPort(); _lastOpTimeFetched = OpTime(0,0); _lastFetchedHash = 0; - _condvar.notify_all(); + _appliedBufferCondition.notify_all(); + _pausedCondition.notify_all(); } void BackgroundSync::start(OperationContext* txn) { @@ -467,6 +469,13 @@ namespace { " " << _lastFetchedHash; } + void BackgroundSync::waitUntilPaused() { + boost::unique_lock<boost::mutex> lock(_mutex); + while (!_pause) { + _pausedCondition.wait(lock); + } + } + long long BackgroundSync::getLastAppliedHash() const { boost::lock_guard<boost::mutex> lck(_mutex); return _lastAppliedHash; diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 20973fd1908..8a36e572abe 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -82,6 +82,9 @@ namespace repl { void shutdown(); void notify(OperationContext* txn); + // Blocks until _pause becomes true from a call to stop() or shutdown() + void waitUntilPaused(); + virtual ~BackgroundSync() {} // starts the producer thread @@ -138,8 +141,9 @@ namespace repl { // if produce thread should be running bool _pause; + boost::condition _pausedCondition; bool _appliedBuffer; - boost::condition _condvar; + boost::condition _appliedBufferCondition; HostAndPort _syncSourceHost; diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 557fbc45ae1..9258c6e48c3 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -528,7 +528,16 @@ namespace { if (!peek_success) { // if we don't have anything in the queue, wait a bit for something to appear if (ops->empty()) { - replCoord->signalDrainComplete(txn); + if (replCoord->isWaitingForApplierToDrain()) { + BackgroundSync::get()->waitUntilPaused(); + if (peek(&op)) { + // The producer generated a last batch of ops before pausing so return + // false so that we'll come back and apply them before signaling the drain + // is complete. + return false; + } + replCoord->signalDrainComplete(txn); + } // block up to 1 second _networkQueue->waitForMore(); return false; |