summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@mongodb.com>2022-04-22 16:29:40 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-06-22 15:24:30 +0000
commit81a4764d6637fe6a645cf54bdba6984b7d466ea9 (patch)
tree2552495eb566609b0cf1ac01e1ee97698aa58340
parentf60e7b0978053aa7908d7e7544e1bb14bd7f9b22 (diff)
downloadmongo-81a4764d6637fe6a645cf54bdba6984b7d466ea9.tar.gz
SERVER-65723 Add parameter oplogBatchDelayMillis to improve batching on caught-up secondaries
(cherry picked from commit 97dc4971f5894c070a029dd8d05b857062b5ddde)
-rw-r--r--src/mongo/db/repl/oplog_applier.cpp4
-rw-r--r--src/mongo/db/repl/oplog_applier.h6
-rw-r--r--src/mongo/db/repl/oplog_applier_test.cpp234
-rw-r--r--src/mongo/db/repl/oplog_batcher.cpp33
-rw-r--r--src/mongo/db/repl/oplog_batcher.h11
-rw-r--r--src/mongo/db/repl/repl_server_parameters.idl11
6 files changed, 250 insertions, 49 deletions
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp
index 51d3134d565..e0fe3fe3c35 100644
--- a/src/mongo/db/repl/oplog_applier.cpp
+++ b/src/mongo/db/repl/oplog_applier.cpp
@@ -128,8 +128,8 @@ StatusWith<OpTime> OplogApplier::applyOplogBatch(OperationContext* opCtx,
}
StatusWith<std::vector<OplogEntry>> OplogApplier::getNextApplierBatch(
- OperationContext* opCtx, const BatchLimits& batchLimits) {
- return _oplogBatcher->getNextApplierBatch(opCtx, batchLimits);
+ OperationContext* opCtx, const BatchLimits& batchLimits, Milliseconds waitToFillBatch) {
+ return _oplogBatcher->getNextApplierBatch(opCtx, batchLimits, waitToFillBatch);
}
const OplogApplier::Options& OplogApplier::getOptions() const {
diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h
index e35c6585863..b582b282efd 100644
--- a/src/mongo/db/repl/oplog_applier.h
+++ b/src/mongo/db/repl/oplog_applier.h
@@ -159,8 +159,10 @@ public:
/**
* Calls the OplogBatcher's getNextApplierBatch.
*/
- StatusWith<std::vector<OplogEntry>> getNextApplierBatch(OperationContext* opCtx,
- const BatchLimits& batchLimits);
+ StatusWith<std::vector<OplogEntry>> getNextApplierBatch(
+ OperationContext* opCtx,
+ const BatchLimits& batchLimits,
+ Milliseconds waitToFillBatch = Milliseconds(0));
const Options& getOptions() const;
diff --git a/src/mongo/db/repl/oplog_applier_test.cpp b/src/mongo/db/repl/oplog_applier_test.cpp
index 5fc5420cf17..cb35f041caa 100644
--- a/src/mongo/db/repl/oplog_applier_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_test.cpp
@@ -38,7 +38,9 @@
#include "mongo/db/repl/oplog_applier.h"
#include "mongo/db/repl/oplog_batcher_test_fixture.h"
#include "mongo/db/repl/oplog_buffer_blocking_queue.h"
+#include "mongo/db/service_context_test_fixture.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/clock_source_mock.h"
namespace mongo {
namespace repl {
@@ -74,13 +76,17 @@ StatusWith<OpTime> OplogApplierMock::_applyOplogBatch(OperationContext* opCtx,
class OplogApplierTest : public unittest::Test {
public:
- void setUp() final;
- void tearDown() final;
+ void setUp() override;
+ void tearDown() override;
+ virtual OperationContext* opCtx() {
+ return _opCtxNoop.get();
+ }
+
protected:
std::unique_ptr<OplogBuffer> _buffer;
std::unique_ptr<OplogApplier> _applier;
- std::unique_ptr<OperationContext> _opCtx;
+ std::unique_ptr<OperationContext> _opCtxNoop;
OplogApplier::BatchLimits _limits;
};
@@ -90,7 +96,7 @@ void OplogApplierTest::setUp() {
// The OplogApplier interface expects an OperationContext* but the mock implementations in this
// test will not be dereferencing the pointer. Therefore, it is sufficient to use an
// OperationContextNoop.
- _opCtx = std::make_unique<OperationContextNoop>();
+ _opCtxNoop = std::make_unique<OperationContextNoop>();
_limits.bytes = std::numeric_limits<decltype(_limits.bytes)>::max();
_limits.ops = std::numeric_limits<decltype(_limits.ops)>::max();
@@ -98,7 +104,7 @@ void OplogApplierTest::setUp() {
void OplogApplierTest::tearDown() {
_limits = {};
- _opCtx = {};
+ _opCtxNoop = {};
_applier = {};
_buffer = {};
}
@@ -109,9 +115,9 @@ TEST_F(OplogApplierTest, GetNextApplierBatchGroupsCrudOps) {
std::vector<OplogEntry> srcOps;
srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "foo")));
srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")));
- _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend());
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
- auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits));
ASSERT_EQUALS(srcOps.size(), batch.size()) << toString(batch);
ASSERT_EQUALS(srcOps[0], batch[0]);
ASSERT_EQUALS(srcOps[1], batch[1]);
@@ -121,9 +127,9 @@ TEST_F(OplogApplierTest, GetNextApplierBatchReturnsPreparedApplyOpsOpInOwnBatch)
std::vector<OplogEntry> srcOps;
srcOps.push_back(makeApplyOpsOplogEntry(1, true));
srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")));
- _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend());
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
- auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits));
ASSERT_EQUALS(1U, batch.size()) << toString(batch);
ASSERT_EQUALS(srcOps[0], batch[0]);
}
@@ -132,9 +138,9 @@ TEST_F(OplogApplierTest, GetNextApplierBatchGroupsUnpreparedApplyOpsOpWithOtherO
std::vector<OplogEntry> srcOps;
srcOps.push_back(makeApplyOpsOplogEntry(1, false));
srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")));
- _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend());
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
- auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits));
ASSERT_EQUALS(2U, batch.size()) << toString(batch);
ASSERT_EQUALS(srcOps[0], batch[0]);
ASSERT_EQUALS(srcOps[1], batch[1]);
@@ -145,9 +151,9 @@ TEST_F(OplogApplierTest, GetNextApplierBatchReturnsSystemDotViewsOpInOwnBatch) {
srcOps.push_back(makeInsertOplogEntry(
1, NamespaceString(dbName, NamespaceString::kSystemDotViewsCollectionName)));
srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")));
- _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend());
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
- auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits));
ASSERT_EQUALS(1U, batch.size()) << toString(batch);
ASSERT_EQUALS(srcOps[0], batch[0]);
}
@@ -156,9 +162,9 @@ TEST_F(OplogApplierTest, GetNextApplierBatchReturnsServerConfigurationOpInOwnBat
std::vector<OplogEntry> srcOps;
srcOps.push_back(makeInsertOplogEntry(1, NamespaceString::kServerConfigurationNamespace));
srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")));
- _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend());
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
- auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits));
ASSERT_EQUALS(1U, batch.size()) << toString(batch);
ASSERT_EQUALS(srcOps[0], batch[0]);
}
@@ -167,9 +173,9 @@ TEST_F(OplogApplierTest, GetNextApplierBatchReturnsConfigReshardingDonorOpInOwnB
std::vector<OplogEntry> srcOps;
srcOps.push_back(makeInsertOplogEntry(1, NamespaceString::kDonorReshardingOperationsNamespace));
srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")));
- _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend());
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
- auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits));
ASSERT_EQUALS(1U, batch.size()) << toString(batch);
ASSERT_EQUALS(srcOps[0], batch[0]);
}
@@ -178,9 +184,9 @@ TEST_F(OplogApplierTest, GetNextApplierBatchReturnsPreparedCommitTransactionOpIn
std::vector<OplogEntry> srcOps;
srcOps.push_back(makeCommitTransactionOplogEntry(1, dbName, true, 3));
srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")));
- _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend());
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
- auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits));
ASSERT_EQUALS(1U, batch.size()) << toString(batch);
ASSERT_EQUALS(srcOps[0], batch[0]);
}
@@ -189,9 +195,9 @@ TEST_F(OplogApplierTest, GetNextApplierBatchGroupsUnpreparedCommitTransactionOpW
std::vector<OplogEntry> srcOps;
srcOps.push_back(makeCommitTransactionOplogEntry(1, dbName, false, 3));
srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")));
- _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend());
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
- auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits));
ASSERT_EQUALS(2U, batch.size()) << toString(batch);
ASSERT_EQUALS(srcOps[0], batch[0]);
ASSERT_EQUALS(srcOps[1], batch[1]);
@@ -204,20 +210,20 @@ TEST_F(OplogApplierTest, GetNextApplierBatchChecksBatchLimitsForNumberOfOperatio
srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "bar")));
srcOps.push_back(makeInsertOplogEntry(4, NamespaceString(dbName, "bar")));
srcOps.push_back(makeInsertOplogEntry(5, NamespaceString(dbName, "bar")));
- _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend());
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
// Set batch limits so that each batch contains a maximum of 'BatchLimit::ops'.
_limits.ops = 3U;
// First batch: [insert, insert, insert]
- auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits));
ASSERT_EQUALS(3U, batch.size()) << toString(batch);
ASSERT_EQUALS(srcOps[0], batch[0]);
ASSERT_EQUALS(srcOps[1], batch[1]);
ASSERT_EQUALS(srcOps[2], batch[2]);
// Second batch: [insert, insert]
- batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits));
ASSERT_EQUALS(2U, batch.size()) << toString(batch);
ASSERT_EQUALS(srcOps[3], batch[0]);
ASSERT_EQUALS(srcOps[4], batch[1]);
@@ -228,19 +234,19 @@ TEST_F(OplogApplierTest, GetNextApplierBatchChecksBatchLimitsForSizeOfOperations
srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "bar")));
srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")));
srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "bar")));
- _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend());
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
// Set batch limits so that only the first two operations can fit into the first batch.
_limits.bytes = std::size_t(srcOps[0].getRawObjSizeBytes() + srcOps[1].getRawObjSizeBytes());
// First batch: [insert, insert]
- auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits));
ASSERT_EQUALS(2U, batch.size()) << toString(batch);
ASSERT_EQUALS(srcOps[0], batch[0]);
ASSERT_EQUALS(srcOps[1], batch[1]);
// Second batch: [insert]
- batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits));
ASSERT_EQUALS(1U, batch.size()) << toString(batch);
ASSERT_EQUALS(srcOps[2], batch[0]);
}
@@ -251,19 +257,19 @@ TEST_F(OplogApplierTest,
srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "bar")));
srcOps.push_back(makeCommitTransactionOplogEntry(2, dbName, false, 3));
srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "bar")));
- _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend());
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
// Set batch limits so that commit transaction entry has to go into next batch as the only entry
// after taking into account the embedded op count.
_limits.ops = 3U;
// First batch: [insert]
- auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits));
ASSERT_EQUALS(1U, batch.size()) << toString(batch);
ASSERT_EQUALS(srcOps[0], batch[0]);
// Second batch: [commit]
- batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits));
ASSERT_EQUALS(1U, batch.size()) << toString(batch);
ASSERT_EQUALS(srcOps[1], batch[0]);
}
@@ -275,20 +281,20 @@ TEST_F(OplogApplierTest,
srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")));
srcOps.push_back(makeCommitTransactionOplogEntry(3, dbName, false, 3));
srcOps.push_back(makeInsertOplogEntry(4, NamespaceString(dbName, "bar")));
- _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend());
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
// Set batch limits so that commit transaction entry has to go into next batch after taking into
// account embedded op count.
_limits.ops = 4U;
// First batch: [insert, insert]
- auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits));
ASSERT_EQUALS(2U, batch.size()) << toString(batch);
ASSERT_EQUALS(srcOps[0], batch[0]);
ASSERT_EQUALS(srcOps[1], batch[1]);
// Second batch: [commit, insert]
- batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits));
ASSERT_EQUALS(2U, batch.size()) << toString(batch);
ASSERT_EQUALS(srcOps[2], batch[0]);
ASSERT_EQUALS(srcOps[3], batch[1]);
@@ -300,19 +306,19 @@ TEST_F(OplogApplierTest,
srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "bar")));
srcOps.push_back(makeCommitTransactionOplogEntry(2, dbName, false, 5));
srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "bar")));
- _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend());
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
// Set batch limits so that commit transaction entry goes into its own batch because its
// embedded count exceeds the batch limit for ops.
_limits.ops = 4U;
// First batch: [insert]
- auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits));
ASSERT_EQUALS(1U, batch.size()) << toString(batch);
ASSERT_EQUALS(srcOps[0], batch[0]);
// Second batch: [commit]
- batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits));
ASSERT_EQUALS(1U, batch.size()) << toString(batch);
ASSERT_EQUALS(srcOps[1], batch[0]);
}
@@ -332,14 +338,14 @@ TEST_F(OplogApplierTest, LastOpInLargeTransactionIsProcessedIndividually) {
// is processed by itself.
srcOps.push_back(makeInsertOplogEntry(5, NamespaceString(dbName, "bar")));
- _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend());
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
// Set large enough batch limit to ensure that batcher is not batching because of limit, but
// rather because it encountered the final oplog entry of a large transaction.
_limits.ops = 10U;
// First batch: [insert, applyOps, applyOps]
- auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits));
ASSERT_EQUALS(3U, batch.size()) << toString(batch);
ASSERT_EQUALS(srcOps[0], batch[0]);
ASSERT_EQUALS(srcOps[1], batch[1]);
@@ -347,17 +353,165 @@ TEST_F(OplogApplierTest, LastOpInLargeTransactionIsProcessedIndividually) {
// Second batch: [applyOps]. The last oplog entry of a large transaction must be processed by
// itself.
- batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits));
ASSERT_EQUALS(1U, batch.size()) << toString(batch);
ASSERT_EQUALS(srcOps[3], batch[0]);
// Third batch: [insert]. The this confirms that the last oplog entry of a large txn will be
// batched individually.
- batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits));
ASSERT_EQUALS(1U, batch.size()) << toString(batch);
ASSERT_EQUALS(srcOps[4], batch[0]);
}
+class OplogApplierDelayTest : public OplogApplierTest, public ScopedGlobalServiceContextForTest {
+public:
+ void setUp() override {
+ OplogApplierTest::setUp();
+ auto* service = getServiceContext();
+ Client::initThread("OplogApplierDelayTest", service, nullptr);
+
+ _mockClock = std::make_shared<ClockSourceMock>();
+ // Avoid any issues due to a clock exactly at 0 (e.g. dates being default Date_t());
+ _mockClock->advance(Milliseconds(60000));
+ service->setFastClockSource(std::make_unique<SharedClockSourceAdapter>(_mockClock));
+ service->setPreciseClockSource(std::make_unique<SharedClockSourceAdapter>(_mockClock));
+
+ // The delay tests need a real operation context to use the service context clock.
+ _opCtxHolder = cc().makeOperationContext();
+
+ // Use a smaller limit for these tests.
+ _limits.ops = 3;
+ }
+ void tearDown() override {
+ _opCtxHolder = nullptr;
+ Client::releaseCurrent();
+ OplogApplierTest::tearDown();
+ }
+
+ OperationContext* opCtx() override {
+ return _opCtxHolder.get();
+ }
+
+ // Wait for the opCtx to be waited on, or for killWaits() to be run.
+ bool waitForWait() {
+ while (!_failWaits.load()) {
+ if (opCtx()->isWaitingForConditionOrInterrupt())
+ return true;
+ sleepmillis(1);
+ }
+ return false;
+ }
+
+ // Ends any waitForWait calls. Used to turn some potential hangs into outright failures.
+ void killWaits() {
+ _failWaits.store(true);
+ }
+
+protected:
+ std::shared_ptr<ClockSourceMock> _mockClock;
+ ServiceContext::UniqueOperationContext _opCtxHolder;
+ AtomicWord<bool> _failWaits{false};
+};
+
+TEST_F(OplogApplierDelayTest, GetNextApplierBatchReturnsEmptyBatchImmediately) {
+ auto batch =
+ unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits, Milliseconds(10)));
+ ASSERT_EQ(0, batch.size());
+}
+
+TEST_F(OplogApplierDelayTest, GetNextApplierBatchReturnsFullBatchImmediately) {
+ std::vector<OplogEntry> srcOps;
+ srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "foo")));
+ srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")));
+ srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "baz")));
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
+
+ auto batch =
+ unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits, Milliseconds(10)));
+ ASSERT_EQ(3, batch.size());
+}
+
+TEST_F(OplogApplierDelayTest, GetNextApplierBatchWaitsForBatchToFill) {
+ std::vector<OplogEntry> srcOps;
+ srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "foo")));
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
+
+ stdx::thread insertThread([this, &srcOps] {
+ ASSERT(waitForWait());
+ {
+ FailPointEnableBlock peekFailPoint("oplogBatcherPauseAfterSuccessfulPeek");
+ srcOps.clear();
+ srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")));
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
+ peekFailPoint->waitForTimesEntered(peekFailPoint.initialTimesEntered() + 1);
+ _mockClock->advance(Milliseconds(5));
+ }
+ ASSERT(waitForWait());
+ srcOps.clear();
+ srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "baz")));
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
+ });
+ auto batch =
+ unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits, Milliseconds(10)));
+ ASSERT_EQ(3, batch.size());
+ killWaits();
+ insertThread.join();
+}
+
+TEST_F(OplogApplierDelayTest, GetNextApplierBatchWaitsForBatchToTimeout) {
+ std::vector<OplogEntry> srcOps;
+ srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "foo")));
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
+
+ stdx::thread insertThread([this, &srcOps] {
+ ASSERT(waitForWait());
+ {
+ FailPointEnableBlock peekFailPoint("oplogBatcherPauseAfterSuccessfulPeek");
+ srcOps.clear();
+ srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")));
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
+ peekFailPoint->waitForTimesEntered(peekFailPoint.initialTimesEntered() + 1);
+ _mockClock->advance(Milliseconds(5));
+ }
+ ASSERT(waitForWait());
+ _mockClock->advance(Milliseconds(5));
+ });
+ auto batch =
+ unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits, Milliseconds(10)));
+ ASSERT_EQ(2, batch.size());
+ killWaits();
+ insertThread.join();
+}
+
+// Makes sure that interrupting the batch while waiting does interrupt the timeout,
+// but does not throw or lose any data.
+TEST_F(OplogApplierDelayTest, GetNextApplierBatchInterrupted) {
+ std::vector<OplogEntry> srcOps;
+ srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "foo")));
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
+
+ stdx::thread insertThread([this, &srcOps] {
+ ASSERT(waitForWait());
+ {
+ FailPointEnableBlock peekFailPoint("oplogBatcherPauseAfterSuccessfulPeek");
+ srcOps.clear();
+ srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")));
+ _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend());
+ peekFailPoint->waitForTimesEntered(peekFailPoint.initialTimesEntered() + 1);
+ _mockClock->advance(Milliseconds(5));
+ }
+ ASSERT(waitForWait());
+ opCtx()->markKilled(ErrorCodes::Interrupted);
+ });
+ auto batch =
+ unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits, Milliseconds(10)));
+ ASSERT_EQ(2, batch.size());
+ ASSERT_EQ(ErrorCodes::Interrupted, opCtx()->checkForInterruptNoAssert());
+ killWaits();
+ insertThread.join();
+}
+
} // namespace
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_batcher.cpp b/src/mongo/db/repl/oplog_batcher.cpp
index 1fe68d71f3b..13084f48b23 100644
--- a/src/mongo/db/repl/oplog_batcher.cpp
+++ b/src/mongo/db/repl/oplog_batcher.cpp
@@ -40,6 +40,7 @@
namespace mongo {
namespace repl {
MONGO_FAIL_POINT_DEFINE(skipOplogBatcherWaitForData);
+MONGO_FAIL_POINT_DEFINE(oplogBatcherPauseAfterSuccessfulPeek);
OplogBatcher::OplogBatcher(OplogApplier* oplogApplier, OplogBuffer* oplogBuffer)
: _oplogApplier(oplogApplier), _oplogBuffer(oplogBuffer), _ops(0) {}
@@ -150,7 +151,7 @@ std::size_t OplogBatcher::getOpCount(const OplogEntry& entry) {
}
StatusWith<std::vector<OplogEntry>> OplogBatcher::getNextApplierBatch(
- OperationContext* opCtx, const BatchLimits& batchLimits) {
+ OperationContext* opCtx, const BatchLimits& batchLimits, Milliseconds waitToFillBatch) {
if (batchLimits.ops == 0) {
return Status(ErrorCodes::InvalidOptions, "Batch size must be greater than 0.");
}
@@ -159,7 +160,13 @@ StatusWith<std::vector<OplogEntry>> OplogBatcher::getNextApplierBatch(
std::uint32_t totalBytes = 0;
std::vector<OplogEntry> ops;
BSONObj op;
+ Date_t batchDeadline;
+ if (waitToFillBatch > Milliseconds(0)) {
+ batchDeadline =
+ opCtx->getServiceContext()->getPreciseClockSource()->now() + waitToFillBatch;
+ }
while (_oplogBuffer->peek(opCtx, &op)) {
+ oplogBatcherPauseAfterSuccessfulPeek.pauseWhileSet();
auto entry = OplogEntry(op);
// Check for oplog version change.
@@ -221,6 +228,25 @@ StatusWith<std::vector<OplogEntry>> OplogBatcher::getNextApplierBatch(
totalBytes += opBytes;
ops.push_back(std::move(entry));
_consume(opCtx, _oplogBuffer);
+ // At this point we either have a partial batch or an exactly full batch; if we are using
+ // a wait to fill the batch, we should wait if and only if the batch is partial.
+ if (batchDeadline != Date_t() && totalOps < batchLimits.ops &&
+ totalBytes < batchLimits.bytes) {
+ LOGV2_DEBUG(6572301,
+ 3,
+ "Waiting for batch to fill",
+ "deadline"_attr = batchDeadline,
+ "waitToFillBatch"_attr = waitToFillBatch,
+ "totalOps"_attr = totalOps,
+ "totalBytes"_attr = totalBytes);
+ try {
+ _oplogBuffer->waitForDataUntil(batchDeadline, opCtx);
+ } catch (const ExceptionForCat<ErrorCategory::Interruption>& e) {
+ LOGV2(6572300,
+ "Interrupted in oplog batching; returning current partial batch.",
+ "error"_attr = e);
+ }
+ }
}
return std::move(ops);
}
@@ -281,8 +307,9 @@ void OplogBatcher::_run(StorageInterface* storageInterface) {
// Locks the oplog to check its max size, do this in the UninterruptibleLockGuard.
batchLimits.bytes = getBatchLimitOplogBytes(opCtx.get(), storageInterface);
- auto oplogEntries =
- fassertNoTrace(31004, getNextApplierBatch(opCtx.get(), batchLimits));
+ auto oplogEntries = fassertNoTrace(
+ 31004,
+ getNextApplierBatch(opCtx.get(), batchLimits, Milliseconds(oplogBatchDelayMillis)));
for (const auto& oplogEntry : oplogEntries) {
ops.emplace_back(oplogEntry);
}
diff --git a/src/mongo/db/repl/oplog_batcher.h b/src/mongo/db/repl/oplog_batcher.h
index 21ce5010d80..70cb9b87ad3 100644
--- a/src/mongo/db/repl/oplog_batcher.h
+++ b/src/mongo/db/repl/oplog_batcher.h
@@ -169,9 +169,16 @@ public:
* at most "BatchLimits::bytes" worth of OplogEntries
* only OplogEntries from before the "BatchLimits::secondaryDelaySecsLatestTimestamp" point
* a single command OplogEntry (excluding applyOps, which are grouped with CRUD ops)
+ *
+ * If waitToFillBatch is non-zero and any data is available, waits for more data up to that many
+ * milliseconds from the start of the batch when the batch is not full. The wait is
+ * interruptible but aside from ending the wait, interrupts will be ignored to avoid losing
+ * data. (that is, on interrupt, data already in the batch is returned immediately)
*/
- StatusWith<std::vector<OplogEntry>> getNextApplierBatch(OperationContext* opCtx,
- const BatchLimits& batchLimits);
+ StatusWith<std::vector<OplogEntry>> getNextApplierBatch(
+ OperationContext* opCtx,
+ const BatchLimits& batchLimits,
+ Milliseconds waitToFillBatch = Milliseconds(0));
/**
* Helper method indicating that this oplog entry must be in a batch of its own.
diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl
index 6743bd5b058..4039aa56e41 100644
--- a/src/mongo/db/repl/repl_server_parameters.idl
+++ b/src/mongo/db/repl/repl_server_parameters.idl
@@ -107,6 +107,17 @@ server_parameters:
cpp_varname: oplogFetcherUsesExhaust
default: true
+ oplogBatchDelayMillis:
+ description: >-
+ How long, in milliseconds, to wait for more data when an oplog application batch is
+ not full.
+ set_at: startup
+ cpp_vartype: int
+ cpp_varname: oplogBatchDelayMillis
+ default: 0
+ validator:
+ gte: 0
+
# From bgsync.cpp
bgSyncOplogFetcherBatchSize:
description: The batchSize to use for the find/getMore queries called by the OplogFetcher