summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2019-08-23 23:11:46 +0000
committerSiyuan Zhou <visualzhou@gmail.com>2019-09-05 03:35:24 -0400
commit1929f9ba94436b57364b24cfe5646464d964564e (patch)
tree875f5bd022956fbba7de786f3e310964ab44d293 /src
parent2a3748453d5d00d7f8de91ea75c985f5ad1ce14b (diff)
downloadmongo-1929f9ba94436b57364b24cfe5646464d964564e.tar.gz
SERVER-42219 Make sure oplog buffer is empty when primary exits drain mode.
(cherry picked from commit 883b10b38ddd7aa5b9a197688141ebf387292a07)
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/sync_tail.cpp56
-rw-r--r--src/mongo/db/repl/sync_tail.h13
2 files changed, 60 insertions, 9 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 5f90645b8ba..6463e5954f9 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -589,7 +589,16 @@ public:
OpQueue getNextBatch(Seconds maxWaitTime) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (_ops.empty() && !_ops.mustShutdown()) {
+ // _ops can indicate the following cases:
+ // 1. A new batch is ready to consume.
+ // 2. Shutdown.
+ // 3. The batch has (or had) exhausted the buffer in draining mode.
+ // 4. Empty batch since the batch has/had exhausted the buffer but not in draining mode,
+ // so there could be new oplog entries coming.
+ // 5. Empty batch since the batcher is still running.
+ //
+ // In case (4) and (5), we wait for up to "maxWaitTime".
+ if (_ops.empty() && !_ops.mustShutdown() && !_ops.termWhenExhausted()) {
// We intentionally don't care about whether this returns due to signaling or timeout
// since we do the same thing either way: return whatever is in _ops.
(void)_cv.wait_for(lk, maxWaitTime.toSystemDuration());
@@ -598,7 +607,6 @@ public:
OpQueue ops = std::move(_ops);
_ops = OpQueue(0);
_cv.notify_all();
-
return ops;
}
@@ -656,20 +664,45 @@ private:
if (_syncTail->inShutdown()) {
ops.setMustShutdownFlag();
} else {
- // Block up to 1 second. We still return true in this case because we want
- // this op to be the first in a new batch with a new start time.
+ // Block up to 1 second.
_oplogBuffer->waitForData(Seconds(1));
}
}
}
if (ops.empty() && !ops.mustShutdown()) {
- continue; // Don't emit empty batches.
+ // Check whether we have drained the oplog buffer. The states checked here can be
+ // stale when it's used by the applier. signalDrainComplete() needs to check the
+ // applier is still draining in the same term to make sure these states have not
+ // changed.
+ auto replCoord = ReplicationCoordinator::get(cc().getServiceContext());
+ // Check the term first to detect DRAINING -> RUNNING -> DRAINING when signaling
+ // drain complete.
+ //
+ // Batcher can delay arbitrarily. After stepup, if the batcher drained the buffer
+ // and blocks when it's about to notify the applier to signal drain complete, the
+ // node may step down and fetch new data into the buffer and then step up again.
+ // Now the batcher will resume and let the applier signal drain complete even if
+ // the buffer has new data. Checking the term before and after ensures nothing
+ // changed in between.
+ auto termWhenBufferIsEmpty = replCoord->getTerm();
+ // Draining state guarantees the producer has already been fully stopped and no more
+ // operations will be pushed in to the oplog buffer until the applier state changes.
+ auto isDraining =
+ replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining;
+ // Check the oplog buffer after the applier state to ensure the producer is stopped.
+ if (isDraining && _oplogBuffer->isEmpty()) {
+ ops.setTermWhenExhausted(termWhenBufferIsEmpty);
+ log() << "Oplog buffer has been drained in term " << termWhenBufferIsEmpty;
+ } else {
+ // Don't emit empty batches.
+ continue;
+ }
}
stdx::unique_lock<stdx::mutex> lk(_mutex);
// Block until the previous batch has been taken.
- _cv.wait(lk, [&] { return _ops.empty(); });
+ _cv.wait(lk, [&] { return _ops.empty() && !_ops.termWhenExhausted(); });
_ops = std::move(ops);
_cv.notify_all();
if (_ops.mustShutdown()) {
@@ -749,7 +782,6 @@ void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord,
// Transition to SECONDARY state, if possible.
tryToGoLiveAsASecondary(&opCtx, replCoord, minValid);
- long long termWhenBufferIsEmpty = replCoord->getTerm();
// Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
// ready in time, we'll loop again so we can do the above checks periodically.
OpQueue ops = batcher->getNextBatch(Seconds(1));
@@ -761,8 +793,14 @@ void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord,
if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
continue;
}
- // Signal drain complete if we're in Draining state and the buffer is empty.
- replCoord->signalDrainComplete(&opCtx, termWhenBufferIsEmpty);
+ if (ops.termWhenExhausted()) {
+ // Signal drain complete if we're in Draining state and the buffer is empty.
+ // Since we check the states of batcher and oplog buffer without synchronization,
+ // they can be stale. We make sure the applier is still draining in the given term
+ // before and after the check, so that if the oplog buffer was exhausted, then
+ // it still will be.
+ replCoord->signalDrainComplete(&opCtx, *ops.termWhenExhausted());
+ }
continue; // Try again.
}
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index ac22d3366f6..db036ac771a 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -178,6 +178,18 @@ public:
}
/**
+ * If the oplog buffer is exhausted, return the term before we learned that the buffer was
+ * empty.
+ */
+ boost::optional<long long> termWhenExhausted() const {
+ return _termWhenExhausted;
+ }
+ void setTermWhenExhausted(long long term) {
+ invariant(empty());
+ _termWhenExhausted = term;
+ }
+
+ /**
* Leaves this object in an unspecified state. Only assignment and destruction are valid.
*/
std::vector<OplogEntry> releaseBatch() {
@@ -188,6 +200,7 @@ public:
std::vector<OplogEntry> _batch;
size_t _bytes;
bool _mustShutdown = false;
+ boost::optional<long long> _termWhenExhausted;
};
using BatchLimits = OplogApplier::BatchLimits;