summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2019-04-08 16:00:33 -0400
committerBenety Goh <benety@mongodb.com>2019-04-08 16:00:33 -0400
commitf68f8b7ccb70de47eacbd0d3f196d1de9b6f6b0d (patch)
treeb61ff7cb0ca8f72fbba6b8749c312c084dab0079
parent8eae618a76e8a454e491f1a7ebaf6e66a229798e (diff)
downloadmongo-f68f8b7ccb70de47eacbd0d3f196d1de9b6f6b0d.tar.gz
SERVER-39436 fix batch limit checking in OplogApplier::getNextApplierBatch()
-rw-r--r--src/mongo/db/repl/oplog_applier.cpp28
-rw-r--r--src/mongo/db/repl/oplog_applier_test.cpp48
2 files changed, 67 insertions, 9 deletions
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp
index da6370425df..714640e528f 100644
--- a/src/mongo/db/repl/oplog_applier.cpp
+++ b/src/mongo/db/repl/oplog_applier.cpp
@@ -199,6 +199,15 @@ bool mustProcessStandalone(const OplogEntry& entry) {
return false;
}
+/**
+ * Returns the number of logical operations represented by an oplog entry.
+ * This is usually one but may be greater than one in certain cases, such as in a commitTransaction
+ * command.
+ */
+std::size_t getOpCount(const OplogEntry& entry) {
+ return 1U;
+}
+
} // namespace
StatusWith<OplogApplier::Operations> OplogApplier::getNextApplierBatch(
@@ -207,6 +216,7 @@ StatusWith<OplogApplier::Operations> OplogApplier::getNextApplierBatch(
return Status(ErrorCodes::InvalidOptions, "Batch size must be greater than 0.");
}
+ std::size_t totalOps = 0;
std::uint32_t totalBytes = 0;
Operations ops;
BSONObj op;
@@ -245,18 +255,18 @@ StatusWith<OplogApplier::Operations> OplogApplier::getNextApplierBatch(
return std::move(ops);
}
- // Apply replication batch limits.
- if (ops.size() >= batchLimits.ops) {
- return std::move(ops);
- }
-
- // Never return an empty batch if there are operations left.
- if ((totalBytes + entry.getRawObjSizeBytes() >= batchLimits.bytes) && (ops.size() > 0)) {
- return std::move(ops);
+ // Apply replication batch limits. Avoid returning an empty batch.
+ auto opCount = getOpCount(entry);
+ auto opBytes = entry.getRawObjSizeBytes();
+ if (totalOps > 0) {
+ if (totalOps + opCount > batchLimits.ops || totalBytes + opBytes > batchLimits.bytes) {
+ return std::move(ops);
+ }
}
// Add op to buffer.
- totalBytes += entry.getRawObjSizeBytes();
+ totalOps += opCount;
+ totalBytes += opBytes;
ops.push_back(std::move(entry));
_consume(opCtx, _oplogBuffer);
}
diff --git a/src/mongo/db/repl/oplog_applier_test.cpp b/src/mongo/db/repl/oplog_applier_test.cpp
index 898a73a46de..e40da89096e 100644
--- a/src/mongo/db/repl/oplog_applier_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_test.cpp
@@ -274,6 +274,54 @@ TEST_F(OplogApplierTest, GetNextApplierBatchGroupsUnpreparedCommitTransactionOpW
ASSERT_EQUALS(srcOps[1], batch[1]);
}
+TEST_F(OplogApplierTest, GetNextApplierBatchChecksBatchLimitsForNumberOfOperations) {
+ OplogApplier::Operations srcOps;
+ srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "bar")));
+ srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")));
+ srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "bar")));
+ srcOps.push_back(makeInsertOplogEntry(4, NamespaceString(dbName, "bar")));
+ srcOps.push_back(makeInsertOplogEntry(5, NamespaceString(dbName, "bar")));
+ _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend());
+
+ // Set batch limits so that each batch contains a maximum of 'BatchLimit::ops'.
+ _limits.ops = 3U;
+
+ // First batch: [insert, insert, insert]
+ auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ ASSERT_EQUALS(3U, batch.size()) << toString(batch);
+ ASSERT_EQUALS(srcOps[0], batch[0]);
+ ASSERT_EQUALS(srcOps[1], batch[1]);
+ ASSERT_EQUALS(srcOps[2], batch[2]);
+
+ // Second batch: [insert, insert]
+ batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ ASSERT_EQUALS(2U, batch.size()) << toString(batch);
+ ASSERT_EQUALS(srcOps[3], batch[0]);
+ ASSERT_EQUALS(srcOps[4], batch[1]);
+}
+
+TEST_F(OplogApplierTest, GetNextApplierBatchChecksBatchLimitsForSizeOfOperations) {
+ OplogApplier::Operations srcOps;
+ srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "bar")));
+ srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")));
+ srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "bar")));
+ _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend());
+
+ // Set batch limits so that only the first two operations can fit into the first batch.
+ _limits.bytes = std::size_t(srcOps[0].getRawObjSizeBytes() + srcOps[1].getRawObjSizeBytes());
+
+ // First batch: [insert, insert]
+ auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ ASSERT_EQUALS(2U, batch.size()) << toString(batch);
+ ASSERT_EQUALS(srcOps[0], batch[0]);
+ ASSERT_EQUALS(srcOps[1], batch[1]);
+
+ // Second batch: [insert]
+ batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ ASSERT_EQUALS(1U, batch.size()) << toString(batch);
+ ASSERT_EQUALS(srcOps[2], batch[0]);
+}
+
} // namespace
} // namespace repl
} // namespace mongo