From aadff4eb0b8b051777a6e4315e91dde83369e2cd Mon Sep 17 00:00:00 2001
From: William Schultz
Date: Thu, 13 Jul 2017 18:22:51 -0400
Subject: SERVER-29282 Prevent BSON Document size from being exceeded when
 grouping inserts on SECONDARY

(cherry picked from commit 1da62c11258aaa91dcff3f0133775aae615e29d4)
---
 src/mongo/db/repl/sync_tail.cpp      |  54 +++++++++++---
 src/mongo/db/repl/sync_tail_test.cpp | 134 ++++++++++++++++++++++++++++++++++-
 2 files changed, 176 insertions(+), 12 deletions(-)

diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 8c0c0229508..30f1e28b619 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -1070,6 +1070,8 @@ Status multiSyncApply_noAbort(OperationContext* txn,
     // allow us to get through the magic barrier
     txn->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
 
+    // Sort the oplog entries by namespace, so that entries from the same namespace will be next to
+    // each other in the list.
     if (oplogEntryPointers->size() > 1) {
         std::stable_sort(oplogEntryPointers->begin(),
                          oplogEntryPointers->end(),
@@ -1086,25 +1088,54 @@ Status multiSyncApply_noAbort(OperationContext* txn,
     for (auto oplogEntriesIterator = oplogEntryPointers->begin();
          oplogEntriesIterator != oplogEntryPointers->end();
          ++oplogEntriesIterator) {
+
         auto entry = *oplogEntriesIterator;
         if (entry->opType[0] == 'i' && !entry->isForCappedCollection &&
             oplogEntriesIterator > doNotGroupBeforePoint) {
-            // Attempt to group inserts if possible.
+
             std::vector<BSONObj> toInsert;
-            int batchSize = 0;
-            int batchCount = 0;
+
+            auto maxBatchSize = insertVectorMaxBytes;
+            auto maxBatchCount = 64;
+
+            // Make sure to include the first op in the batch size.
+            int batchSize = (*oplogEntriesIterator)->o.Obj().objsize();
+            int batchCount = 1;
+            auto batchNamespace = entry->ns;
+
+            /**
+             * Search for the op that delimits this insert batch, and save its position
+             * in endOfGroupableOpsIterator. For example, given the following list of oplog
+             * entries with a sequence of groupable inserts:
+             *
+             *                S--------------E
+             *       u, u, u, i, i, i, i, i, d, d
+             *
+             *   S: start of insert group
+             *   E: end of groupable ops
+             *
+             * E is the position of endOfGroupableOpsIterator. i.e. endOfGroupableOpsIterator
+             * will point to the first op that *can't* be added to the current insert group.
+             */
             auto endOfGroupableOpsIterator = std::find_if(
                 oplogEntriesIterator + 1,
                 oplogEntryPointers->end(),
-                [&](const OplogEntry* nextEntry) {
-                    return nextEntry->opType[0] != 'i' ||  // Must be an insert.
-                        nextEntry->ns != entry->ns ||      // Must be the same namespace.
-                        // Must not create too large an object.
-                        (batchSize += nextEntry->o.Obj().objsize()) > insertVectorMaxBytes ||
-                        ++batchCount >= 64;  // Or have too many entries.
+                [&](const OplogEntry* nextEntry) -> bool {
+                    auto opNamespace = nextEntry->ns;
+                    batchSize += nextEntry->o.Obj().objsize();
+                    batchCount += 1;
+
+                    // Only add the op to this batch if it passes the criteria.
+                    return nextEntry->opType[0] != 'i'    // Must be an insert.
+                        || opNamespace != batchNamespace  // Must be in the same namespace.
+                        || batchSize > maxBatchSize       // Must not create too large an object.
+                        || batchCount > maxBatchCount;    // Limit number of ops in a single group.
                 });
 
-            if (endOfGroupableOpsIterator != oplogEntriesIterator + 1) {
+            // See if we were able to create a group that contains more than a single op.
+            bool isGroup = (endOfGroupableOpsIterator > oplogEntriesIterator + 1);
+
+            if (isGroup) {
                 // Since we found more than one document, create grouped insert of many docs.
                 BSONObjBuilder groupedInsertBuilder;
                 // Generate an op object of all elements except for "o", since we need to
@@ -1115,7 +1146,7 @@ Status multiSyncApply_noAbort(OperationContext* txn,
                     }
                 }
 
-                // Populate the "o" field with all the groupable inserts.
+                // Populate the "o" field with an array of all the grouped inserts.
                 BSONArrayBuilder insertArrayBuilder(groupedInsertBuilder.subarrayStart("o"));
                 for (auto groupingIterator = oplogEntriesIterator;
                      groupingIterator != endOfGroupableOpsIterator;
@@ -1145,6 +1176,7 @@ Status multiSyncApply_noAbort(OperationContext* txn,
             }
         }
 
+        // If we didn't create a group, try to apply the op individually.
         try {
             // Apply an individual (non-grouped) op.
            const Status status = syncApply(txn, entry->raw, inSteadyStateReplication);
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 1ea383ff9b3..30f0eeacad2 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -751,7 +751,7 @@ TEST_F(SyncTailTest, MultiSyncApplyGroupsInsertOperationByNamespaceBeforeApplyin
     ASSERT_BSONOBJ_EQ(insertOp2b.o.Obj(), group2[1].Obj());
 }
 
-TEST_F(SyncTailTest, MultiSyncApplyUsesLimitWhenGroupingInsertOperation) {
+TEST_F(SyncTailTest, MultiSyncApplyLimitsBatchCountWhenGroupingInsertOperation) {
     int seconds = 0;
     auto makeOp = [&seconds](const NamespaceString& nss) {
         return makeInsertDocumentOplogEntry(
@@ -802,6 +802,138 @@
     ASSERT_EQUALS(insertOps.back(), operationsApplied[2]);
 }
 
+// Create an 'insert' oplog operation of an approximate size in bytes. The '_id' of the oplog entry
+// and its optime in seconds are given by the 'id' argument.
+OplogEntry makeSizedInsertOp(const NamespaceString& nss, int size, int id) {
+    return makeInsertDocumentOplogEntry({Timestamp(Seconds(id), 0), 1LL},
+                                        nss,
+                                        BSON("_id" << id << "data" << std::string(size, '*')));
+};
+
+TEST_F(SyncTailTest, MultiSyncApplyLimitsBatchSizeWhenGroupingInsertOperations) {
+
+    int seconds = 0;
+    NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName());
+    auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss);
+
+    // Create a sequence of insert ops that are too large to fit in one group.
+    int maxBatchSize = insertVectorMaxBytes;
+    int opsPerBatch = 3;
+    int opSize = maxBatchSize / opsPerBatch - 500;  // Leave some room for other oplog fields.
+
+    // Create the insert ops.
+    MultiApplier::Operations insertOps;
+    int numOps = 4;
+    for (int i = 0; i < numOps; i++) {
+        insertOps.push_back(makeSizedInsertOp(nss, opSize, seconds++));
+    }
+
+    MultiApplier::Operations operationsToApply;
+    operationsToApply.push_back(createOp);
+    std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply));
+
+    MultiApplier::OperationPtrs ops;
+    for (auto&& op : operationsToApply) {
+        ops.push_back(&op);
+    }
+
+    std::vector<BSONObj> operationsApplied;
+    auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) {
+        operationsApplied.push_back(op.copy());
+        return Status::OK();
+    };
+
+    // Apply the ops.
+    ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, syncApply));
+
+    // Applied ops should be as follows:
+    // [ {create}, INSERT_GROUP{insert 1, insert 2, insert 3}, {insert 4} ]
+    ASSERT_EQ(3U, operationsApplied.size());
+    auto groupedInsertOp = operationsApplied[1];
+    ASSERT_EQUALS(BSONType::Array, groupedInsertOp["o"].type());
+    // Make sure the insert group was created correctly.
+    for (int i = 0; i < opsPerBatch; ++i) {
+        auto groupedInsertOpArray = groupedInsertOp["o"].Array();
+        ASSERT_BSONOBJ_EQ(insertOps[i].o.Obj(), groupedInsertOpArray[i].Obj());
+    }
+
+    // Check that the last op was applied individually.
+    ASSERT_BSONOBJ_EQ(insertOps[3].raw, operationsApplied[2]);
+}
+
+TEST_F(SyncTailTest, MultiSyncApplyAppliesOpIndividuallyWhenOpIndividuallyExceedsBatchSize) {
+
+    int seconds = 0;
+    NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName());
+    auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss);
+
+    int maxBatchSize = insertVectorMaxBytes;
+    // Create an insert op that exceeds the maximum batch size by itself.
+    auto insertOpLarge = makeSizedInsertOp(nss, maxBatchSize, seconds++);
+    auto insertOpSmall = makeSizedInsertOp(nss, 100, seconds++);
+
+    MultiApplier::Operations operationsToApply = {createOp, insertOpLarge, insertOpSmall};
+
+    MultiApplier::OperationPtrs ops;
+    for (auto&& op : operationsToApply) {
+        ops.push_back(&op);
+    }
+
+    std::vector<BSONObj> operationsApplied;
+    auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) {
+        operationsApplied.push_back(op.copy());
+        return Status::OK();
+    };
+
+    // Apply the ops.
+    ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, syncApply));
+
+    // Applied ops should be as follows:
+    // [ {create}, {large insert} {small insert} ]
+    ASSERT_EQ(operationsToApply.size(), operationsApplied.size());
+    ASSERT_BSONOBJ_EQ(createOp.raw, operationsApplied[0]);
+    ASSERT_BSONOBJ_EQ(insertOpLarge.raw, operationsApplied[1]);
+    ASSERT_BSONOBJ_EQ(insertOpSmall.raw, operationsApplied[2]);
+}
+
+TEST_F(SyncTailTest, MultiSyncApplyAppliesInsertOpsIndividuallyWhenUnableToCreateGroupByNamespace) {
+
+    int seconds = 0;
+    auto makeOp = [&seconds](const NamespaceString& nss) {
+        return makeInsertDocumentOplogEntry(
+            {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++));
+    };
+
+    auto testNs = "test." + _agent.getSuiteName() + "_" + _agent.getTestName();
+
+    // Create a sequence of 3 'insert' ops that can't be grouped because they are from different
+    // namespaces.
+    MultiApplier::Operations operationsToApply = {makeOp(NamespaceString(testNs + "_1")),
+                                                  makeOp(NamespaceString(testNs + "_2")),
+                                                  makeOp(NamespaceString(testNs + "_3"))};
+
+    std::vector<BSONObj> operationsApplied;
+    auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) {
+        operationsApplied.push_back(op.copy());
+        return Status::OK();
+    };
+
+    MultiApplier::OperationPtrs ops;
+    for (auto&& op : operationsToApply) {
+        ops.push_back(&op);
+    }
+
+    // Apply the ops.
+    ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, syncApply));
+
+    // Applied ops should be as follows i.e. no insert grouping:
+    // [{insert 1}, {insert 2}, {insert 3}]
+    ASSERT_EQ(operationsToApply.size(), operationsApplied.size());
+    for (std::size_t i = 0; i < operationsToApply.size(); i++) {
+        ASSERT_BSONOBJ_EQ(operationsToApply[i].raw, operationsApplied[i]);
+    }
+}
+
 TEST_F(SyncTailTest, MultiSyncApplyFallsBackOnApplyingInsertsIndividuallyWhenGroupedInsertFails) {
     int seconds = 0;
     auto makeOp = [&seconds](const NamespaceString& nss) {
-- 
cgit v1.2.1
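
A minimal standalone sketch (editor's illustration, not part of the patch) of the batch-delimiting rule the change introduces: the running byte total is seeded with the first insert's size, and a candidate op ends the group if it is not an insert, targets a different namespace, would push the batch past the byte limit, or would exceed 64 ops. The Op struct, findEndOfGroup, and kInsertVectorMaxBytes below are simplified, hypothetical stand-ins for the real OplogEntry type, the std::find_if call in multiSyncApply_noAbort, and the server's insertVectorMaxBytes constant.

#include <algorithm>
#include <string>
#include <vector>

// Simplified stand-in for OplogEntry (illustration only).
struct Op {
    char opType;     // 'i' for insert, 'u' for update, 'd' for delete.
    std::string ns;  // Target namespace.
    int objSize;     // Size in bytes of the "o" document.
};

constexpr int kInsertVectorMaxBytes = 256 * 1024;  // Stand-in value for insertVectorMaxBytes.
constexpr int kMaxBatchCount = 64;                 // Maximum number of ops in one group.

// Returns an iterator to the first op that cannot join the insert group starting at 'first'.
// Assumes 'first' points at an insert op and the range is non-empty.
std::vector<Op>::const_iterator findEndOfGroup(std::vector<Op>::const_iterator first,
                                               std::vector<Op>::const_iterator last) {
    // Seed the totals with the first op, so a single oversized insert is never grouped.
    int batchSize = first->objSize;
    int batchCount = 1;
    const std::string& batchNamespace = first->ns;

    return std::find_if(first + 1, last, [&](const Op& next) {
        batchSize += next.objSize;
        batchCount += 1;
        return next.opType != 'i'                 // Must be an insert.
            || next.ns != batchNamespace          // Must be in the same namespace.
            || batchSize > kInsertVectorMaxBytes  // Must not create too large an object.
            || batchCount > kMaxBatchCount;       // Limit number of ops in a single group.
    });
}

As in the patch's isGroup check, a caller would only build a grouped insert when the returned iterator is more than one position past 'first'; otherwise the op is applied individually.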