diff options
Diffstat (limited to 'src/mongo/db/repl/sync_tail_test.cpp')
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 238 |
1 files changed, 233 insertions, 5 deletions
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index ac3b6e1ddd2..81e4cc51ac1 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -111,6 +111,7 @@ void SyncTailTest::setUp() { Client::initThreadIfNotAlready(); _txn = cc().makeOperationContext(); + _txn->lockState()->setIsBatchWriter(false); _opsApplied = 0; _applyOp = [](OperationContext* txn, Database* db, @@ -191,7 +192,8 @@ OplogEntry makeCreateCollectionOplogEntry(OpTime opTime, bob.appendElements(opTime.toBSON()); bob.append("h", 1LL); bob.append("op", "c"); - bob.append("ns", nss.ns()); + bob.append("ns", nss.getCommandNS()); + bob.append("o", BSON("create" << nss.coll())); return OplogEntry(bob.obj()); } @@ -551,8 +553,8 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH operationsApplied.push_back(operationsForWriterThreadToApply); }; - auto op1 = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss1); - auto op2 = makeCreateCollectionOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss2); + auto op1 = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss1, BSON("x" << 1)); + auto op2 = makeInsertDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss2, BSON("x" << 2)); auto lastOpTime = unittest::assertGet(multiApply(_txn.get(), &writerPool, {op1, op2}, applyOperationFn)); @@ -580,6 +582,234 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH ASSERT_EQUALS(op2, operationsWritternToOplog[1]); } +TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + ASSERT_TRUE(_txn->writesAreReplicated()); + ASSERT_FALSE(documentValidationDisabled(_txn.get())); + ASSERT_FALSE(_txn->lockState()->isBatchWriter()); + auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); + _txn.reset(); + multiSyncApply({op}, nullptr); + // Collection should be created after SyncTail::syncApply() processes operation. + _txn = cc().makeOperationContext(); + ASSERT_TRUE(AutoGetCollectionForRead(_txn.get(), nss).getCollection()); +} + +TEST_F(SyncTailTest, MultiSyncApplyDisablesDocumentValidationWhileApplyingOperations) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + ASSERT_TRUE(_txn->writesAreReplicated()); + ASSERT_FALSE(documentValidationDisabled(_txn.get())); + ASSERT_FALSE(_txn->lockState()->isBatchWriter()); + auto syncApply = [](OperationContext* txn, const BSONObj&, bool convertUpdatesToUpserts) { + ASSERT_FALSE(txn->writesAreReplicated()); + ASSERT_TRUE(txn->lockState()->isBatchWriter()); + ASSERT_TRUE(documentValidationDisabled(txn)); + ASSERT_TRUE(convertUpdatesToUpserts); + return Status::OK(); + }; + auto op = makeUpdateDocumentOplogEntry( + {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2)); + ASSERT_OK(multiSyncApply_noAbort(_txn.get(), {op}, syncApply)); +} + +TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyErrorAfterFailingToApplyOperation) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + OplogEntry op(BSON("op" + << "x" + << "ns" + << nss.ns())); + auto syncApply = [](OperationContext*, const BSONObj&, bool) -> Status { + return {ErrorCodes::OperationFailed, ""}; + }; + ASSERT_EQUALS(ErrorCodes::OperationFailed, multiSyncApply_noAbort(_txn.get(), {op}, syncApply)); +} + +TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyException) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + OplogEntry op(BSON("op" + << "x" + << "ns" + << nss.ns())); + auto syncApply = [](OperationContext*, const BSONObj&, bool) -> Status { + uasserted(ErrorCodes::OperationFailed, ""); + MONGO_UNREACHABLE; + }; + ASSERT_EQUALS(ErrorCodes::OperationFailed, multiSyncApply_noAbort(_txn.get(), {op}, syncApply)); +} + +TEST_F(SyncTailTest, MultiSyncApplySortsOperationsStablyByNamespaceBeforeApplying) { + int x = 0; + auto makeOp = [&x](const char* ns) -> OplogEntry { + return OplogEntry(BSON("op" + << "x" + << "ns" + << ns + << "x" + << x++)); + }; + auto op1 = makeOp("test.t1"); + auto op2 = makeOp("test.t1"); + auto op3 = makeOp("test.t2"); + auto op4 = makeOp("test.t3"); + MultiApplier::Operations operationsApplied; + auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) { + operationsApplied.push_back(OplogEntry(op)); + return Status::OK(); + }; + ASSERT_OK(multiSyncApply_noAbort(_txn.get(), {op4, op1, op3, op2}, syncApply)); + ASSERT_EQUALS(4U, operationsApplied.size()); + ASSERT_EQUALS(op1, operationsApplied[0]); + ASSERT_EQUALS(op2, operationsApplied[1]); + ASSERT_EQUALS(op3, operationsApplied[2]); + ASSERT_EQUALS(op4, operationsApplied[3]); +} + +TEST_F(SyncTailTest, MultiSyncApplyGroupsInsertOperationByNamespaceBeforeApplying) { + int seconds = 0; + auto makeOp = [&seconds](const NamespaceString& nss) { + return makeInsertDocumentOplogEntry( + {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++)); + }; + NamespaceString nss1("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_1"); + NamespaceString nss2("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_2"); + auto createOp1 = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss1); + auto createOp2 = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss2); + auto insertOp1a = makeOp(nss1); + auto insertOp1b = makeOp(nss1); + auto insertOp2a = makeOp(nss2); + auto insertOp2b = makeOp(nss2); + MultiApplier::Operations operationsApplied; + auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) { + operationsApplied.push_back(OplogEntry(op)); + return Status::OK(); + }; + + ASSERT_OK(multiSyncApply_noAbort( + _txn.get(), + {createOp1, createOp2, insertOp1a, insertOp2a, insertOp1b, insertOp2b}, + syncApply)); + + ASSERT_EQUALS(4U, operationsApplied.size()); + ASSERT_EQUALS(createOp1, operationsApplied[0]); + ASSERT_EQUALS(createOp2, operationsApplied[1]); + + // Check grouped insert operations in namespace "nss1". + ASSERT_EQUALS(insertOp1a.getOpTime(), operationsApplied[2].getOpTime()); + ASSERT_EQUALS(insertOp1a.ns, operationsApplied[2].ns); + ASSERT_EQUALS(BSONType::Array, operationsApplied[2].o.type()); + auto group1 = operationsApplied[2].o.Array(); + ASSERT_EQUALS(2U, group1.size()); + ASSERT_EQUALS(insertOp1a.o.Obj(), group1[0].Obj()); + ASSERT_EQUALS(insertOp1b.o.Obj(), group1[1].Obj()); + + // Check grouped insert operations in namespace "nss2". + ASSERT_EQUALS(insertOp2a.getOpTime(), operationsApplied[3].getOpTime()); + ASSERT_EQUALS(insertOp2a.ns, operationsApplied[3].ns); + ASSERT_EQUALS(BSONType::Array, operationsApplied[3].o.type()); + auto group2 = operationsApplied[3].o.Array(); + ASSERT_EQUALS(2U, group2.size()); + ASSERT_EQUALS(insertOp2a.o.Obj(), group2[0].Obj()); + ASSERT_EQUALS(insertOp2b.o.Obj(), group2[1].Obj()); +} + +TEST_F(SyncTailTest, MultiSyncApplyUsesLimitWhenGroupingInsertOperation) { + int seconds = 0; + auto makeOp = [&seconds](const NamespaceString& nss) { + return makeInsertDocumentOplogEntry( + {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++)); + }; + NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_1"); + auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss); + + // Generate operations to apply: + // {create}, {insert_1}, {insert_2}, .. {insert_(limit)}, {insert_(limit+1)} + std::size_t limit = 64; + MultiApplier::Operations insertOps; + for (std::size_t i = 0; i < limit + 1; ++i) { + insertOps.push_back(makeOp(nss)); + } + MultiApplier::Operations operationsToApply; + operationsToApply.push_back(createOp); + std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply)); + MultiApplier::Operations operationsApplied; + auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) { + operationsApplied.push_back(OplogEntry(op)); + return Status::OK(); + }; + + ASSERT_OK(multiSyncApply_noAbort(_txn.get(), operationsToApply, syncApply)); + + // multiSyncApply should combine operations as follows: + // {create}, {grouped_insert}, {insert_(limit+1)} + ASSERT_EQUALS(3U, operationsApplied.size()); + ASSERT_EQUALS(createOp, operationsApplied[0]); + + const auto& groupedInsertOp = operationsApplied[1]; + ASSERT_EQUALS(insertOps.front().getOpTime(), groupedInsertOp.getOpTime()); + ASSERT_EQUALS(insertOps.front().ns, groupedInsertOp.ns); + ASSERT_EQUALS(BSONType::Array, groupedInsertOp.o.type()); + auto groupedInsertDocuments = groupedInsertOp.o.Array(); + ASSERT_EQUALS(limit, groupedInsertDocuments.size()); + for (std::size_t i = 0; i < limit; ++i) { + const auto& insertOp = insertOps[i]; + ASSERT_EQUALS(insertOp.o.Obj(), groupedInsertDocuments[i].Obj()); + } + + // (limit + 1)-th insert operations should not be included in group of first (limit) inserts. + ASSERT_EQUALS(insertOps.back(), operationsApplied[2]); +} + +TEST_F(SyncTailTest, MultiSyncApplyFallsBackOnApplyingInsertsIndividuallyWhenGroupedInsertFails) { + int seconds = 0; + auto makeOp = [&seconds](const NamespaceString& nss) { + return makeInsertDocumentOplogEntry( + {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++)); + }; + NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_1"); + auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss); + + // Generate operations to apply: + // {create}, {insert_1}, {insert_2}, .. {insert_(limit)}, {insert_(limit+1)} + std::size_t limit = 64; + MultiApplier::Operations insertOps; + for (std::size_t i = 0; i < limit + 1; ++i) { + insertOps.push_back(makeOp(nss)); + } + MultiApplier::Operations operationsToApply; + operationsToApply.push_back(createOp); + std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply)); + + std::size_t numFailedGroupedInserts = 0; + MultiApplier::Operations operationsApplied; + auto syncApply = [&numFailedGroupedInserts, + &operationsApplied](OperationContext*, const BSONObj& op, bool) -> Status { + // Reject grouped insert operations. + if (op["o"].type() == BSONType::Array) { + numFailedGroupedInserts++; + return {ErrorCodes::OperationFailed, "grouped inserts not supported"}; + } + operationsApplied.push_back(OplogEntry(op)); + return Status::OK(); + }; + + ASSERT_OK(multiSyncApply_noAbort(_txn.get(), operationsToApply, syncApply)); + + // On failing to apply the grouped insert operation, multiSyncApply should apply the operations + // as given in "operationsToApply": + // {create}, {insert_1}, {insert_2}, .. {insert_(limit)}, {insert_(limit+1)} + ASSERT_EQUALS(limit + 2, operationsApplied.size()); + ASSERT_EQUALS(createOp, operationsApplied[0]); + + for (std::size_t i = 0; i < limit + 1; ++i) { + const auto& insertOp = insertOps[i]; + ASSERT_EQUALS(insertOp, operationsApplied[i + 1]); + } + + // Ensure that multiSyncApply does not attempt to group remaining operations in first failed + // grouped insert operation. + ASSERT_EQUALS(1U, numFailedGroupedInserts); +} + TEST_F(SyncTailTest, MultiInitialSyncApplyDisablesDocumentValidationWhileApplyingOperations) { SyncTailWithOperationContextChecker syncTail; NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); @@ -625,7 +855,6 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyRetriesFailedUpdateIfDocumentIsAvailab TEST_F(SyncTailTest, MultiInitialSyncApplyPassesThroughSyncApplyErrorAfterFailingToRetryBadOp) { SyncTailWithLocalDocumentFetcher syncTail(BSON("_id" << 0 << "x" << 1)); NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - auto updatedDocument = BSON("_id" << 0 << "x" << 2); OplogEntry op(BSON("op" << "x" << "ns" @@ -636,7 +865,6 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyPassesThroughSyncApplyErrorAfterFailin TEST_F(SyncTailTest, MultiInitialSyncApplyPassesThroughShouldSyncTailRetryError) { SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr); NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - auto updatedDocument = BSON("_id" << 0 << "x" << 2); auto op = makeUpdateDocumentOplogEntry( {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2)); ASSERT_THROWS_CODE( |