diff options
author | Benety Goh <benety@mongodb.com> | 2016-06-03 15:34:04 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-06-03 15:34:04 -0400 |
commit | f9d26f5f8034375236f0c53fa10cf0f5d049d76c (patch) | |
tree | 47a571aed5748a145a41aafe949ca386207ffdb0 /src | |
parent | 98b5cf70786fa1e59e1d7dd2bc18ced1ebba827c (diff) | |
download | mongo-f9d26f5f8034375236f0c53fa10cf0f5d049d76c.tar.gz |
Revert "added unit tests for repl::multiSyncApply"
This reverts commit 98b5cf70786fa1e59e1d7dd2bc18ced1ebba827c.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 68 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 238 |
3 files changed, 33 insertions, 284 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 1e01f0d11cf..6e9ceabc098 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -1006,24 +1006,8 @@ static void initializeWriterThread() { // This free function is used by the writer threads to apply each op void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail*) { - initializeWriterThread(); - auto txn = cc().makeOperationContext(); - auto syncApply = [](OperationContext* txn, const BSONObj& op, bool convertUpdateToUpsert) { - return SyncTail::syncApply(txn, op, convertUpdateToUpsert); - }; - fassertNoTrace(16359, multiSyncApply_noAbort(txn.get(), ops, syncApply)); -} - -Status multiSyncApply_noAbort(OperationContext* txn, - const std::vector<OplogEntry>& oplogEntries, - SyncApplyFn syncApply) { - txn->setReplicatedWrites(false); - DisableDocumentValidation validationDisabler(txn); - - // allow us to get through the magic barrier - txn->lockState()->setIsBatchWriter(true); - - std::vector<const OplogEntry*> oplogEntryPointers(oplogEntries.size()); + std::vector<OplogEntry> oplogEntries(ops.begin(), ops.end()); + std::vector<OplogEntry*> oplogEntryPointers(oplogEntries.size()); for (size_t i = 0; i < oplogEntries.size(); i++) { oplogEntryPointers[i] = &oplogEntries[i]; } @@ -1031,18 +1015,27 @@ Status multiSyncApply_noAbort(OperationContext* txn, if (oplogEntryPointers.size() > 1) { std::stable_sort(oplogEntryPointers.begin(), oplogEntryPointers.end(), - [](const OplogEntry* l, const OplogEntry* r) { return l->ns < r->ns; }); + [](OplogEntry* l, OplogEntry* r) { return l->ns < r->ns; }); } + initializeWriterThread(); + + const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext(); + OperationContext& txn = *txnPtr; + txn.setReplicatedWrites(false); + DisableDocumentValidation validationDisabler(&txn); + + // allow us to get through the magic barrier + txn.lockState()->setIsBatchWriter(true); bool convertUpdatesToUpserts = true; // doNotGroupBeforePoint is used to prevent retrying bad group inserts by marking the final op // of a failed group and not allowing further group inserts until that op has been processed. - auto doNotGroupBeforePoint = oplogEntryPointers.begin(); + std::vector<OplogEntry*>::iterator doNotGroupBeforePoint = oplogEntryPointers.begin(); - for (auto oplogEntriesIterator = oplogEntryPointers.begin(); + for (std::vector<OplogEntry*>::iterator oplogEntriesIterator = oplogEntryPointers.begin(); oplogEntriesIterator != oplogEntryPointers.end(); ++oplogEntriesIterator) { - auto entry = *oplogEntriesIterator; + OplogEntry* entry = *oplogEntriesIterator; if (entry->opType[0] == 'i' && !entry->isForCappedCollection && oplogEntriesIterator > doNotGroupBeforePoint) { // Attempt to group inserts if possible. @@ -1050,9 +1043,7 @@ Status multiSyncApply_noAbort(OperationContext* txn, int batchSize = 0; int batchCount = 0; auto endOfGroupableOpsIterator = std::find_if( - oplogEntriesIterator + 1, - oplogEntryPointers.end(), - [&](const OplogEntry* nextEntry) { + oplogEntriesIterator + 1, oplogEntryPointers.end(), [&](OplogEntry* nextEntry) { return nextEntry->opType[0] != 'i' || // Must be an insert. nextEntry->ns != entry->ns || // Must be the same namespace. // Must not create too large an object. @@ -1073,7 +1064,7 @@ Status multiSyncApply_noAbort(OperationContext* txn, // Populate the "o" field with all the groupable inserts. BSONArrayBuilder insertArrayBuilder(groupedInsertBuilder.subarrayStart("o")); - for (auto groupingIterator = oplogEntriesIterator; + for (std::vector<OplogEntry*>::iterator groupingIterator = oplogEntriesIterator; groupingIterator != endOfGroupableOpsIterator; ++groupingIterator) { insertArrayBuilder.append((*groupingIterator)->o.Obj()); @@ -1082,8 +1073,8 @@ Status multiSyncApply_noAbort(OperationContext* txn, try { // Apply the group of inserts. - uassertStatusOK( - syncApply(txn, groupedInsertBuilder.done(), convertUpdatesToUpserts)); + uassertStatusOK(SyncTail::syncApply( + &txn, groupedInsertBuilder.done(), convertUpdatesToUpserts)); // It succeeded, advance the oplogEntriesIterator to the end of the // group of inserts. oplogEntriesIterator = endOfGroupableOpsIterator - 1; @@ -1091,12 +1082,11 @@ Status multiSyncApply_noAbort(OperationContext* txn, } catch (const DBException& e) { // The group insert failed, log an error and fall through to the // application of an individual op. - str::stream msg; - msg << "Error applying inserts in bulk " << causedBy(e) - << " trying first insert as a lone insert"; - error() << std::string(msg); + error() << "Error applying inserts in bulk " << causedBy(e) + << " trying first insert as a lone insert"; + if (inShutdown()) { - return {ErrorCodes::InterruptedAtShutdown, msg}; + return; } // Avoid quadratic run time from failed insert by not retrying until we @@ -1108,28 +1098,26 @@ Status multiSyncApply_noAbort(OperationContext* txn, try { // Apply an individual (non-grouped) op. - const Status s = syncApply(txn, entry->raw, convertUpdatesToUpserts); + const Status s = SyncTail::syncApply(&txn, entry->raw, convertUpdatesToUpserts); if (!s.isOK()) { severe() << "Error applying operation (" << entry->raw.toString() << "): " << s; if (inShutdown()) { - return {ErrorCodes::InterruptedAtShutdown, s.toString()}; + return; } - return s; + fassertFailedNoTrace(16359); } } catch (const DBException& e) { severe() << "writer worker caught exception: " << causedBy(e) << " on: " << entry->raw.toString(); if (inShutdown()) { - return {ErrorCodes::InterruptedAtShutdown, e.toString()}; + return; } - return e.toStatus(); + fassertFailedNoTrace(16360); } } - - return Status::OK(); } // This free function is used by the initial sync writer threads to apply each op diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 69cac0a3943..204fce1f037 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -203,17 +203,6 @@ void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st); void multiInitialSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st); /** - * Testing-only version of multiSyncApply that returns an error instead of aborting. - * Accepts an external operation context and a function with the same argument list as - * SyncTail::syncApply. - */ -using SyncApplyFn = - stdx::function<Status(OperationContext* txn, const BSONObj& o, bool convertUpdateToUpsert)>; -Status multiSyncApply_noAbort(OperationContext* txn, - const std::vector<OplogEntry>& ops, - SyncApplyFn syncApply); - -/** * Testing-only version of multiInitialSyncApply that accepts an external operation context and * returns an error instead of aborting. */ diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 81e4cc51ac1..ac3b6e1ddd2 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -111,7 +111,6 @@ void SyncTailTest::setUp() { Client::initThreadIfNotAlready(); _txn = cc().makeOperationContext(); - _txn->lockState()->setIsBatchWriter(false); _opsApplied = 0; _applyOp = [](OperationContext* txn, Database* db, @@ -192,8 +191,7 @@ OplogEntry makeCreateCollectionOplogEntry(OpTime opTime, bob.appendElements(opTime.toBSON()); bob.append("h", 1LL); bob.append("op", "c"); - bob.append("ns", nss.getCommandNS()); - bob.append("o", BSON("create" << nss.coll())); + bob.append("ns", nss.ns()); return OplogEntry(bob.obj()); } @@ -553,8 +551,8 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH operationsApplied.push_back(operationsForWriterThreadToApply); }; - auto op1 = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss1, BSON("x" << 1)); - auto op2 = makeInsertDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss2, BSON("x" << 2)); + auto op1 = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss1); + auto op2 = makeCreateCollectionOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss2); auto lastOpTime = unittest::assertGet(multiApply(_txn.get(), &writerPool, {op1, op2}, applyOperationFn)); @@ -582,234 +580,6 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH ASSERT_EQUALS(op2, operationsWritternToOplog[1]); } -TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) { - NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - ASSERT_TRUE(_txn->writesAreReplicated()); - ASSERT_FALSE(documentValidationDisabled(_txn.get())); - ASSERT_FALSE(_txn->lockState()->isBatchWriter()); - auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); - _txn.reset(); - multiSyncApply({op}, nullptr); - // Collection should be created after SyncTail::syncApply() processes operation. - _txn = cc().makeOperationContext(); - ASSERT_TRUE(AutoGetCollectionForRead(_txn.get(), nss).getCollection()); -} - -TEST_F(SyncTailTest, MultiSyncApplyDisablesDocumentValidationWhileApplyingOperations) { - NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - ASSERT_TRUE(_txn->writesAreReplicated()); - ASSERT_FALSE(documentValidationDisabled(_txn.get())); - ASSERT_FALSE(_txn->lockState()->isBatchWriter()); - auto syncApply = [](OperationContext* txn, const BSONObj&, bool convertUpdatesToUpserts) { - ASSERT_FALSE(txn->writesAreReplicated()); - ASSERT_TRUE(txn->lockState()->isBatchWriter()); - ASSERT_TRUE(documentValidationDisabled(txn)); - ASSERT_TRUE(convertUpdatesToUpserts); - return Status::OK(); - }; - auto op = makeUpdateDocumentOplogEntry( - {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2)); - ASSERT_OK(multiSyncApply_noAbort(_txn.get(), {op}, syncApply)); -} - -TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyErrorAfterFailingToApplyOperation) { - NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - OplogEntry op(BSON("op" - << "x" - << "ns" - << nss.ns())); - auto syncApply = [](OperationContext*, const BSONObj&, bool) -> Status { - return {ErrorCodes::OperationFailed, ""}; - }; - ASSERT_EQUALS(ErrorCodes::OperationFailed, multiSyncApply_noAbort(_txn.get(), {op}, syncApply)); -} - -TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyException) { - NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - OplogEntry op(BSON("op" - << "x" - << "ns" - << nss.ns())); - auto syncApply = [](OperationContext*, const BSONObj&, bool) -> Status { - uasserted(ErrorCodes::OperationFailed, ""); - MONGO_UNREACHABLE; - }; - ASSERT_EQUALS(ErrorCodes::OperationFailed, multiSyncApply_noAbort(_txn.get(), {op}, syncApply)); -} - -TEST_F(SyncTailTest, MultiSyncApplySortsOperationsStablyByNamespaceBeforeApplying) { - int x = 0; - auto makeOp = [&x](const char* ns) -> OplogEntry { - return OplogEntry(BSON("op" - << "x" - << "ns" - << ns - << "x" - << x++)); - }; - auto op1 = makeOp("test.t1"); - auto op2 = makeOp("test.t1"); - auto op3 = makeOp("test.t2"); - auto op4 = makeOp("test.t3"); - MultiApplier::Operations operationsApplied; - auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) { - operationsApplied.push_back(OplogEntry(op)); - return Status::OK(); - }; - ASSERT_OK(multiSyncApply_noAbort(_txn.get(), {op4, op1, op3, op2}, syncApply)); - ASSERT_EQUALS(4U, operationsApplied.size()); - ASSERT_EQUALS(op1, operationsApplied[0]); - ASSERT_EQUALS(op2, operationsApplied[1]); - ASSERT_EQUALS(op3, operationsApplied[2]); - ASSERT_EQUALS(op4, operationsApplied[3]); -} - -TEST_F(SyncTailTest, MultiSyncApplyGroupsInsertOperationByNamespaceBeforeApplying) { - int seconds = 0; - auto makeOp = [&seconds](const NamespaceString& nss) { - return makeInsertDocumentOplogEntry( - {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++)); - }; - NamespaceString nss1("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_1"); - NamespaceString nss2("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_2"); - auto createOp1 = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss1); - auto createOp2 = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss2); - auto insertOp1a = makeOp(nss1); - auto insertOp1b = makeOp(nss1); - auto insertOp2a = makeOp(nss2); - auto insertOp2b = makeOp(nss2); - MultiApplier::Operations operationsApplied; - auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) { - operationsApplied.push_back(OplogEntry(op)); - return Status::OK(); - }; - - ASSERT_OK(multiSyncApply_noAbort( - _txn.get(), - {createOp1, createOp2, insertOp1a, insertOp2a, insertOp1b, insertOp2b}, - syncApply)); - - ASSERT_EQUALS(4U, operationsApplied.size()); - ASSERT_EQUALS(createOp1, operationsApplied[0]); - ASSERT_EQUALS(createOp2, operationsApplied[1]); - - // Check grouped insert operations in namespace "nss1". - ASSERT_EQUALS(insertOp1a.getOpTime(), operationsApplied[2].getOpTime()); - ASSERT_EQUALS(insertOp1a.ns, operationsApplied[2].ns); - ASSERT_EQUALS(BSONType::Array, operationsApplied[2].o.type()); - auto group1 = operationsApplied[2].o.Array(); - ASSERT_EQUALS(2U, group1.size()); - ASSERT_EQUALS(insertOp1a.o.Obj(), group1[0].Obj()); - ASSERT_EQUALS(insertOp1b.o.Obj(), group1[1].Obj()); - - // Check grouped insert operations in namespace "nss2". - ASSERT_EQUALS(insertOp2a.getOpTime(), operationsApplied[3].getOpTime()); - ASSERT_EQUALS(insertOp2a.ns, operationsApplied[3].ns); - ASSERT_EQUALS(BSONType::Array, operationsApplied[3].o.type()); - auto group2 = operationsApplied[3].o.Array(); - ASSERT_EQUALS(2U, group2.size()); - ASSERT_EQUALS(insertOp2a.o.Obj(), group2[0].Obj()); - ASSERT_EQUALS(insertOp2b.o.Obj(), group2[1].Obj()); -} - -TEST_F(SyncTailTest, MultiSyncApplyUsesLimitWhenGroupingInsertOperation) { - int seconds = 0; - auto makeOp = [&seconds](const NamespaceString& nss) { - return makeInsertDocumentOplogEntry( - {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++)); - }; - NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_1"); - auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss); - - // Generate operations to apply: - // {create}, {insert_1}, {insert_2}, .. {insert_(limit)}, {insert_(limit+1)} - std::size_t limit = 64; - MultiApplier::Operations insertOps; - for (std::size_t i = 0; i < limit + 1; ++i) { - insertOps.push_back(makeOp(nss)); - } - MultiApplier::Operations operationsToApply; - operationsToApply.push_back(createOp); - std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply)); - MultiApplier::Operations operationsApplied; - auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) { - operationsApplied.push_back(OplogEntry(op)); - return Status::OK(); - }; - - ASSERT_OK(multiSyncApply_noAbort(_txn.get(), operationsToApply, syncApply)); - - // multiSyncApply should combine operations as follows: - // {create}, {grouped_insert}, {insert_(limit+1)} - ASSERT_EQUALS(3U, operationsApplied.size()); - ASSERT_EQUALS(createOp, operationsApplied[0]); - - const auto& groupedInsertOp = operationsApplied[1]; - ASSERT_EQUALS(insertOps.front().getOpTime(), groupedInsertOp.getOpTime()); - ASSERT_EQUALS(insertOps.front().ns, groupedInsertOp.ns); - ASSERT_EQUALS(BSONType::Array, groupedInsertOp.o.type()); - auto groupedInsertDocuments = groupedInsertOp.o.Array(); - ASSERT_EQUALS(limit, groupedInsertDocuments.size()); - for (std::size_t i = 0; i < limit; ++i) { - const auto& insertOp = insertOps[i]; - ASSERT_EQUALS(insertOp.o.Obj(), groupedInsertDocuments[i].Obj()); - } - - // (limit + 1)-th insert operations should not be included in group of first (limit) inserts. - ASSERT_EQUALS(insertOps.back(), operationsApplied[2]); -} - -TEST_F(SyncTailTest, MultiSyncApplyFallsBackOnApplyingInsertsIndividuallyWhenGroupedInsertFails) { - int seconds = 0; - auto makeOp = [&seconds](const NamespaceString& nss) { - return makeInsertDocumentOplogEntry( - {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++)); - }; - NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_1"); - auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss); - - // Generate operations to apply: - // {create}, {insert_1}, {insert_2}, .. {insert_(limit)}, {insert_(limit+1)} - std::size_t limit = 64; - MultiApplier::Operations insertOps; - for (std::size_t i = 0; i < limit + 1; ++i) { - insertOps.push_back(makeOp(nss)); - } - MultiApplier::Operations operationsToApply; - operationsToApply.push_back(createOp); - std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply)); - - std::size_t numFailedGroupedInserts = 0; - MultiApplier::Operations operationsApplied; - auto syncApply = [&numFailedGroupedInserts, - &operationsApplied](OperationContext*, const BSONObj& op, bool) -> Status { - // Reject grouped insert operations. - if (op["o"].type() == BSONType::Array) { - numFailedGroupedInserts++; - return {ErrorCodes::OperationFailed, "grouped inserts not supported"}; - } - operationsApplied.push_back(OplogEntry(op)); - return Status::OK(); - }; - - ASSERT_OK(multiSyncApply_noAbort(_txn.get(), operationsToApply, syncApply)); - - // On failing to apply the grouped insert operation, multiSyncApply should apply the operations - // as given in "operationsToApply": - // {create}, {insert_1}, {insert_2}, .. {insert_(limit)}, {insert_(limit+1)} - ASSERT_EQUALS(limit + 2, operationsApplied.size()); - ASSERT_EQUALS(createOp, operationsApplied[0]); - - for (std::size_t i = 0; i < limit + 1; ++i) { - const auto& insertOp = insertOps[i]; - ASSERT_EQUALS(insertOp, operationsApplied[i + 1]); - } - - // Ensure that multiSyncApply does not attempt to group remaining operations in first failed - // grouped insert operation. - ASSERT_EQUALS(1U, numFailedGroupedInserts); -} - TEST_F(SyncTailTest, MultiInitialSyncApplyDisablesDocumentValidationWhileApplyingOperations) { SyncTailWithOperationContextChecker syncTail; NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); @@ -855,6 +625,7 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyRetriesFailedUpdateIfDocumentIsAvailab TEST_F(SyncTailTest, MultiInitialSyncApplyPassesThroughSyncApplyErrorAfterFailingToRetryBadOp) { SyncTailWithLocalDocumentFetcher syncTail(BSON("_id" << 0 << "x" << 1)); NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + auto updatedDocument = BSON("_id" << 0 << "x" << 2); OplogEntry op(BSON("op" << "x" << "ns" @@ -865,6 +636,7 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyPassesThroughSyncApplyErrorAfterFailin TEST_F(SyncTailTest, MultiInitialSyncApplyPassesThroughShouldSyncTailRetryError) { SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr); NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + auto updatedDocument = BSON("_id" << 0 << "x" << 2); auto op = makeUpdateDocumentOplogEntry( {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2)); ASSERT_THROWS_CODE( |