From 98b5cf70786fa1e59e1d7dd2bc18ced1ebba827c Mon Sep 17 00:00:00 2001 From: Benety Goh Date: Fri, 3 Jun 2016 11:52:15 -0400 Subject: added unit tests for repl::multiSyncApply --- src/mongo/db/repl/sync_tail.cpp | 68 +++++----- src/mongo/db/repl/sync_tail.h | 11 ++ src/mongo/db/repl/sync_tail_test.cpp | 238 ++++++++++++++++++++++++++++++++++- 3 files changed, 284 insertions(+), 33 deletions(-) diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 6e9ceabc098..1e01f0d11cf 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -1006,8 +1006,24 @@ static void initializeWriterThread() { // This free function is used by the writer threads to apply each op void multiSyncApply(const std::vector& ops, SyncTail*) { - std::vector oplogEntries(ops.begin(), ops.end()); - std::vector oplogEntryPointers(oplogEntries.size()); + initializeWriterThread(); + auto txn = cc().makeOperationContext(); + auto syncApply = [](OperationContext* txn, const BSONObj& op, bool convertUpdateToUpsert) { + return SyncTail::syncApply(txn, op, convertUpdateToUpsert); + }; + fassertNoTrace(16359, multiSyncApply_noAbort(txn.get(), ops, syncApply)); +} + +Status multiSyncApply_noAbort(OperationContext* txn, + const std::vector& oplogEntries, + SyncApplyFn syncApply) { + txn->setReplicatedWrites(false); + DisableDocumentValidation validationDisabler(txn); + + // allow us to get through the magic barrier + txn->lockState()->setIsBatchWriter(true); + + std::vector oplogEntryPointers(oplogEntries.size()); for (size_t i = 0; i < oplogEntries.size(); i++) { oplogEntryPointers[i] = &oplogEntries[i]; } @@ -1015,27 +1031,18 @@ void multiSyncApply(const std::vector& ops, SyncTail*) { if (oplogEntryPointers.size() > 1) { std::stable_sort(oplogEntryPointers.begin(), oplogEntryPointers.end(), - [](OplogEntry* l, OplogEntry* r) { return l->ns < r->ns; }); + [](const OplogEntry* l, const OplogEntry* r) { return l->ns < r->ns; }); } - initializeWriterThread(); - - const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext(); - OperationContext& txn = *txnPtr; - txn.setReplicatedWrites(false); - DisableDocumentValidation validationDisabler(&txn); - - // allow us to get through the magic barrier - txn.lockState()->setIsBatchWriter(true); bool convertUpdatesToUpserts = true; // doNotGroupBeforePoint is used to prevent retrying bad group inserts by marking the final op // of a failed group and not allowing further group inserts until that op has been processed. - std::vector::iterator doNotGroupBeforePoint = oplogEntryPointers.begin(); + auto doNotGroupBeforePoint = oplogEntryPointers.begin(); - for (std::vector::iterator oplogEntriesIterator = oplogEntryPointers.begin(); + for (auto oplogEntriesIterator = oplogEntryPointers.begin(); oplogEntriesIterator != oplogEntryPointers.end(); ++oplogEntriesIterator) { - OplogEntry* entry = *oplogEntriesIterator; + auto entry = *oplogEntriesIterator; if (entry->opType[0] == 'i' && !entry->isForCappedCollection && oplogEntriesIterator > doNotGroupBeforePoint) { // Attempt to group inserts if possible. @@ -1043,7 +1050,9 @@ void multiSyncApply(const std::vector& ops, SyncTail*) { int batchSize = 0; int batchCount = 0; auto endOfGroupableOpsIterator = std::find_if( - oplogEntriesIterator + 1, oplogEntryPointers.end(), [&](OplogEntry* nextEntry) { + oplogEntriesIterator + 1, + oplogEntryPointers.end(), + [&](const OplogEntry* nextEntry) { return nextEntry->opType[0] != 'i' || // Must be an insert. nextEntry->ns != entry->ns || // Must be the same namespace. // Must not create too large an object. @@ -1064,7 +1073,7 @@ void multiSyncApply(const std::vector& ops, SyncTail*) { // Populate the "o" field with all the groupable inserts. BSONArrayBuilder insertArrayBuilder(groupedInsertBuilder.subarrayStart("o")); - for (std::vector::iterator groupingIterator = oplogEntriesIterator; + for (auto groupingIterator = oplogEntriesIterator; groupingIterator != endOfGroupableOpsIterator; ++groupingIterator) { insertArrayBuilder.append((*groupingIterator)->o.Obj()); @@ -1073,8 +1082,8 @@ void multiSyncApply(const std::vector& ops, SyncTail*) { try { // Apply the group of inserts. - uassertStatusOK(SyncTail::syncApply( - &txn, groupedInsertBuilder.done(), convertUpdatesToUpserts)); + uassertStatusOK( + syncApply(txn, groupedInsertBuilder.done(), convertUpdatesToUpserts)); // It succeeded, advance the oplogEntriesIterator to the end of the // group of inserts. oplogEntriesIterator = endOfGroupableOpsIterator - 1; @@ -1082,11 +1091,12 @@ void multiSyncApply(const std::vector& ops, SyncTail*) { } catch (const DBException& e) { // The group insert failed, log an error and fall through to the // application of an individual op. - error() << "Error applying inserts in bulk " << causedBy(e) - << " trying first insert as a lone insert"; - + str::stream msg; + msg << "Error applying inserts in bulk " << causedBy(e) + << " trying first insert as a lone insert"; + error() << std::string(msg); if (inShutdown()) { - return; + return {ErrorCodes::InterruptedAtShutdown, msg}; } // Avoid quadratic run time from failed insert by not retrying until we @@ -1098,26 +1108,28 @@ void multiSyncApply(const std::vector& ops, SyncTail*) { try { // Apply an individual (non-grouped) op. - const Status s = SyncTail::syncApply(&txn, entry->raw, convertUpdatesToUpserts); + const Status s = syncApply(txn, entry->raw, convertUpdatesToUpserts); if (!s.isOK()) { severe() << "Error applying operation (" << entry->raw.toString() << "): " << s; if (inShutdown()) { - return; + return {ErrorCodes::InterruptedAtShutdown, s.toString()}; } - fassertFailedNoTrace(16359); + return s; } } catch (const DBException& e) { severe() << "writer worker caught exception: " << causedBy(e) << " on: " << entry->raw.toString(); if (inShutdown()) { - return; + return {ErrorCodes::InterruptedAtShutdown, e.toString()}; } - fassertFailedNoTrace(16360); + return e.toStatus(); } } + + return Status::OK(); } // This free function is used by the initial sync writer threads to apply each op diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 204fce1f037..69cac0a3943 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -202,6 +202,17 @@ StatusWith multiApply(OperationContext* txn, void multiSyncApply(const std::vector& ops, SyncTail* st); void multiInitialSyncApply(const std::vector& ops, SyncTail* st); +/** + * Testing-only version of multiSyncApply that returns an error instead of aborting. + * Accepts an external operation context and a function with the same argument list as + * SyncTail::syncApply. + */ +using SyncApplyFn = + stdx::function; +Status multiSyncApply_noAbort(OperationContext* txn, + const std::vector& ops, + SyncApplyFn syncApply); + /** * Testing-only version of multiInitialSyncApply that accepts an external operation context and * returns an error instead of aborting. diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index ac3b6e1ddd2..81e4cc51ac1 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -111,6 +111,7 @@ void SyncTailTest::setUp() { Client::initThreadIfNotAlready(); _txn = cc().makeOperationContext(); + _txn->lockState()->setIsBatchWriter(false); _opsApplied = 0; _applyOp = [](OperationContext* txn, Database* db, @@ -191,7 +192,8 @@ OplogEntry makeCreateCollectionOplogEntry(OpTime opTime, bob.appendElements(opTime.toBSON()); bob.append("h", 1LL); bob.append("op", "c"); - bob.append("ns", nss.ns()); + bob.append("ns", nss.getCommandNS()); + bob.append("o", BSON("create" << nss.coll())); return OplogEntry(bob.obj()); } @@ -551,8 +553,8 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH operationsApplied.push_back(operationsForWriterThreadToApply); }; - auto op1 = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss1); - auto op2 = makeCreateCollectionOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss2); + auto op1 = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss1, BSON("x" << 1)); + auto op2 = makeInsertDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss2, BSON("x" << 2)); auto lastOpTime = unittest::assertGet(multiApply(_txn.get(), &writerPool, {op1, op2}, applyOperationFn)); @@ -580,6 +582,234 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH ASSERT_EQUALS(op2, operationsWritternToOplog[1]); } +TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + ASSERT_TRUE(_txn->writesAreReplicated()); + ASSERT_FALSE(documentValidationDisabled(_txn.get())); + ASSERT_FALSE(_txn->lockState()->isBatchWriter()); + auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); + _txn.reset(); + multiSyncApply({op}, nullptr); + // Collection should be created after SyncTail::syncApply() processes operation. + _txn = cc().makeOperationContext(); + ASSERT_TRUE(AutoGetCollectionForRead(_txn.get(), nss).getCollection()); +} + +TEST_F(SyncTailTest, MultiSyncApplyDisablesDocumentValidationWhileApplyingOperations) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + ASSERT_TRUE(_txn->writesAreReplicated()); + ASSERT_FALSE(documentValidationDisabled(_txn.get())); + ASSERT_FALSE(_txn->lockState()->isBatchWriter()); + auto syncApply = [](OperationContext* txn, const BSONObj&, bool convertUpdatesToUpserts) { + ASSERT_FALSE(txn->writesAreReplicated()); + ASSERT_TRUE(txn->lockState()->isBatchWriter()); + ASSERT_TRUE(documentValidationDisabled(txn)); + ASSERT_TRUE(convertUpdatesToUpserts); + return Status::OK(); + }; + auto op = makeUpdateDocumentOplogEntry( + {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2)); + ASSERT_OK(multiSyncApply_noAbort(_txn.get(), {op}, syncApply)); +} + +TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyErrorAfterFailingToApplyOperation) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + OplogEntry op(BSON("op" + << "x" + << "ns" + << nss.ns())); + auto syncApply = [](OperationContext*, const BSONObj&, bool) -> Status { + return {ErrorCodes::OperationFailed, ""}; + }; + ASSERT_EQUALS(ErrorCodes::OperationFailed, multiSyncApply_noAbort(_txn.get(), {op}, syncApply)); +} + +TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyException) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + OplogEntry op(BSON("op" + << "x" + << "ns" + << nss.ns())); + auto syncApply = [](OperationContext*, const BSONObj&, bool) -> Status { + uasserted(ErrorCodes::OperationFailed, ""); + MONGO_UNREACHABLE; + }; + ASSERT_EQUALS(ErrorCodes::OperationFailed, multiSyncApply_noAbort(_txn.get(), {op}, syncApply)); +} + +TEST_F(SyncTailTest, MultiSyncApplySortsOperationsStablyByNamespaceBeforeApplying) { + int x = 0; + auto makeOp = [&x](const char* ns) -> OplogEntry { + return OplogEntry(BSON("op" + << "x" + << "ns" + << ns + << "x" + << x++)); + }; + auto op1 = makeOp("test.t1"); + auto op2 = makeOp("test.t1"); + auto op3 = makeOp("test.t2"); + auto op4 = makeOp("test.t3"); + MultiApplier::Operations operationsApplied; + auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) { + operationsApplied.push_back(OplogEntry(op)); + return Status::OK(); + }; + ASSERT_OK(multiSyncApply_noAbort(_txn.get(), {op4, op1, op3, op2}, syncApply)); + ASSERT_EQUALS(4U, operationsApplied.size()); + ASSERT_EQUALS(op1, operationsApplied[0]); + ASSERT_EQUALS(op2, operationsApplied[1]); + ASSERT_EQUALS(op3, operationsApplied[2]); + ASSERT_EQUALS(op4, operationsApplied[3]); +} + +TEST_F(SyncTailTest, MultiSyncApplyGroupsInsertOperationByNamespaceBeforeApplying) { + int seconds = 0; + auto makeOp = [&seconds](const NamespaceString& nss) { + return makeInsertDocumentOplogEntry( + {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++)); + }; + NamespaceString nss1("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_1"); + NamespaceString nss2("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_2"); + auto createOp1 = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss1); + auto createOp2 = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss2); + auto insertOp1a = makeOp(nss1); + auto insertOp1b = makeOp(nss1); + auto insertOp2a = makeOp(nss2); + auto insertOp2b = makeOp(nss2); + MultiApplier::Operations operationsApplied; + auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) { + operationsApplied.push_back(OplogEntry(op)); + return Status::OK(); + }; + + ASSERT_OK(multiSyncApply_noAbort( + _txn.get(), + {createOp1, createOp2, insertOp1a, insertOp2a, insertOp1b, insertOp2b}, + syncApply)); + + ASSERT_EQUALS(4U, operationsApplied.size()); + ASSERT_EQUALS(createOp1, operationsApplied[0]); + ASSERT_EQUALS(createOp2, operationsApplied[1]); + + // Check grouped insert operations in namespace "nss1". + ASSERT_EQUALS(insertOp1a.getOpTime(), operationsApplied[2].getOpTime()); + ASSERT_EQUALS(insertOp1a.ns, operationsApplied[2].ns); + ASSERT_EQUALS(BSONType::Array, operationsApplied[2].o.type()); + auto group1 = operationsApplied[2].o.Array(); + ASSERT_EQUALS(2U, group1.size()); + ASSERT_EQUALS(insertOp1a.o.Obj(), group1[0].Obj()); + ASSERT_EQUALS(insertOp1b.o.Obj(), group1[1].Obj()); + + // Check grouped insert operations in namespace "nss2". + ASSERT_EQUALS(insertOp2a.getOpTime(), operationsApplied[3].getOpTime()); + ASSERT_EQUALS(insertOp2a.ns, operationsApplied[3].ns); + ASSERT_EQUALS(BSONType::Array, operationsApplied[3].o.type()); + auto group2 = operationsApplied[3].o.Array(); + ASSERT_EQUALS(2U, group2.size()); + ASSERT_EQUALS(insertOp2a.o.Obj(), group2[0].Obj()); + ASSERT_EQUALS(insertOp2b.o.Obj(), group2[1].Obj()); +} + +TEST_F(SyncTailTest, MultiSyncApplyUsesLimitWhenGroupingInsertOperation) { + int seconds = 0; + auto makeOp = [&seconds](const NamespaceString& nss) { + return makeInsertDocumentOplogEntry( + {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++)); + }; + NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_1"); + auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss); + + // Generate operations to apply: + // {create}, {insert_1}, {insert_2}, .. {insert_(limit)}, {insert_(limit+1)} + std::size_t limit = 64; + MultiApplier::Operations insertOps; + for (std::size_t i = 0; i < limit + 1; ++i) { + insertOps.push_back(makeOp(nss)); + } + MultiApplier::Operations operationsToApply; + operationsToApply.push_back(createOp); + std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply)); + MultiApplier::Operations operationsApplied; + auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) { + operationsApplied.push_back(OplogEntry(op)); + return Status::OK(); + }; + + ASSERT_OK(multiSyncApply_noAbort(_txn.get(), operationsToApply, syncApply)); + + // multiSyncApply should combine operations as follows: + // {create}, {grouped_insert}, {insert_(limit+1)} + ASSERT_EQUALS(3U, operationsApplied.size()); + ASSERT_EQUALS(createOp, operationsApplied[0]); + + const auto& groupedInsertOp = operationsApplied[1]; + ASSERT_EQUALS(insertOps.front().getOpTime(), groupedInsertOp.getOpTime()); + ASSERT_EQUALS(insertOps.front().ns, groupedInsertOp.ns); + ASSERT_EQUALS(BSONType::Array, groupedInsertOp.o.type()); + auto groupedInsertDocuments = groupedInsertOp.o.Array(); + ASSERT_EQUALS(limit, groupedInsertDocuments.size()); + for (std::size_t i = 0; i < limit; ++i) { + const auto& insertOp = insertOps[i]; + ASSERT_EQUALS(insertOp.o.Obj(), groupedInsertDocuments[i].Obj()); + } + + // (limit + 1)-th insert operations should not be included in group of first (limit) inserts. + ASSERT_EQUALS(insertOps.back(), operationsApplied[2]); +} + +TEST_F(SyncTailTest, MultiSyncApplyFallsBackOnApplyingInsertsIndividuallyWhenGroupedInsertFails) { + int seconds = 0; + auto makeOp = [&seconds](const NamespaceString& nss) { + return makeInsertDocumentOplogEntry( + {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++)); + }; + NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_1"); + auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss); + + // Generate operations to apply: + // {create}, {insert_1}, {insert_2}, .. {insert_(limit)}, {insert_(limit+1)} + std::size_t limit = 64; + MultiApplier::Operations insertOps; + for (std::size_t i = 0; i < limit + 1; ++i) { + insertOps.push_back(makeOp(nss)); + } + MultiApplier::Operations operationsToApply; + operationsToApply.push_back(createOp); + std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply)); + + std::size_t numFailedGroupedInserts = 0; + MultiApplier::Operations operationsApplied; + auto syncApply = [&numFailedGroupedInserts, + &operationsApplied](OperationContext*, const BSONObj& op, bool) -> Status { + // Reject grouped insert operations. + if (op["o"].type() == BSONType::Array) { + numFailedGroupedInserts++; + return {ErrorCodes::OperationFailed, "grouped inserts not supported"}; + } + operationsApplied.push_back(OplogEntry(op)); + return Status::OK(); + }; + + ASSERT_OK(multiSyncApply_noAbort(_txn.get(), operationsToApply, syncApply)); + + // On failing to apply the grouped insert operation, multiSyncApply should apply the operations + // as given in "operationsToApply": + // {create}, {insert_1}, {insert_2}, .. {insert_(limit)}, {insert_(limit+1)} + ASSERT_EQUALS(limit + 2, operationsApplied.size()); + ASSERT_EQUALS(createOp, operationsApplied[0]); + + for (std::size_t i = 0; i < limit + 1; ++i) { + const auto& insertOp = insertOps[i]; + ASSERT_EQUALS(insertOp, operationsApplied[i + 1]); + } + + // Ensure that multiSyncApply does not attempt to group remaining operations in first failed + // grouped insert operation. + ASSERT_EQUALS(1U, numFailedGroupedInserts); +} + TEST_F(SyncTailTest, MultiInitialSyncApplyDisablesDocumentValidationWhileApplyingOperations) { SyncTailWithOperationContextChecker syncTail; NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); @@ -625,7 +855,6 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyRetriesFailedUpdateIfDocumentIsAvailab TEST_F(SyncTailTest, MultiInitialSyncApplyPassesThroughSyncApplyErrorAfterFailingToRetryBadOp) { SyncTailWithLocalDocumentFetcher syncTail(BSON("_id" << 0 << "x" << 1)); NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - auto updatedDocument = BSON("_id" << 0 << "x" << 2); OplogEntry op(BSON("op" << "x" << "ns" @@ -636,7 +865,6 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyPassesThroughSyncApplyErrorAfterFailin TEST_F(SyncTailTest, MultiInitialSyncApplyPassesThroughShouldSyncTailRetryError) { SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr); NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - auto updatedDocument = BSON("_id" << 0 << "x" << 2); auto op = makeUpdateDocumentOplogEntry( {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2)); ASSERT_THROWS_CODE( -- cgit v1.2.1