summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2019-08-23 23:11:46 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-26 03:47:44 +0000
commit615e3c3946c57956069689eaadf1a77058b881f5 (patch)
treebbad845e3ee811b3923593fe2bef3126683a4229
parent79c200fc57255e31f4239983830cbdcb30ed357f (diff)
downloadmongo-615e3c3946c57956069689eaadf1a77058b881f5.tar.gz
SERVER-42219 Make sure oplog buffer is empty when primary exits drain mode.
(cherry picked from commit 883b10b38ddd7aa5b9a197688141ebf387292a07)
-rw-r--r--src/mongo/db/repl/sync_tail.cpp56
-rw-r--r--src/mongo/db/repl/sync_tail.h13
2 files changed, 60 insertions, 9 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 10bf558adf5..7a9e4b46d57 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -839,7 +839,16 @@ public:
OpQueue getNextBatch(Seconds maxWaitTime) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (_ops.empty() && !_ops.mustShutdown()) {
+ // _ops can indicate the following cases:
+ // 1. A new batch is ready to consume.
+ // 2. Shutdown.
+ // 3. The batch has (or had) exhausted the buffer in draining mode.
+ // 4. Empty batch since the batch has/had exhausted the buffer but not in draining mode,
+ // so there could be new oplog entries coming.
+ // 5. Empty batch since the batcher is still running.
+ //
+ // In case (4) and (5), we wait for up to "maxWaitTime".
+ if (_ops.empty() && !_ops.mustShutdown() && !_ops.termWhenExhausted()) {
// We intentionally don't care about whether this returns due to signaling or timeout
// since we do the same thing either way: return whatever is in _ops.
(void)_cv.wait_for(lk, maxWaitTime.toSystemDuration());
@@ -848,7 +857,6 @@ public:
OpQueue ops = std::move(_ops);
_ops = OpQueue(0);
_cv.notify_all();
-
return ops;
}
@@ -900,12 +908,38 @@ private:
}
if (ops.empty() && !ops.mustShutdown()) {
- continue; // Don't emit empty batches.
+ // Check whether we have drained the oplog buffer. The states checked here can be
+ // stale when it's used by the applier. signalDrainComplete() needs to check the
+ // applier is still draining in the same term to make sure these states have not
+ // changed.
+ auto replCoord = ReplicationCoordinator::get(cc().getServiceContext());
+ // Check the term first to detect DRAINING -> RUNNING -> DRAINING when signaling
+ // drain complete.
+ //
+ // Batcher can delay arbitrarily. After stepup, if the batcher drained the buffer
+ // and blocks when it's about to notify the applier to signal drain complete, the
+ // node may step down and fetch new data into the buffer and then step up again.
+ // Now the batcher will resume and let the applier signal drain complete even if
+ // the buffer has new data. Checking the term before and after ensures nothing
+ // changed in between.
+ auto termWhenBufferIsEmpty = replCoord->getTerm();
+ // Draining state guarantees the producer has already been fully stopped and no more
+ // operations will be pushed in to the oplog buffer until the applier state changes.
+ auto isDraining =
+ replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining;
+ // Check the oplog buffer after the applier state to ensure the producer is stopped.
+ if (isDraining && _oplogBuffer->isEmpty()) {
+ ops.setTermWhenExhausted(termWhenBufferIsEmpty);
+ log() << "Oplog buffer has been drained in term " << termWhenBufferIsEmpty;
+ } else {
+ // Don't emit empty batches.
+ continue;
+ }
}
stdx::unique_lock<stdx::mutex> lk(_mutex);
// Block until the previous batch has been taken.
- _cv.wait(lk, [&] { return _ops.empty(); });
+ _cv.wait(lk, [&] { return _ops.empty() && !_ops.termWhenExhausted(); });
_ops = std::move(ops);
_cv.notify_all();
if (_ops.mustShutdown()) {
@@ -978,7 +1012,6 @@ void SyncTail::_oplogApplication(OplogBuffer* oplogBuffer,
// Transition to SECONDARY state, if possible.
tryToGoLiveAsASecondary(&opCtx, replCoord, minValid);
- long long termWhenBufferIsEmpty = replCoord->getTerm();
// Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
// ready in time, we'll loop again so we can do the above checks periodically.
OpQueue ops = batcher->getNextBatch(Seconds(1));
@@ -990,8 +1023,14 @@ void SyncTail::_oplogApplication(OplogBuffer* oplogBuffer,
if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
continue;
}
- // Signal drain complete if we're in Draining state and the buffer is empty.
- replCoord->signalDrainComplete(&opCtx, termWhenBufferIsEmpty);
+ if (ops.termWhenExhausted()) {
+ // Signal drain complete if we're in Draining state and the buffer is empty.
+ // Since we check the states of batcher and oplog buffer without synchronization,
+ // they can be stale. We make sure the applier is still draining in the given term
+ // before and after the check, so that if the oplog buffer was exhausted, then
+ // it still will be.
+ replCoord->signalDrainComplete(&opCtx, *ops.termWhenExhausted());
+ }
continue; // Try again.
}
@@ -1079,8 +1118,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
if (inShutdown()) {
ops->setMustShutdownFlag();
} else {
- // Block up to 1 second. We still return true in this case because we want this
- // op to be the first in a new batch with a new start time.
+ // Block up to 1 second.
oplogBuffer->waitForData(Seconds(1));
}
}
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 914252690df..6e7da616c6b 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -204,6 +204,18 @@ public:
}
/**
+ * If the oplog buffer is exhausted, return the term before we learned that the buffer was
+ * empty.
+ */
+ boost::optional<long long> termWhenExhausted() const {
+ return _termWhenExhausted;
+ }
+ void setTermWhenExhausted(long long term) {
+ invariant(empty());
+ _termWhenExhausted = term;
+ }
+
+ /**
* Leaves this object in an unspecified state. Only assignment and destruction are valid.
*/
std::vector<OplogEntry> releaseBatch() {
@@ -214,6 +226,7 @@ public:
std::vector<OplogEntry> _batch;
size_t _bytes;
bool _mustShutdown = false;
+ boost::optional<long long> _termWhenExhausted;
};
using BatchLimits = OplogApplier::BatchLimits;