summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/sync_tail.cpp
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2019-04-06 18:52:40 -0400
committerBenety Goh <benety@mongodb.com>2019-04-06 18:53:02 -0400
commit3e504b77e351c266b8324abbbe4c285e4df8db77 (patch)
treee91740746defa2cc310b8a1f8bc9df7972a0f297 /src/mongo/db/repl/sync_tail.cpp
parentb66c0d34088dae2a01a42c936396fc7a8f750201 (diff)
downloadmongo-3e504b77e351c266b8324abbbe4c285e4df8db77.tar.gz
SERVER-39950 SyncTail::OpQueueBatcher::run() obtains next batch of ops using OplogApplier
Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r--src/mongo/db/repl/sync_tail.cpp26
1 files changed, 17 insertions, 9 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index ea74fe806cb..e8a6912ea16 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -630,13 +630,14 @@ private:
cc().makeOperationContext().get(), _storageInterface);
while (true) {
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(rsSyncApplyStop);
+
batchLimits.slaveDelayLatestTimestamp = _calculateSlaveDelayLatestTimestamp();
// Check this once per batch since users can change it at runtime.
batchLimits.ops = OplogApplier::getBatchLimitOperations();
OpQueue ops(batchLimits.ops);
- // tryPopAndWaitForMore adds to ops and returns true when we need to end a batch early.
{
auto opCtx = cc().makeOperationContext();
@@ -648,8 +649,21 @@ private:
// handling.
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- while (!_syncTail->tryPopAndWaitForMore(
- opCtx.get(), _oplogBuffer, &ops, batchLimits)) {
+ auto oplogEntries =
+ fassertNoTrace(31004, _getNextApplierBatchFn(opCtx.get(), batchLimits));
+ for (const auto& oplogEntry : oplogEntries) {
+ ops.emplace_back(oplogEntry.raw);
+ }
+
+ // If we don't have anything in the queue, wait a bit for something to appear.
+ if (oplogEntries.empty()) {
+ if (_syncTail->inShutdown()) {
+ ops.setMustShutdownFlag();
+ } else {
+ // Block up to 1 second. We still return true in this case because we want
+ // this op to be the first in a new batch with a new start time.
+ _oplogBuffer->waitForData(Seconds(1));
+ }
}
}
@@ -870,12 +884,6 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
return true; // Return before wasting time parsing the op.
}
- // Don't consume the op if we are told to stop.
- if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
- sleepmillis(10);
- return true;
- }
-
ops->emplace_back(std::move(op)); // Parses the op in-place.
}