diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2019-08-23 23:11:46 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-26 03:47:44 +0000 |
commit | 615e3c3946c57956069689eaadf1a77058b881f5 (patch) | |
tree | bbad845e3ee811b3923593fe2bef3126683a4229 | |
parent | 79c200fc57255e31f4239983830cbdcb30ed357f (diff) | |
download | mongo-615e3c3946c57956069689eaadf1a77058b881f5.tar.gz |
SERVER-42219 Make sure oplog buffer is empty when primary exits drain mode.
(cherry picked from commit 883b10b38ddd7aa5b9a197688141ebf387292a07)
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 56 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 13 |
2 files changed, 60 insertions, 9 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 10bf558adf5..7a9e4b46d57 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -839,7 +839,16 @@ public: OpQueue getNextBatch(Seconds maxWaitTime) { stdx::unique_lock<stdx::mutex> lk(_mutex); - if (_ops.empty() && !_ops.mustShutdown()) { + // _ops can indicate the following cases: + // 1. A new batch is ready to consume. + // 2. Shutdown. + // 3. The batch has (or had) exhausted the buffer in draining mode. + // 4. Empty batch since the batch has/had exhausted the buffer but not in draining mode, + // so there could be new oplog entries coming. + // 5. Empty batch since the batcher is still running. + // + // In case (4) and (5), we wait for up to "maxWaitTime". + if (_ops.empty() && !_ops.mustShutdown() && !_ops.termWhenExhausted()) { // We intentionally don't care about whether this returns due to signaling or timeout // since we do the same thing either way: return whatever is in _ops. (void)_cv.wait_for(lk, maxWaitTime.toSystemDuration()); @@ -848,7 +857,6 @@ public: OpQueue ops = std::move(_ops); _ops = OpQueue(0); _cv.notify_all(); - return ops; } @@ -900,12 +908,38 @@ private: } if (ops.empty() && !ops.mustShutdown()) { - continue; // Don't emit empty batches. + // Check whether we have drained the oplog buffer. The states checked here can be + // stale when it's used by the applier. signalDrainComplete() needs to check the + // applier is still draining in the same term to make sure these states have not + // changed. + auto replCoord = ReplicationCoordinator::get(cc().getServiceContext()); + // Check the term first to detect DRAINING -> RUNNING -> DRAINING when signaling + // drain complete. + // + // Batcher can delay arbitrarily. After stepup, if the batcher drained the buffer + // and blocks when it's about to notify the applier to signal drain complete, the + // node may step down and fetch new data into the buffer and then step up again. + // Now the batcher will resume and let the applier signal drain complete even if + // the buffer has new data. Checking the term before and after ensures nothing + // changed in between. + auto termWhenBufferIsEmpty = replCoord->getTerm(); + // Draining state guarantees the producer has already been fully stopped and no more + // operations will be pushed in to the oplog buffer until the applier state changes. + auto isDraining = + replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining; + // Check the oplog buffer after the applier state to ensure the producer is stopped. + if (isDraining && _oplogBuffer->isEmpty()) { + ops.setTermWhenExhausted(termWhenBufferIsEmpty); + log() << "Oplog buffer has been drained in term " << termWhenBufferIsEmpty; + } else { + // Don't emit empty batches. + continue; + } } stdx::unique_lock<stdx::mutex> lk(_mutex); // Block until the previous batch has been taken. - _cv.wait(lk, [&] { return _ops.empty(); }); + _cv.wait(lk, [&] { return _ops.empty() && !_ops.termWhenExhausted(); }); _ops = std::move(ops); _cv.notify_all(); if (_ops.mustShutdown()) { @@ -978,7 +1012,6 @@ void SyncTail::_oplogApplication(OplogBuffer* oplogBuffer, // Transition to SECONDARY state, if possible. tryToGoLiveAsASecondary(&opCtx, replCoord, minValid); - long long termWhenBufferIsEmpty = replCoord->getTerm(); // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become // ready in time, we'll loop again so we can do the above checks periodically. OpQueue ops = batcher->getNextBatch(Seconds(1)); @@ -990,8 +1023,14 @@ void SyncTail::_oplogApplication(OplogBuffer* oplogBuffer, if (MONGO_FAIL_POINT(rsSyncApplyStop)) { continue; } - // Signal drain complete if we're in Draining state and the buffer is empty. - replCoord->signalDrainComplete(&opCtx, termWhenBufferIsEmpty); + if (ops.termWhenExhausted()) { + // Signal drain complete if we're in Draining state and the buffer is empty. + // Since we check the states of batcher and oplog buffer without synchronization, + // they can be stale. We make sure the applier is still draining in the given term + // before and after the check, so that if the oplog buffer was exhausted, then + // it still will be. + replCoord->signalDrainComplete(&opCtx, *ops.termWhenExhausted()); + } continue; // Try again. } @@ -1079,8 +1118,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx, if (inShutdown()) { ops->setMustShutdownFlag(); } else { - // Block up to 1 second. We still return true in this case because we want this - // op to be the first in a new batch with a new start time. + // Block up to 1 second. oplogBuffer->waitForData(Seconds(1)); } } diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 914252690df..6e7da616c6b 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -204,6 +204,18 @@ public: } /** + * If the oplog buffer is exhausted, return the term before we learned that the buffer was + * empty. + */ + boost::optional<long long> termWhenExhausted() const { + return _termWhenExhausted; + } + void setTermWhenExhausted(long long term) { + invariant(empty()); + _termWhenExhausted = term; + } + + /** * Leaves this object in an unspecified state. Only assignment and destruction are valid. */ std::vector<OplogEntry> releaseBatch() { @@ -214,6 +226,7 @@ public: std::vector<OplogEntry> _batch; size_t _bytes; bool _mustShutdown = false; + boost::optional<long long> _termWhenExhausted; }; using BatchLimits = OplogApplier::BatchLimits; |