diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2019-08-23 23:11:46 +0000 |
---|---|---|
committer | Siyuan Zhou <visualzhou@gmail.com> | 2019-09-05 03:35:24 -0400 |
commit | 1929f9ba94436b57364b24cfe5646464d964564e (patch) | |
tree | 875f5bd022956fbba7de786f3e310964ab44d293 /src | |
parent | 2a3748453d5d00d7f8de91ea75c985f5ad1ce14b (diff) | |
download | mongo-1929f9ba94436b57364b24cfe5646464d964564e.tar.gz |
SERVER-42219 Make sure oplog buffer is empty when primary exits drain mode.
(cherry picked from commit 883b10b38ddd7aa5b9a197688141ebf387292a07)
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 56 |
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 13 |
2 files changed, 60 insertions, 9 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 5f90645b8ba..6463e5954f9 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -589,7 +589,16 @@ public: OpQueue getNextBatch(Seconds maxWaitTime) { stdx::unique_lock<stdx::mutex> lk(_mutex); - if (_ops.empty() && !_ops.mustShutdown()) { + // _ops can indicate the following cases: + // 1. A new batch is ready to consume. + // 2. Shutdown. + // 3. The batch has (or had) exhausted the buffer in draining mode. + // 4. Empty batch since the batch has/had exhausted the buffer but not in draining mode, + // so there could be new oplog entries coming. + // 5. Empty batch since the batcher is still running. + // + // In case (4) and (5), we wait for up to "maxWaitTime". + if (_ops.empty() && !_ops.mustShutdown() && !_ops.termWhenExhausted()) { // We intentionally don't care about whether this returns due to signaling or timeout // since we do the same thing either way: return whatever is in _ops. (void)_cv.wait_for(lk, maxWaitTime.toSystemDuration()); @@ -598,7 +607,6 @@ public: OpQueue ops = std::move(_ops); _ops = OpQueue(0); _cv.notify_all(); - return ops; } @@ -656,20 +664,45 @@ private: if (_syncTail->inShutdown()) { ops.setMustShutdownFlag(); } else { - // Block up to 1 second. We still return true in this case because we want - // this op to be the first in a new batch with a new start time. + // Block up to 1 second. _oplogBuffer->waitForData(Seconds(1)); } } } if (ops.empty() && !ops.mustShutdown()) { - continue; // Don't emit empty batches. + // Check whether we have drained the oplog buffer. The states checked here can be + // stale when it's used by the applier. signalDrainComplete() needs to check the + // applier is still draining in the same term to make sure these states have not + // changed. 
+ auto replCoord = ReplicationCoordinator::get(cc().getServiceContext()); + // Check the term first to detect DRAINING -> RUNNING -> DRAINING when signaling + // drain complete. + // + // Batcher can delay arbitrarily. After stepup, if the batcher drained the buffer + // and blocks when it's about to notify the applier to signal drain complete, the + // node may step down and fetch new data into the buffer and then step up again. + // Now the batcher will resume and let the applier signal drain complete even if + // the buffer has new data. Checking the term before and after ensures nothing + // changed in between. + auto termWhenBufferIsEmpty = replCoord->getTerm(); + // Draining state guarantees the producer has already been fully stopped and no more + // operations will be pushed in to the oplog buffer until the applier state changes. + auto isDraining = + replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining; + // Check the oplog buffer after the applier state to ensure the producer is stopped. + if (isDraining && _oplogBuffer->isEmpty()) { + ops.setTermWhenExhausted(termWhenBufferIsEmpty); + log() << "Oplog buffer has been drained in term " << termWhenBufferIsEmpty; + } else { + // Don't emit empty batches. + continue; + } } stdx::unique_lock<stdx::mutex> lk(_mutex); // Block until the previous batch has been taken. - _cv.wait(lk, [&] { return _ops.empty(); }); + _cv.wait(lk, [&] { return _ops.empty() && !_ops.termWhenExhausted(); }); _ops = std::move(ops); _cv.notify_all(); if (_ops.mustShutdown()) { @@ -749,7 +782,6 @@ void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord, // Transition to SECONDARY state, if possible. tryToGoLiveAsASecondary(&opCtx, replCoord, minValid); - long long termWhenBufferIsEmpty = replCoord->getTerm(); // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become // ready in time, we'll loop again so we can do the above checks periodically. 
OpQueue ops = batcher->getNextBatch(Seconds(1)); @@ -761,8 +793,14 @@ void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord, if (MONGO_FAIL_POINT(rsSyncApplyStop)) { continue; } - // Signal drain complete if we're in Draining state and the buffer is empty. - replCoord->signalDrainComplete(&opCtx, termWhenBufferIsEmpty); + if (ops.termWhenExhausted()) { + // Signal drain complete if we're in Draining state and the buffer is empty. + // Since we check the states of batcher and oplog buffer without synchronization, + // they can be stale. We make sure the applier is still draining in the given term + // before and after the check, so that if the oplog buffer was exhausted, then + // it still will be. + replCoord->signalDrainComplete(&opCtx, *ops.termWhenExhausted()); + } continue; // Try again. } diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index ac22d3366f6..db036ac771a 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -178,6 +178,18 @@ public: } /** + * If the oplog buffer is exhausted, return the term before we learned that the buffer was + * empty. + */ + boost::optional<long long> termWhenExhausted() const { + return _termWhenExhausted; + } + void setTermWhenExhausted(long long term) { + invariant(empty()); + _termWhenExhausted = term; + } + + /** * Leaves this object in an unspecified state. Only assignment and destruction are valid. */ std::vector<OplogEntry> releaseBatch() { @@ -188,6 +200,7 @@ public: std::vector<OplogEntry> _batch; size_t _bytes; bool _mustShutdown = false; + boost::optional<long long> _termWhenExhausted; }; using BatchLimits = OplogApplier::BatchLimits; |