author     William Schultz <william.schultz@mongodb.com>  2017-07-13 18:22:51 -0400
committer  William Schultz <william.schultz@mongodb.com>  2017-07-31 11:37:07 -0400
commit     aadff4eb0b8b051777a6e4315e91dde83369e2cd (patch)
tree       f421af2845f6d91a55f9b49405fe7b79a1d38eb5
parent     49aed372a06b74db45c60071d6cc8b38c8efd63a (diff)
download   mongo-aadff4eb0b8b051777a6e4315e91dde83369e2cd.tar.gz
SERVER-29282 Prevent BSON Document size from being exceeded when grouping inserts on SECONDARY
(cherry picked from commit 1da62c11258aaa91dcff3f0133775aae615e29d4)
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp        54
-rw-r--r--  src/mongo/db/repl/sync_tail_test.cpp  134
2 files changed, 176 insertions, 12 deletions
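
The core of the change is the pair of limits enforced while grouping inserts: the byte budget for a grouped insert now starts from the size of the first op in the group rather than from zero, and a group is additionally capped at 64 ops. A minimal standalone sketch of that delimiter search follows; the Entry struct, findEndOfGroup, and the maxBatchBytes/maxBatchCount parameters are illustrative stand-ins, not the actual sync_tail types or constants.

    #include <algorithm>
    #include <string>
    #include <vector>

    struct Entry {
        char opType;     // 'i' for inserts
        std::string ns;  // collection namespace
        int objSize;     // size in bytes of the "o" document
    };

    // Return an iterator to the first op that cannot join the insert group
    // starting at 'first'. The first op's own size counts toward the byte
    // budget, so an op larger than maxBatchBytes by itself is never grouped,
    // and the group is also capped at maxBatchCount ops.
    std::vector<Entry>::iterator findEndOfGroup(std::vector<Entry>::iterator first,
                                                std::vector<Entry>::iterator last,
                                                int maxBatchBytes,
                                                int maxBatchCount) {
        int batchBytes = first->objSize;
        int batchCount = 1;
        return std::find_if(first + 1, last, [&](const Entry& next) {
            batchBytes += next.objSize;
            batchCount += 1;
            return next.opType != 'i'          // must be an insert
                || next.ns != first->ns        // must share the namespace
                || batchBytes > maxBatchBytes  // must stay under the size cap
                || batchCount > maxBatchCount; // must stay under the count cap
        });
    }

If findEndOfGroup returns first + 1, the group would contain only that single op, so the caller applies it individually; this is how an insert that exceeds the size cap on its own is handled.
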
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 8c0c0229508..30f1e28b619 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -1070,6 +1070,8 @@ Status multiSyncApply_noAbort(OperationContext* txn,
// allow us to get through the magic barrier
txn->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
+ // Sort the oplog entries by namespace, so that entries from the same namespace will be next to
+ // each other in the list.
if (oplogEntryPointers->size() > 1) {
std::stable_sort(oplogEntryPointers->begin(),
oplogEntryPointers->end(),
@@ -1086,25 +1088,54 @@ Status multiSyncApply_noAbort(OperationContext* txn,
for (auto oplogEntriesIterator = oplogEntryPointers->begin();
oplogEntriesIterator != oplogEntryPointers->end();
++oplogEntriesIterator) {
+
auto entry = *oplogEntriesIterator;
if (entry->opType[0] == 'i' && !entry->isForCappedCollection &&
oplogEntriesIterator > doNotGroupBeforePoint) {
- // Attempt to group inserts if possible.
+
std::vector<BSONObj> toInsert;
- int batchSize = 0;
- int batchCount = 0;
+
+ auto maxBatchSize = insertVectorMaxBytes;
+ auto maxBatchCount = 64;
+
+ // Make sure to include the first op in the batch size.
+ int batchSize = (*oplogEntriesIterator)->o.Obj().objsize();
+ int batchCount = 1;
+ auto batchNamespace = entry->ns;
+
+ /**
+ * Search for the op that delimits this insert batch, and save its position
+ * in endOfGroupableOpsIterator. For example, given the following list of oplog
+ * entries with a sequence of groupable inserts:
+ *
+ * S--------------E
+ * u, u, u, i, i, i, i, i, d, d
+ *
+ * S: start of insert group
+ * E: end of groupable ops
+ *
+ * E is the position of endOfGroupableOpsIterator, i.e. endOfGroupableOpsIterator
+ * will point to the first op that *can't* be added to the current insert group.
+ */
auto endOfGroupableOpsIterator = std::find_if(
oplogEntriesIterator + 1,
oplogEntryPointers->end(),
- [&](const OplogEntry* nextEntry) {
- return nextEntry->opType[0] != 'i' || // Must be an insert.
- nextEntry->ns != entry->ns || // Must be the same namespace.
- // Must not create too large an object.
- (batchSize += nextEntry->o.Obj().objsize()) > insertVectorMaxBytes ||
- ++batchCount >= 64; // Or have too many entries.
+ [&](const OplogEntry* nextEntry) -> bool {
+ auto opNamespace = nextEntry->ns;
+ batchSize += nextEntry->o.Obj().objsize();
+ batchCount += 1;
+
+ // Only add the op to this batch if it passes the criteria.
+ return nextEntry->opType[0] != 'i' // Must be an insert.
+ || opNamespace != batchNamespace // Must be in the same namespace.
+ || batchSize > maxBatchSize // Must not create too large an object.
+ || batchCount > maxBatchCount; // Limit number of ops in a single group.
});
- if (endOfGroupableOpsIterator != oplogEntriesIterator + 1) {
+ // See if we were able to create a group that contains more than a single op.
+ bool isGroup = (endOfGroupableOpsIterator > oplogEntriesIterator + 1);
+
+ if (isGroup) {
// Since we found more than one document, create grouped insert of many docs.
BSONObjBuilder groupedInsertBuilder;
// Generate an op object of all elements except for "o", since we need to
@@ -1115,7 +1146,7 @@ Status multiSyncApply_noAbort(OperationContext* txn,
}
}
- // Populate the "o" field with all the groupable inserts.
+ // Populate the "o" field with an array of all the grouped inserts.
BSONArrayBuilder insertArrayBuilder(groupedInsertBuilder.subarrayStart("o"));
for (auto groupingIterator = oplogEntriesIterator;
groupingIterator != endOfGroupableOpsIterator;
@@ -1145,6 +1176,7 @@ Status multiSyncApply_noAbort(OperationContext* txn,
}
}
+ // If we didn't create a group, try to apply the op individually.
try {
// Apply an individual (non-grouped) op.
const Status status = syncApply(txn, entry->raw, inSteadyStateReplication);
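
For reference, the grouped entry built above is a single insert oplog entry whose "o" field holds an array of the batched documents. Below is a minimal sketch of that shape using the same BSONObjBuilder/BSONArrayBuilder calls as the patch; the "op"/"ns" fields and "_id" values are illustrative, not copied from a real oplog.

    // Assumes the MongoDB source tree's BSON headers are on the include path.
    #include "mongo/bson/bsonmisc.h"
    #include "mongo/bson/bsonobjbuilder.h"

    using namespace mongo;

    BSONObj buildGroupedInsert() {
        BSONObjBuilder groupedInsertBuilder;
        // Copy every field of the first insert op except "o"...
        groupedInsertBuilder.append("op", "i");
        groupedInsertBuilder.append("ns", "test.coll");
        {
            // ...then rebuild "o" as an array holding each batched document.
            BSONArrayBuilder insertArrayBuilder(groupedInsertBuilder.subarrayStart("o"));
            insertArrayBuilder.append(BSON("_id" << 1));
            insertArrayBuilder.append(BSON("_id" << 2));
            insertArrayBuilder.done();
        }
        // Roughly: { op: "i", ns: "test.coll", o: [ { _id: 1 }, { _id: 2 } ] }
        return groupedInsertBuilder.obj();
    }

The grouped document is handed to syncApply like any other oplog entry; if that grouped apply fails, each insert is retried individually (see MultiSyncApplyFallsBackOnApplyingInsertsIndividuallyWhenGroupedInsertFails in the test diff below).
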
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 1ea383ff9b3..30f0eeacad2 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -751,7 +751,7 @@ TEST_F(SyncTailTest, MultiSyncApplyGroupsInsertOperationByNamespaceBeforeApplyin
ASSERT_BSONOBJ_EQ(insertOp2b.o.Obj(), group2[1].Obj());
}
-TEST_F(SyncTailTest, MultiSyncApplyUsesLimitWhenGroupingInsertOperation) {
+TEST_F(SyncTailTest, MultiSyncApplyLimitsBatchCountWhenGroupingInsertOperation) {
int seconds = 0;
auto makeOp = [&seconds](const NamespaceString& nss) {
return makeInsertDocumentOplogEntry(
@@ -802,6 +802,138 @@ TEST_F(SyncTailTest, MultiSyncApplyUsesLimitWhenGroupingInsertOperation) {
ASSERT_EQUALS(insertOps.back(), operationsApplied[2]);
}
+// Create an 'insert' oplog operation of an approximate size in bytes. The '_id' of the oplog entry
+// and its optime in seconds are given by the 'id' argument.
+OplogEntry makeSizedInsertOp(const NamespaceString& nss, int size, int id) {
+ return makeInsertDocumentOplogEntry({Timestamp(Seconds(id), 0), 1LL},
+ nss,
+ BSON("_id" << id << "data" << std::string(size, '*')));
+};
+
+TEST_F(SyncTailTest, MultiSyncApplyLimitsBatchSizeWhenGroupingInsertOperations) {
+
+ int seconds = 0;
+ NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName());
+ auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss);
+
+ // Create a sequence of insert ops that are too large to fit in one group.
+ int maxBatchSize = insertVectorMaxBytes;
+ int opsPerBatch = 3;
+ int opSize = maxBatchSize / opsPerBatch - 500; // Leave some room for other oplog fields.
+
+ // Create the insert ops.
+ MultiApplier::Operations insertOps;
+ int numOps = 4;
+ for (int i = 0; i < numOps; i++) {
+ insertOps.push_back(makeSizedInsertOp(nss, opSize, seconds++));
+ }
+
+ MultiApplier::Operations operationsToApply;
+ operationsToApply.push_back(createOp);
+ std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply));
+
+ MultiApplier::OperationPtrs ops;
+ for (auto&& op : operationsToApply) {
+ ops.push_back(&op);
+ }
+
+ std::vector<BSONObj> operationsApplied;
+ auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) {
+ operationsApplied.push_back(op.copy());
+ return Status::OK();
+ };
+
+ // Apply the ops.
+ ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, syncApply));
+
+ // Applied ops should be as follows:
+ // [ {create}, INSERT_GROUP{insert 1, insert 2, insert 3}, {insert 4} ]
+ ASSERT_EQ(3U, operationsApplied.size());
+ auto groupedInsertOp = operationsApplied[1];
+ ASSERT_EQUALS(BSONType::Array, groupedInsertOp["o"].type());
+ // Make sure the insert group was created correctly.
+ for (int i = 0; i < opsPerBatch; ++i) {
+ auto groupedInsertOpArray = groupedInsertOp["o"].Array();
+ ASSERT_BSONOBJ_EQ(insertOps[i].o.Obj(), groupedInsertOpArray[i].Obj());
+ }
+
+ // Check that the last op was applied individually.
+ ASSERT_BSONOBJ_EQ(insertOps[3].raw, operationsApplied[2]);
+}
+
+TEST_F(SyncTailTest, MultiSyncApplyAppliesOpIndividuallyWhenOpIndividuallyExceedsBatchSize) {
+
+ int seconds = 0;
+ NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName());
+ auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss);
+
+ int maxBatchSize = insertVectorMaxBytes;
+ // Create an insert op that exceeds the maximum batch size by itself.
+ auto insertOpLarge = makeSizedInsertOp(nss, maxBatchSize, seconds++);
+ auto insertOpSmall = makeSizedInsertOp(nss, 100, seconds++);
+
+ MultiApplier::Operations operationsToApply = {createOp, insertOpLarge, insertOpSmall};
+
+ MultiApplier::OperationPtrs ops;
+ for (auto&& op : operationsToApply) {
+ ops.push_back(&op);
+ }
+
+ std::vector<BSONObj> operationsApplied;
+ auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) {
+ operationsApplied.push_back(op.copy());
+ return Status::OK();
+ };
+
+ // Apply the ops.
+ ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, syncApply));
+
+ // Applied ops should be as follows:
+ // [ {create}, {large insert}, {small insert} ]
+ ASSERT_EQ(operationsToApply.size(), operationsApplied.size());
+ ASSERT_BSONOBJ_EQ(createOp.raw, operationsApplied[0]);
+ ASSERT_BSONOBJ_EQ(insertOpLarge.raw, operationsApplied[1]);
+ ASSERT_BSONOBJ_EQ(insertOpSmall.raw, operationsApplied[2]);
+}
+
+TEST_F(SyncTailTest, MultiSyncApplyAppliesInsertOpsIndividuallyWhenUnableToCreateGroupByNamespace) {
+
+ int seconds = 0;
+ auto makeOp = [&seconds](const NamespaceString& nss) {
+ return makeInsertDocumentOplogEntry(
+ {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++));
+ };
+
+ auto testNs = "test." + _agent.getSuiteName() + "_" + _agent.getTestName();
+
+ // Create a sequence of 3 'insert' ops that can't be grouped because they are from different
+ // namespaces.
+ MultiApplier::Operations operationsToApply = {makeOp(NamespaceString(testNs + "_1")),
+ makeOp(NamespaceString(testNs + "_2")),
+ makeOp(NamespaceString(testNs + "_3"))};
+
+ std::vector<BSONObj> operationsApplied;
+ auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) {
+ operationsApplied.push_back(op.copy());
+ return Status::OK();
+ };
+
+ MultiApplier::OperationPtrs ops;
+ for (auto&& op : operationsToApply) {
+ ops.push_back(&op);
+ }
+
+ // Apply the ops.
+ ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, syncApply));
+
+ // Applied ops should be as follows i.e. no insert grouping:
+ // [{insert 1}, {insert 2}, {insert 3}]
+ ASSERT_EQ(operationsToApply.size(), operationsApplied.size());
+ for (std::size_t i = 0; i < operationsToApply.size(); i++) {
+ ASSERT_BSONOBJ_EQ(operationsToApply[i].raw, operationsApplied[i]);
+ }
+}
+
TEST_F(SyncTailTest, MultiSyncApplyFallsBackOnApplyingInsertsIndividuallyWhenGroupedInsertFails) {
int seconds = 0;
auto makeOp = [&seconds](const NamespaceString& nss) {