diff options
author | Matthew Russotto <matthew.russotto@mongodb.com> | 2022-04-22 16:29:40 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-06-22 15:24:30 +0000 |
commit | 81a4764d6637fe6a645cf54bdba6984b7d466ea9 (patch) | |
tree | 2552495eb566609b0cf1ac01e1ee97698aa58340 | |
parent | f60e7b0978053aa7908d7e7544e1bb14bd7f9b22 (diff) | |
download | mongo-81a4764d6637fe6a645cf54bdba6984b7d466ea9.tar.gz |
SERVER-65723 Add parameter oplogBatchDelayMillis to improve batching on caught-up secondaries
(cherry picked from commit 97dc4971f5894c070a029dd8d05b857062b5ddde)
-rw-r--r-- | src/mongo/db/repl/oplog_applier.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_test.cpp | 234 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_batcher.cpp | 33 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_batcher.h | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_server_parameters.idl | 11 |
6 files changed, 250 insertions, 49 deletions
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index 51d3134d565..e0fe3fe3c35 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -128,8 +128,8 @@ StatusWith<OpTime> OplogApplier::applyOplogBatch(OperationContext* opCtx, } StatusWith<std::vector<OplogEntry>> OplogApplier::getNextApplierBatch( - OperationContext* opCtx, const BatchLimits& batchLimits) { - return _oplogBatcher->getNextApplierBatch(opCtx, batchLimits); + OperationContext* opCtx, const BatchLimits& batchLimits, Milliseconds waitToFillBatch) { + return _oplogBatcher->getNextApplierBatch(opCtx, batchLimits, waitToFillBatch); } const OplogApplier::Options& OplogApplier::getOptions() const { diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h index e35c6585863..b582b282efd 100644 --- a/src/mongo/db/repl/oplog_applier.h +++ b/src/mongo/db/repl/oplog_applier.h @@ -159,8 +159,10 @@ public: /** * Calls the OplogBatcher's getNextApplierBatch. */ - StatusWith<std::vector<OplogEntry>> getNextApplierBatch(OperationContext* opCtx, - const BatchLimits& batchLimits); + StatusWith<std::vector<OplogEntry>> getNextApplierBatch( + OperationContext* opCtx, + const BatchLimits& batchLimits, + Milliseconds waitToFillBatch = Milliseconds(0)); const Options& getOptions() const; diff --git a/src/mongo/db/repl/oplog_applier_test.cpp b/src/mongo/db/repl/oplog_applier_test.cpp index 5fc5420cf17..cb35f041caa 100644 --- a/src/mongo/db/repl/oplog_applier_test.cpp +++ b/src/mongo/db/repl/oplog_applier_test.cpp @@ -38,7 +38,9 @@ #include "mongo/db/repl/oplog_applier.h" #include "mongo/db/repl/oplog_batcher_test_fixture.h" #include "mongo/db/repl/oplog_buffer_blocking_queue.h" +#include "mongo/db/service_context_test_fixture.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/clock_source_mock.h" namespace mongo { namespace repl { @@ -74,13 +76,17 @@ StatusWith<OpTime> OplogApplierMock::_applyOplogBatch(OperationContext* opCtx, class OplogApplierTest : public unittest::Test { public: - void setUp() final; - void tearDown() final; + void setUp() override; + void tearDown() override; + virtual OperationContext* opCtx() { + return _opCtxNoop.get(); + } + protected: std::unique_ptr<OplogBuffer> _buffer; std::unique_ptr<OplogApplier> _applier; - std::unique_ptr<OperationContext> _opCtx; + std::unique_ptr<OperationContext> _opCtxNoop; OplogApplier::BatchLimits _limits; }; @@ -90,7 +96,7 @@ void OplogApplierTest::setUp() { // The OplogApplier interface expects an OperationContext* but the mock implementations in this // test will not be dereferencing the pointer. Therefore, it is sufficient to use an // OperationContextNoop. - _opCtx = std::make_unique<OperationContextNoop>(); + _opCtxNoop = std::make_unique<OperationContextNoop>(); _limits.bytes = std::numeric_limits<decltype(_limits.bytes)>::max(); _limits.ops = std::numeric_limits<decltype(_limits.ops)>::max(); @@ -98,7 +104,7 @@ void OplogApplierTest::setUp() { void OplogApplierTest::tearDown() { _limits = {}; - _opCtx = {}; + _opCtxNoop = {}; _applier = {}; _buffer = {}; } @@ -109,9 +115,9 @@ TEST_F(OplogApplierTest, GetNextApplierBatchGroupsCrudOps) { std::vector<OplogEntry> srcOps; srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "foo"))); srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar"))); - _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend()); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); - auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits)); ASSERT_EQUALS(srcOps.size(), batch.size()) << toString(batch); ASSERT_EQUALS(srcOps[0], batch[0]); ASSERT_EQUALS(srcOps[1], batch[1]); @@ -121,9 +127,9 @@ TEST_F(OplogApplierTest, GetNextApplierBatchReturnsPreparedApplyOpsOpInOwnBatch) std::vector<OplogEntry> srcOps; srcOps.push_back(makeApplyOpsOplogEntry(1, true)); srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar"))); - _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend()); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); - auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits)); ASSERT_EQUALS(1U, batch.size()) << toString(batch); ASSERT_EQUALS(srcOps[0], batch[0]); } @@ -132,9 +138,9 @@ TEST_F(OplogApplierTest, GetNextApplierBatchGroupsUnpreparedApplyOpsOpWithOtherO std::vector<OplogEntry> srcOps; srcOps.push_back(makeApplyOpsOplogEntry(1, false)); srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar"))); - _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend()); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); - auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits)); ASSERT_EQUALS(2U, batch.size()) << toString(batch); ASSERT_EQUALS(srcOps[0], batch[0]); ASSERT_EQUALS(srcOps[1], batch[1]); @@ -145,9 +151,9 @@ TEST_F(OplogApplierTest, GetNextApplierBatchReturnsSystemDotViewsOpInOwnBatch) { srcOps.push_back(makeInsertOplogEntry( 1, NamespaceString(dbName, NamespaceString::kSystemDotViewsCollectionName))); srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar"))); - _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend()); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); - auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits)); ASSERT_EQUALS(1U, batch.size()) << toString(batch); ASSERT_EQUALS(srcOps[0], batch[0]); } @@ -156,9 +162,9 @@ TEST_F(OplogApplierTest, GetNextApplierBatchReturnsServerConfigurationOpInOwnBat std::vector<OplogEntry> srcOps; srcOps.push_back(makeInsertOplogEntry(1, NamespaceString::kServerConfigurationNamespace)); srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar"))); - _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend()); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); - auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits)); ASSERT_EQUALS(1U, batch.size()) << toString(batch); ASSERT_EQUALS(srcOps[0], batch[0]); } @@ -167,9 +173,9 @@ TEST_F(OplogApplierTest, GetNextApplierBatchReturnsConfigReshardingDonorOpInOwnB std::vector<OplogEntry> srcOps; srcOps.push_back(makeInsertOplogEntry(1, NamespaceString::kDonorReshardingOperationsNamespace)); srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar"))); - _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend()); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); - auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits)); ASSERT_EQUALS(1U, batch.size()) << toString(batch); ASSERT_EQUALS(srcOps[0], batch[0]); } @@ -178,9 +184,9 @@ TEST_F(OplogApplierTest, GetNextApplierBatchReturnsPreparedCommitTransactionOpIn std::vector<OplogEntry> srcOps; srcOps.push_back(makeCommitTransactionOplogEntry(1, dbName, true, 3)); srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar"))); - _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend()); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); - auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits)); ASSERT_EQUALS(1U, batch.size()) << toString(batch); ASSERT_EQUALS(srcOps[0], batch[0]); } @@ -189,9 +195,9 @@ TEST_F(OplogApplierTest, GetNextApplierBatchGroupsUnpreparedCommitTransactionOpW std::vector<OplogEntry> srcOps; srcOps.push_back(makeCommitTransactionOplogEntry(1, dbName, false, 3)); srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar"))); - _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend()); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); - auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits)); ASSERT_EQUALS(2U, batch.size()) << toString(batch); ASSERT_EQUALS(srcOps[0], batch[0]); ASSERT_EQUALS(srcOps[1], batch[1]); @@ -204,20 +210,20 @@ TEST_F(OplogApplierTest, GetNextApplierBatchChecksBatchLimitsForNumberOfOperatio srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "bar"))); srcOps.push_back(makeInsertOplogEntry(4, NamespaceString(dbName, "bar"))); srcOps.push_back(makeInsertOplogEntry(5, NamespaceString(dbName, "bar"))); - _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend()); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); // Set batch limits so that each batch contains a maximum of 'BatchLimit::ops'. _limits.ops = 3U; // First batch: [insert, insert, insert] - auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits)); ASSERT_EQUALS(3U, batch.size()) << toString(batch); ASSERT_EQUALS(srcOps[0], batch[0]); ASSERT_EQUALS(srcOps[1], batch[1]); ASSERT_EQUALS(srcOps[2], batch[2]); // Second batch: [insert, insert] - batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits)); ASSERT_EQUALS(2U, batch.size()) << toString(batch); ASSERT_EQUALS(srcOps[3], batch[0]); ASSERT_EQUALS(srcOps[4], batch[1]); @@ -228,19 +234,19 @@ TEST_F(OplogApplierTest, GetNextApplierBatchChecksBatchLimitsForSizeOfOperations srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "bar"))); srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar"))); srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "bar"))); - _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend()); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); // Set batch limits so that only the first two operations can fit into the first batch. _limits.bytes = std::size_t(srcOps[0].getRawObjSizeBytes() + srcOps[1].getRawObjSizeBytes()); // First batch: [insert, insert] - auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits)); ASSERT_EQUALS(2U, batch.size()) << toString(batch); ASSERT_EQUALS(srcOps[0], batch[0]); ASSERT_EQUALS(srcOps[1], batch[1]); // Second batch: [insert] - batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits)); ASSERT_EQUALS(1U, batch.size()) << toString(batch); ASSERT_EQUALS(srcOps[2], batch[0]); } @@ -251,19 +257,19 @@ TEST_F(OplogApplierTest, srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "bar"))); srcOps.push_back(makeCommitTransactionOplogEntry(2, dbName, false, 3)); srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "bar"))); - _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend()); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); // Set batch limits so that commit transaction entry has to go into next batch as the only entry // after taking into account the embedded op count. _limits.ops = 3U; // First batch: [insert] - auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits)); ASSERT_EQUALS(1U, batch.size()) << toString(batch); ASSERT_EQUALS(srcOps[0], batch[0]); // Second batch: [commit] - batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits)); ASSERT_EQUALS(1U, batch.size()) << toString(batch); ASSERT_EQUALS(srcOps[1], batch[0]); } @@ -275,20 +281,20 @@ TEST_F(OplogApplierTest, srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar"))); srcOps.push_back(makeCommitTransactionOplogEntry(3, dbName, false, 3)); srcOps.push_back(makeInsertOplogEntry(4, NamespaceString(dbName, "bar"))); - _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend()); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); // Set batch limits so that commit transaction entry has to go into next batch after taking into // account embedded op count. _limits.ops = 4U; // First batch: [insert, insert] - auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits)); ASSERT_EQUALS(2U, batch.size()) << toString(batch); ASSERT_EQUALS(srcOps[0], batch[0]); ASSERT_EQUALS(srcOps[1], batch[1]); // Second batch: [commit, insert] - batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits)); ASSERT_EQUALS(2U, batch.size()) << toString(batch); ASSERT_EQUALS(srcOps[2], batch[0]); ASSERT_EQUALS(srcOps[3], batch[1]); @@ -300,19 +306,19 @@ TEST_F(OplogApplierTest, srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "bar"))); srcOps.push_back(makeCommitTransactionOplogEntry(2, dbName, false, 5)); srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "bar"))); - _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend()); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); // Set batch limits so that commit transaction entry goes into its own batch because its // embedded count exceeds the batch limit for ops. _limits.ops = 4U; // First batch: [insert] - auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits)); ASSERT_EQUALS(1U, batch.size()) << toString(batch); ASSERT_EQUALS(srcOps[0], batch[0]); // Second batch: [commit] - batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits)); ASSERT_EQUALS(1U, batch.size()) << toString(batch); ASSERT_EQUALS(srcOps[1], batch[0]); } @@ -332,14 +338,14 @@ TEST_F(OplogApplierTest, LastOpInLargeTransactionIsProcessedIndividually) { // is processed by itself. srcOps.push_back(makeInsertOplogEntry(5, NamespaceString(dbName, "bar"))); - _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend()); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); // Set large enough batch limit to ensure that batcher is not batching because of limit, but // rather because it encountered the final oplog entry of a large transaction. _limits.ops = 10U; // First batch: [insert, applyOps, applyOps] - auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + auto batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits)); ASSERT_EQUALS(3U, batch.size()) << toString(batch); ASSERT_EQUALS(srcOps[0], batch[0]); ASSERT_EQUALS(srcOps[1], batch[1]); @@ -347,17 +353,165 @@ TEST_F(OplogApplierTest, LastOpInLargeTransactionIsProcessedIndividually) { // Second batch: [applyOps]. The last oplog entry of a large transaction must be processed by // itself. - batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits)); ASSERT_EQUALS(1U, batch.size()) << toString(batch); ASSERT_EQUALS(srcOps[3], batch[0]); // Third batch: [insert]. The this confirms that the last oplog entry of a large txn will be // batched individually. - batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + batch = unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits)); ASSERT_EQUALS(1U, batch.size()) << toString(batch); ASSERT_EQUALS(srcOps[4], batch[0]); } +class OplogApplierDelayTest : public OplogApplierTest, public ScopedGlobalServiceContextForTest { +public: + void setUp() override { + OplogApplierTest::setUp(); + auto* service = getServiceContext(); + Client::initThread("OplogApplierDelayTest", service, nullptr); + + _mockClock = std::make_shared<ClockSourceMock>(); + // Avoid any issues due to a clock exactly at 0 (e.g. dates being default Date_t()); + _mockClock->advance(Milliseconds(60000)); + service->setFastClockSource(std::make_unique<SharedClockSourceAdapter>(_mockClock)); + service->setPreciseClockSource(std::make_unique<SharedClockSourceAdapter>(_mockClock)); + + // The delay tests need a real operation context to use the service context clock. + _opCtxHolder = cc().makeOperationContext(); + + // Use a smaller limit for these tests. + _limits.ops = 3; + } + void tearDown() override { + _opCtxHolder = nullptr; + Client::releaseCurrent(); + OplogApplierTest::tearDown(); + } + + OperationContext* opCtx() override { + return _opCtxHolder.get(); + } + + // Wait for the opCtx to be waited on, or for killWaits() to be run. + bool waitForWait() { + while (!_failWaits.load()) { + if (opCtx()->isWaitingForConditionOrInterrupt()) + return true; + sleepmillis(1); + } + return false; + } + + // Ends any waitForWait calls. Used to turn some potential hangs into outright failures. + void killWaits() { + _failWaits.store(true); + } + +protected: + std::shared_ptr<ClockSourceMock> _mockClock; + ServiceContext::UniqueOperationContext _opCtxHolder; + AtomicWord<bool> _failWaits{false}; +}; + +TEST_F(OplogApplierDelayTest, GetNextApplierBatchReturnsEmptyBatchImmediately) { + auto batch = + unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits, Milliseconds(10))); + ASSERT_EQ(0, batch.size()); +} + +TEST_F(OplogApplierDelayTest, GetNextApplierBatchReturnsFullBatchImmediately) { + std::vector<OplogEntry> srcOps; + srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "foo"))); + srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar"))); + srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "baz"))); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); + + auto batch = + unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits, Milliseconds(10))); + ASSERT_EQ(3, batch.size()); +} + +TEST_F(OplogApplierDelayTest, GetNextApplierBatchWaitsForBatchToFill) { + std::vector<OplogEntry> srcOps; + srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "foo"))); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); + + stdx::thread insertThread([this, &srcOps] { + ASSERT(waitForWait()); + { + FailPointEnableBlock peekFailPoint("oplogBatcherPauseAfterSuccessfulPeek"); + srcOps.clear(); + srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar"))); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); + peekFailPoint->waitForTimesEntered(peekFailPoint.initialTimesEntered() + 1); + _mockClock->advance(Milliseconds(5)); + } + ASSERT(waitForWait()); + srcOps.clear(); + srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "baz"))); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); + }); + auto batch = + unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits, Milliseconds(10))); + ASSERT_EQ(3, batch.size()); + killWaits(); + insertThread.join(); +} + +TEST_F(OplogApplierDelayTest, GetNextApplierBatchWaitsForBatchToTimeout) { + std::vector<OplogEntry> srcOps; + srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "foo"))); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); + + stdx::thread insertThread([this, &srcOps] { + ASSERT(waitForWait()); + { + FailPointEnableBlock peekFailPoint("oplogBatcherPauseAfterSuccessfulPeek"); + srcOps.clear(); + srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar"))); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); + peekFailPoint->waitForTimesEntered(peekFailPoint.initialTimesEntered() + 1); + _mockClock->advance(Milliseconds(5)); + } + ASSERT(waitForWait()); + _mockClock->advance(Milliseconds(5)); + }); + auto batch = + unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits, Milliseconds(10))); + ASSERT_EQ(2, batch.size()); + killWaits(); + insertThread.join(); +} + +// Makes sure that interrupting the batch while waiting does interrupt the timeout, +// but does not throw or lose any data. +TEST_F(OplogApplierDelayTest, GetNextApplierBatchInterrupted) { + std::vector<OplogEntry> srcOps; + srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "foo"))); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); + + stdx::thread insertThread([this, &srcOps] { + ASSERT(waitForWait()); + { + FailPointEnableBlock peekFailPoint("oplogBatcherPauseAfterSuccessfulPeek"); + srcOps.clear(); + srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar"))); + _applier->enqueue(opCtx(), srcOps.cbegin(), srcOps.cend()); + peekFailPoint->waitForTimesEntered(peekFailPoint.initialTimesEntered() + 1); + _mockClock->advance(Milliseconds(5)); + } + ASSERT(waitForWait()); + opCtx()->markKilled(ErrorCodes::Interrupted); + }); + auto batch = + unittest::assertGet(_applier->getNextApplierBatch(opCtx(), _limits, Milliseconds(10))); + ASSERT_EQ(2, batch.size()); + ASSERT_EQ(ErrorCodes::Interrupted, opCtx()->checkForInterruptNoAssert()); + killWaits(); + insertThread.join(); +} + } // namespace } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_batcher.cpp b/src/mongo/db/repl/oplog_batcher.cpp index 1fe68d71f3b..13084f48b23 100644 --- a/src/mongo/db/repl/oplog_batcher.cpp +++ b/src/mongo/db/repl/oplog_batcher.cpp @@ -40,6 +40,7 @@ namespace mongo { namespace repl { MONGO_FAIL_POINT_DEFINE(skipOplogBatcherWaitForData); +MONGO_FAIL_POINT_DEFINE(oplogBatcherPauseAfterSuccessfulPeek); OplogBatcher::OplogBatcher(OplogApplier* oplogApplier, OplogBuffer* oplogBuffer) : _oplogApplier(oplogApplier), _oplogBuffer(oplogBuffer), _ops(0) {} @@ -150,7 +151,7 @@ std::size_t OplogBatcher::getOpCount(const OplogEntry& entry) { } StatusWith<std::vector<OplogEntry>> OplogBatcher::getNextApplierBatch( - OperationContext* opCtx, const BatchLimits& batchLimits) { + OperationContext* opCtx, const BatchLimits& batchLimits, Milliseconds waitToFillBatch) { if (batchLimits.ops == 0) { return Status(ErrorCodes::InvalidOptions, "Batch size must be greater than 0."); } @@ -159,7 +160,13 @@ StatusWith<std::vector<OplogEntry>> OplogBatcher::getNextApplierBatch( std::uint32_t totalBytes = 0; std::vector<OplogEntry> ops; BSONObj op; + Date_t batchDeadline; + if (waitToFillBatch > Milliseconds(0)) { + batchDeadline = + opCtx->getServiceContext()->getPreciseClockSource()->now() + waitToFillBatch; + } while (_oplogBuffer->peek(opCtx, &op)) { + oplogBatcherPauseAfterSuccessfulPeek.pauseWhileSet(); auto entry = OplogEntry(op); // Check for oplog version change. @@ -221,6 +228,25 @@ StatusWith<std::vector<OplogEntry>> OplogBatcher::getNextApplierBatch( totalBytes += opBytes; ops.push_back(std::move(entry)); _consume(opCtx, _oplogBuffer); + // At this point we either have a partial batch or an exactly full batch; if we are using + // a wait to fill the batch, we should wait if and only if the batch is partial. + if (batchDeadline != Date_t() && totalOps < batchLimits.ops && + totalBytes < batchLimits.bytes) { + LOGV2_DEBUG(6572301, + 3, + "Waiting for batch to fill", + "deadline"_attr = batchDeadline, + "waitToFillBatch"_attr = waitToFillBatch, + "totalOps"_attr = totalOps, + "totalBytes"_attr = totalBytes); + try { + _oplogBuffer->waitForDataUntil(batchDeadline, opCtx); + } catch (const ExceptionForCat<ErrorCategory::Interruption>& e) { + LOGV2(6572300, + "Interrupted in oplog batching; returning current partial batch.", + "error"_attr = e); + } + } } return std::move(ops); } @@ -281,8 +307,9 @@ void OplogBatcher::_run(StorageInterface* storageInterface) { // Locks the oplog to check its max size, do this in the UninterruptibleLockGuard. batchLimits.bytes = getBatchLimitOplogBytes(opCtx.get(), storageInterface); - auto oplogEntries = - fassertNoTrace(31004, getNextApplierBatch(opCtx.get(), batchLimits)); + auto oplogEntries = fassertNoTrace( + 31004, + getNextApplierBatch(opCtx.get(), batchLimits, Milliseconds(oplogBatchDelayMillis))); for (const auto& oplogEntry : oplogEntries) { ops.emplace_back(oplogEntry); } diff --git a/src/mongo/db/repl/oplog_batcher.h b/src/mongo/db/repl/oplog_batcher.h index 21ce5010d80..70cb9b87ad3 100644 --- a/src/mongo/db/repl/oplog_batcher.h +++ b/src/mongo/db/repl/oplog_batcher.h @@ -169,9 +169,16 @@ public: * at most "BatchLimits::bytes" worth of OplogEntries * only OplogEntries from before the "BatchLimits::secondaryDelaySecsLatestTimestamp" point * a single command OplogEntry (excluding applyOps, which are grouped with CRUD ops) + * + * If waitToFillBatch is non-zero and any data is available, waits for more data up to that many + * milliseconds from the start of the batch when the batch is not full. The wait is + * interruptible but aside from ending the wait, interrupts will be ignored to avoid losing + * data. (that is, on interrupt, data already in the batch is returned immediately) */ - StatusWith<std::vector<OplogEntry>> getNextApplierBatch(OperationContext* opCtx, - const BatchLimits& batchLimits); + StatusWith<std::vector<OplogEntry>> getNextApplierBatch( + OperationContext* opCtx, + const BatchLimits& batchLimits, + Milliseconds waitToFillBatch = Milliseconds(0)); /** * Helper method indicating that this oplog entry must be in a batch of its own. diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl index 6743bd5b058..4039aa56e41 100644 --- a/src/mongo/db/repl/repl_server_parameters.idl +++ b/src/mongo/db/repl/repl_server_parameters.idl @@ -107,6 +107,17 @@ server_parameters: cpp_varname: oplogFetcherUsesExhaust default: true + oplogBatchDelayMillis: + description: >- + How long, in milliseconds, to wait for more data when an oplog application batch is + not full. + set_at: startup + cpp_vartype: int + cpp_varname: oplogBatchDelayMillis + default: 0 + validator: + gte: 0 + # From bgsync.cpp bgSyncOplogFetcherBatchSize: description: The batchSize to use for the find/getMore queries called by the OplogFetcher |