diff options
Diffstat (limited to 'src/mongo/db/repl/sync_tail_test.cpp')
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 405 |
1 files changed, 400 insertions, 5 deletions
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index f080f656c4d..5cd3d140d34 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -49,6 +49,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/jsobj.h" +#include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/bgsync.h" @@ -65,6 +66,8 @@ #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/db/session_txn_record_gen.h" +#include "mongo/db/stats/counters.h" +#include "mongo/db/transaction_participant_gen.h" #include "mongo/stdx/mutex.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" @@ -470,6 +473,389 @@ TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) { ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection()); } +class MultiOplogEntrySyncTailTest : public SyncTailTest { + void setUp() override { + SyncTailTest::setUp(); + gUseMultipleOplogEntryFormatForTransactions = true; + } + void tearDown() override { + gUseMultipleOplogEntryFormatForTransactions = false; + SyncTailTest::tearDown(); + } +}; + +TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) { + NamespaceString nss1("test.pendingtxn1"); + NamespaceString nss2("test.pendingtxn2"); + + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid1 = createCollectionWithUuid(_opCtx.get(), nss1); + auto uuid2 = createCollectionWithUuid(_opCtx.get(), nss2); + + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(1); + + auto insertOp1 = + makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 1), 1LL}, + nss1, + uuid1, + BSON("_id" << 1), + lsid, + txnNum, + StmtId(0), + OpTime()); + insertOp1 = uassertStatusOK( + OplogEntry::parse(insertOp1.toBSON().addField(BSON("inTxn" << true).firstElement()))); + auto insertOp2 = + makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 2), 1LL}, + nss2, + uuid2, + BSON("_id" << 2), + lsid, + txnNum, + StmtId(1), + insertOp1.getOpTime()); + insertOp2 = uassertStatusOK( + OplogEntry::parse(insertOp2.toBSON().addField(BSON("inTxn" << true).firstElement()))); + auto commitOp = makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), 3), 1LL}, + nss1, + BSON("commitTransaction" << 1 << "prepared" << false), + lsid, + txnNum, + StmtId(2), + insertOp2.getOpTime()); + // This re-parse puts the commit op into a normalized form for comparison. + commitOp = uassertStatusOK(OplogEntry::parse(commitOp.toBSON())); + + // Use separate vectors for each namespace as the opObserver may be called from multiple + // threads at once. + std::vector<BSONObj> insertedOplogEntries, insertedDocs1, insertedDocs2; + _opObserver->onInsertsFn = + [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { + if (nss.isOplog()) + insertedOplogEntries.insert(insertedOplogEntries.end(), docs.begin(), docs.end()); + else if (nss == nss1) { + insertedDocs1.insert(insertedDocs1.end(), docs.begin(), docs.end()); + } else if (nss == nss2) { + insertedDocs2.insert(insertedDocs2.end(), docs.begin(), docs.end()); + } else if (nss == NamespaceString::kSessionTransactionsTableNamespace) { + // Not testing session updates for now. + } else + FAIL("Unexpected insert") << " into " << nss << " first doc: " << docs.front(); + }; + + auto writerPool = OplogApplier::makeWriterPool(); + SyncTail syncTail( + nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); + + // Apply a batch with only the first operation. This should result in the first oplog entry + // being put in the oplog, but with no effect because the operation is part of a pending + // transaction. + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp1})); + ASSERT_EQ(1U, insertedOplogEntries.size()); + ASSERT_BSONOBJ_EQ(insertedOplogEntries.back(), insertOp1.toBSON()); + ASSERT_TRUE(insertedDocs1.empty()); + ASSERT_TRUE(insertedDocs2.empty()); + + // Apply a batch with only the second operation. This should result in the second oplog entry + // being put in the oplog, but with no effect because the operation is part of a pending + // transaction. + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp2})); + ASSERT_EQ(2U, insertedOplogEntries.size()); + ASSERT_BSONOBJ_EQ(insertedOplogEntries.back(), insertOp2.toBSON()); + ASSERT_TRUE(insertedDocs1.empty()); + ASSERT_TRUE(insertedDocs2.empty()); + + // Apply a batch with only the commit. This should result in the commit being put in the + // oplog, and the two previous entries being applied. + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {commitOp})); + ASSERT_EQ(3U, insertedOplogEntries.size()); + ASSERT_EQ(1U, insertedDocs1.size()); + ASSERT_EQ(1U, insertedDocs2.size()); + ASSERT_BSONOBJ_EQ(insertedOplogEntries.back(), commitOp.toBSON()); +} + +TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionAllAtOnce) { + NamespaceString nss1("test.pendingtxn1"); + NamespaceString nss2("test.pendingtxn2"); + + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid1 = createCollectionWithUuid(_opCtx.get(), nss1); + auto uuid2 = createCollectionWithUuid(_opCtx.get(), nss2); + + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(1); + + auto insertOp1 = + makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 1), 1LL}, + nss1, + uuid1, + BSON("_id" << 1), + lsid, + txnNum, + StmtId(0), + OpTime()); + insertOp1 = uassertStatusOK( + OplogEntry::parse(insertOp1.toBSON().addField(BSON("inTxn" << true).firstElement()))); + auto insertOp2 = + makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 2), 1LL}, + nss2, + uuid2, + BSON("_id" << 2), + lsid, + txnNum, + StmtId(1), + insertOp1.getOpTime()); + insertOp2 = uassertStatusOK( + OplogEntry::parse(insertOp2.toBSON().addField(BSON("inTxn" << true).firstElement()))); + auto commitOp = makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), 3), 1LL}, + nss1, + BSON("commitTransaction" << 1 << "prepared" << false), + lsid, + txnNum, + StmtId(2), + insertOp2.getOpTime()); + // This re-parse puts the commit op into a normalized form. + commitOp = uassertStatusOK(OplogEntry::parse(commitOp.toBSON())); + + // Use separate vectors for each namespace as the opObserver may be called from multiple + // threads at once. + std::vector<BSONObj> insertedOplogEntries, insertedDocs1, insertedDocs2; + _opObserver->onInsertsFn = + [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { + if (nss.isOplog()) + insertedOplogEntries.insert(insertedOplogEntries.end(), docs.begin(), docs.end()); + else if (nss == nss1) { + insertedDocs1.insert(insertedDocs1.end(), docs.begin(), docs.end()); + } else if (nss == nss2) { + insertedDocs2.insert(insertedDocs2.end(), docs.begin(), docs.end()); + } else if (nss == NamespaceString::kSessionTransactionsTableNamespace) { + // Not testing session updates for now. + } else + FAIL("Unexpected insert") << " into " << nss << " first doc: " << docs.front(); + }; + + auto writerPool = OplogApplier::makeWriterPool(); + // Skipping writes to oplog proves we're testing the code path which does not rely on reading + // the oplog. + OplogApplier::Options applierOpts; + applierOpts.skipWritesToOplog = true; + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + writerPool.get(), + applierOpts); + + // Apply both inserts and the commit in a single batch. We expect no oplog entries to + // be inserted (because we've set skipWritesToOplog), and both entries to be committed. + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp1, insertOp2, commitOp})); + ASSERT_EQ(0U, insertedOplogEntries.size()); + ASSERT_EQ(1U, insertedDocs1.size()); + ASSERT_EQ(1U, insertedDocs2.size()); +} + +TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionTwoBatches) { + // Tests an unprepared transaction with ops both in the batch with the commit and prior + // batches. + NamespaceString nss1("test.pendingtxn1"); + NamespaceString nss2("test.pendingtxn2"); + + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid1 = createCollectionWithUuid(_opCtx.get(), nss1); + auto uuid2 = createCollectionWithUuid(_opCtx.get(), nss2); + + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(1); + + // Populate transaction with 4 linked inserts, one in nss2 and the others in nss1. + std::vector<OplogEntry> insertOps; + for (int i = 0; i < 4; i++) { + insertOps.push_back(makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), i + 1), 1LL}, + i == 1 ? nss2 : nss1, + i == 1 ? uuid2 : uuid1, + BSON("_id" << i), + lsid, + txnNum, + StmtId(i), + i == 0 ? OpTime() : insertOps.back().getOpTime())); + insertOps.back() = uassertStatusOK(OplogEntry::parse( + insertOps.back().toBSON().addField(BSON("inTxn" << true).firstElement()))); + } + auto commitOp = makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), 5), 1LL}, + nss1, + BSON("commitTransaction" << 1 << "prepared" << false), + lsid, + txnNum, + StmtId(4), + insertOps.back().getOpTime()); + // This re-parse puts the commit op into a normalized form. + commitOp = uassertStatusOK(OplogEntry::parse(commitOp.toBSON())); + + // Use separate vectors for each namespace as the opObserver may be called from multiple + // threads at once. + std::vector<BSONObj> insertedOplogEntries, insertedDocs1, insertedDocs2; + _opObserver->onInsertsFn = + [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { + if (nss.isOplog()) + insertedOplogEntries.insert(insertedOplogEntries.end(), docs.begin(), docs.end()); + else if (nss == nss1) { + insertedDocs1.insert(insertedDocs1.end(), docs.begin(), docs.end()); + } else if (nss == nss2) { + insertedDocs2.insert(insertedDocs2.end(), docs.begin(), docs.end()); + } else if (nss == NamespaceString::kSessionTransactionsTableNamespace) { + // Not testing session updates for now. + } else + FAIL("Unexpected insert") << " into " << nss << " first doc: " << docs.front(); + }; + + auto writerPool = OplogApplier::makeWriterPool(); + SyncTail syncTail( + nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); + + // Insert the first entry in its own batch. This should result in the oplog entry being written + // but the entry should not be applied as it is part of a pending transaction. + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOps[0]})); + ASSERT_EQ(1U, insertedOplogEntries.size()); + ASSERT_EQ(0U, insertedDocs1.size()); + ASSERT_EQ(0U, insertedDocs2.size()); + + // Insert the rest of the entries, including the commit. These entries should be added to the + // oplog, and all the entries including the first should be applied. + ASSERT_OK( + syncTail.multiApply(_opCtx.get(), {insertOps[1], insertOps[2], insertOps[3], commitOp})); + ASSERT_EQ(5U, insertedOplogEntries.size()); + ASSERT_EQ(3U, insertedDocs1.size()); + ASSERT_EQ(1U, insertedDocs2.size()); + + // Check docs and ordering of docs in nss1. + // The insert into nss2 is unordered with respect to those. + ASSERT_BSONOBJ_EQ(insertOps[0].getObject(), insertedDocs1[0]); + ASSERT_BSONOBJ_EQ(insertOps[1].getObject(), insertedDocs2.front()); + ASSERT_BSONOBJ_EQ(insertOps[2].getObject(), insertedDocs1[1]); + ASSERT_BSONOBJ_EQ(insertOps[3].getObject(), insertedDocs1[2]); +} + +TEST_F(MultiOplogEntrySyncTailTest, MultiApplyTwoTransactionsOneBatch) { + // Tests that two transactions on the same session ID in the same batch both + // apply correctly. + NamespaceString nss1("test.pendingtxn1"); + + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid1 = createCollectionWithUuid(_opCtx.get(), nss1); + + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum1(1); + TxnNumber txnNum2(2); + + std::vector<OplogEntry> insertOps1, insertOps2; + insertOps1.push_back( + makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 1), 1LL}, + nss1, + uuid1, + BSON("_id" << 1), + lsid, + txnNum1, + StmtId(0), + OpTime())); + insertOps1.back() = uassertStatusOK(OplogEntry::parse( + insertOps1.back().toBSON().addField(BSON("inTxn" << true).firstElement()))); + insertOps1.push_back( + makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 2), 1LL}, + nss1, + uuid1, + BSON("_id" << 2), + lsid, + txnNum1, + StmtId(1), + insertOps1.back().getOpTime())); + insertOps1.back() = uassertStatusOK(OplogEntry::parse( + insertOps1.back().toBSON().addField(BSON("inTxn" << true).firstElement()))); + insertOps2.push_back( + makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(2), 1), 1LL}, + nss1, + uuid1, + BSON("_id" << 3), + lsid, + txnNum2, + StmtId(0), + OpTime())); + insertOps2.back() = uassertStatusOK(OplogEntry::parse( + insertOps2.back().toBSON().addField(BSON("inTxn" << true).firstElement()))); + insertOps2.push_back( + makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(2), 2), 1LL}, + nss1, + uuid1, + BSON("_id" << 4), + lsid, + txnNum2, + StmtId(1), + insertOps2.back().getOpTime())); + insertOps2.back() = uassertStatusOK(OplogEntry::parse( + insertOps2.back().toBSON().addField(BSON("inTxn" << true).firstElement()))); + auto commitOp1 = makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), 3), 1LL}, + nss1, + BSON("commitTransaction" << 1 << "prepared" << false), + lsid, + txnNum1, + StmtId(2), + insertOps1.back().getOpTime()); + auto commitOp2 = makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(2), 3), 1LL}, + nss1, + BSON("commitTransaction" << 1 << "prepared" << false), + lsid, + txnNum2, + StmtId(2), + insertOps2.back().getOpTime()); + // This re-parse puts the commit ops into a normalized form. + commitOp1 = uassertStatusOK(OplogEntry::parse(commitOp1.toBSON())); + commitOp2 = uassertStatusOK(OplogEntry::parse(commitOp2.toBSON())); + + // Use separate vectors for each namespace as the opObserver may be called from multiple + // threads at once. + std::vector<BSONObj> insertedOplogEntries, insertedDocs1; + _opObserver->onInsertsFn = + [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { + if (nss.isOplog()) + insertedOplogEntries.insert(insertedOplogEntries.end(), docs.begin(), docs.end()); + else if (nss == nss1) { + insertedDocs1.insert(insertedDocs1.end(), docs.begin(), docs.end()); + } else if (nss == NamespaceString::kSessionTransactionsTableNamespace) { + // Not testing session updates for now. + } else + FAIL("Unexpected insert") << " into " << nss << " first doc: " << docs.front(); + }; + + auto writerPool = OplogApplier::makeWriterPool(); + SyncTail syncTail( + nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); + + // Note the insert counter so we can check it later. It is necessary to use opCounters as + // inserts are idempotent so we will not detect duplicate inserts just by checking inserts in + // the opObserver. + int insertsBefore = replOpCounters.getInsert()->load(); + // Insert all the oplog entries in one batch. All inserts should be executed, in order, exactly + // once. + ASSERT_OK(syncTail.multiApply( + _opCtx.get(), + {insertOps1[0], insertOps1[1], commitOp1, insertOps2[0], insertOps2[1], commitOp2})); + ASSERT_EQ(6U, insertedOplogEntries.size()); + ASSERT_EQ(4, replOpCounters.getInsert()->load() - insertsBefore); + ASSERT_EQ(4U, insertedDocs1.size()); + + // Check docs and ordering of docs in nss1. + ASSERT_BSONOBJ_EQ(insertOps1[0].getObject(), insertedDocs1[0]); + ASSERT_BSONOBJ_EQ(insertOps1[1].getObject(), insertedDocs1[1]); + ASSERT_BSONOBJ_EQ(insertOps2[0].getObject(), insertedDocs1[2]); + ASSERT_BSONOBJ_EQ(insertOps2[1].getObject(), insertedDocs1[3]); +} + void testWorkerMultikeyPaths(OperationContext* opCtx, const OplogEntry& op, unsigned long numPaths) { @@ -1804,25 +2190,34 @@ TEST_F(SyncTailTxnTableTest, MultiApplyUpdatesTheTransactionTable) { ASSERT(client.runCommand(ns1.db().toString(), BSON("create" << ns1.coll()), result)); ASSERT(client.runCommand(ns2.db().toString(), BSON("create" << ns2.coll()), result)); ASSERT(client.runCommand(ns3.db().toString(), BSON("create" << ns3.coll()), result)); + auto uuid0 = [&] { + return AutoGetCollectionForRead(_opCtx.get(), ns0).getCollection()->uuid(); + }(); + auto uuid1 = [&] { + return AutoGetCollectionForRead(_opCtx.get(), ns1).getCollection()->uuid(); + }(); + auto uuid2 = [&] { + return AutoGetCollectionForRead(_opCtx.get(), ns2).getCollection()->uuid(); + }(); // Entries with a session id and a txnNumber update the transaction table. auto lsidSingle = makeLogicalSessionIdForTest(); auto opSingle = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(1), 0), 1LL}, ns0, BSON("_id" << 0), lsidSingle, 5LL, 0); + {Timestamp(Seconds(1), 0), 1LL}, ns0, uuid0, BSON("_id" << 0), lsidSingle, 5LL, 0); // For entries with the same session, the entry with a larger txnNumber is saved. auto lsidDiffTxn = makeLogicalSessionIdForTest(); auto opDiffTxnSmaller = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(2), 0), 1LL}, ns1, BSON("_id" << 0), lsidDiffTxn, 10LL, 1); + {Timestamp(Seconds(2), 0), 1LL}, ns1, uuid1, BSON("_id" << 0), lsidDiffTxn, 10LL, 1); auto opDiffTxnLarger = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(3), 0), 1LL}, ns1, BSON("_id" << 1), lsidDiffTxn, 20LL, 1); + {Timestamp(Seconds(3), 0), 1LL}, ns1, uuid1, BSON("_id" << 1), lsidDiffTxn, 20LL, 1); // For entries with the same session and txnNumber, the later optime is saved. auto lsidSameTxn = makeLogicalSessionIdForTest(); auto opSameTxnLater = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(6), 0), 1LL}, ns2, BSON("_id" << 0), lsidSameTxn, 30LL, 0); + {Timestamp(Seconds(6), 0), 1LL}, ns2, uuid2, BSON("_id" << 0), lsidSameTxn, 30LL, 0); auto opSameTxnSooner = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(5), 0), 1LL}, ns2, BSON("_id" << 1), lsidSameTxn, 30LL, 1); + {Timestamp(Seconds(5), 0), 1LL}, ns2, uuid2, BSON("_id" << 1), lsidSameTxn, 30LL, 1); // Entries with a session id but no txnNumber do not lead to updates. auto lsidNoTxn = makeLogicalSessionIdForTest(); |