diff options
author | Benety Goh <benety@mongodb.com> | 2019-04-08 16:00:33 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2019-04-08 16:00:33 -0400 |
commit | f68f8b7ccb70de47eacbd0d3f196d1de9b6f6b0d (patch) | |
tree | b61ff7cb0ca8f72fbba6b8749c312c084dab0079 | |
parent | 8eae618a76e8a454e491f1a7ebaf6e66a229798e (diff) | |
download | mongo-f68f8b7ccb70de47eacbd0d3f196d1de9b6f6b0d.tar.gz |
SERVER-39436 fix batch limit checking in OplogApplier::getNextApplierBatch()
-rw-r--r-- | src/mongo/db/repl/oplog_applier.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_test.cpp | 48 |
2 files changed, 67 insertions, 9 deletions
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index da6370425df..714640e528f 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -199,6 +199,15 @@ bool mustProcessStandalone(const OplogEntry& entry) { return false; } +/** + * Returns the number of logical operations represented by an oplog entry. + * This is usually one but may be greater than one in certain cases, such as in a commitTransaction + * command. + */ +std::size_t getOpCount(const OplogEntry& entry) { + return 1U; +} + } // namespace StatusWith<OplogApplier::Operations> OplogApplier::getNextApplierBatch( @@ -207,6 +216,7 @@ StatusWith<OplogApplier::Operations> OplogApplier::getNextApplierBatch( return Status(ErrorCodes::InvalidOptions, "Batch size must be greater than 0."); } + std::size_t totalOps = 0; std::uint32_t totalBytes = 0; Operations ops; BSONObj op; @@ -245,18 +255,18 @@ StatusWith<OplogApplier::Operations> OplogApplier::getNextApplierBatch( return std::move(ops); } - // Apply replication batch limits. - if (ops.size() >= batchLimits.ops) { - return std::move(ops); - } - - // Never return an empty batch if there are operations left. - if ((totalBytes + entry.getRawObjSizeBytes() >= batchLimits.bytes) && (ops.size() > 0)) { - return std::move(ops); + // Apply replication batch limits. Avoid returning an empty batch. + auto opCount = getOpCount(entry); + auto opBytes = entry.getRawObjSizeBytes(); + if (totalOps > 0) { + if (totalOps + opCount > batchLimits.ops || totalBytes + opBytes > batchLimits.bytes) { + return std::move(ops); + } } // Add op to buffer. - totalBytes += entry.getRawObjSizeBytes(); + totalOps += opCount; + totalBytes += opBytes; ops.push_back(std::move(entry)); _consume(opCtx, _oplogBuffer); } diff --git a/src/mongo/db/repl/oplog_applier_test.cpp b/src/mongo/db/repl/oplog_applier_test.cpp index 898a73a46de..e40da89096e 100644 --- a/src/mongo/db/repl/oplog_applier_test.cpp +++ b/src/mongo/db/repl/oplog_applier_test.cpp @@ -274,6 +274,54 @@ TEST_F(OplogApplierTest, GetNextApplierBatchGroupsUnpreparedCommitTransactionOpW ASSERT_EQUALS(srcOps[1], batch[1]); } +TEST_F(OplogApplierTest, GetNextApplierBatchChecksBatchLimitsForNumberOfOperations) { + OplogApplier::Operations srcOps; + srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "bar"))); + srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar"))); + srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "bar"))); + srcOps.push_back(makeInsertOplogEntry(4, NamespaceString(dbName, "bar"))); + srcOps.push_back(makeInsertOplogEntry(5, NamespaceString(dbName, "bar"))); + _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend()); + + // Set batch limits so that each batch contains a maximum of 'BatchLimit::ops'. + _limits.ops = 3U; + + // First batch: [insert, insert, insert] + auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + ASSERT_EQUALS(3U, batch.size()) << toString(batch); + ASSERT_EQUALS(srcOps[0], batch[0]); + ASSERT_EQUALS(srcOps[1], batch[1]); + ASSERT_EQUALS(srcOps[2], batch[2]); + + // Second batch: [insert, insert] + batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + ASSERT_EQUALS(2U, batch.size()) << toString(batch); + ASSERT_EQUALS(srcOps[3], batch[0]); + ASSERT_EQUALS(srcOps[4], batch[1]); +} + +TEST_F(OplogApplierTest, GetNextApplierBatchChecksBatchLimitsForSizeOfOperations) { + OplogApplier::Operations srcOps; + srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "bar"))); + srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar"))); + srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "bar"))); + _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend()); + + // Set batch limits so that only the first two operations can fit into the first batch. + _limits.bytes = std::size_t(srcOps[0].getRawObjSizeBytes() + srcOps[1].getRawObjSizeBytes()); + + // First batch: [insert, insert] + auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + ASSERT_EQUALS(2U, batch.size()) << toString(batch); + ASSERT_EQUALS(srcOps[0], batch[0]); + ASSERT_EQUALS(srcOps[1], batch[1]); + + // Second batch: [insert] + batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + ASSERT_EQUALS(1U, batch.size()) << toString(batch); + ASSERT_EQUALS(srcOps[2], batch[0]); +} + } // namespace } // namespace repl } // namespace mongo |