From 32ade4aa6b6a73c3620486117388908ad9ad438c Mon Sep 17 00:00:00 2001 From: Randolph Tan Date: Wed, 15 May 2019 16:54:35 -0400 Subject: SERVER-41074 Make decision to send pre/post image oplog the same as the decision made for the originating oplog write. --- .../db/s/migration_chunk_cloner_source_legacy.cpp | 24 +- .../db/s/session_catalog_migration_source.cpp | 25 +- src/mongo/db/s/session_catalog_migration_source.h | 10 +- .../db/s/session_catalog_migration_source_test.cpp | 327 +++++++++++++++++++-- 4 files changed, 339 insertions(+), 47 deletions(-) (limited to 'src') diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 9e6e44388fd..bcb5cd266e6 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -87,20 +87,6 @@ BSONObj createRequestWithSessionId(StringData commandName, return builder.obj(); } -bool shouldApplyOplogToSession(const repl::OplogEntry& oplog, - const ChunkRange& range, - const ShardKeyPattern& keyPattern) { - // Skip appending CRUD operations that don't pertain to the ChunkRange being migrated. - if (oplog.isCrudOpType()) { - auto shardKey = keyPattern.extractShardKeyFromDoc(oplog.getObjectContainingDocumentKey()); - if (!range.containsKey(shardKey)) { - return false; - } - } - - return true; -} - } // namespace /** @@ -187,8 +173,11 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) { auto const replCoord = repl::ReplicationCoordinator::get(opCtx); if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) { - _sessionCatalogSource = - stdx::make_unique(opCtx, _args.getNss()); + _sessionCatalogSource = stdx::make_unique( + opCtx, + _args.getNss(), + ChunkRange(_args.getMinKey(), _args.getMaxKey()), + _shardKeyPattern.getKeyPattern()); // Prime up the session migration source if there are oplog entries to migrate. _sessionCatalogSource->fetchNextOplog(opCtx); @@ -859,8 +848,7 @@ boost::optional MigrationChunkClonerSourceLegacy::nextSessionMigra while (_sessionCatalogSource->hasMoreOplog()) { auto result = _sessionCatalogSource->getLastFetchedOplog(); - if (!result.oplog || - !shouldApplyOplogToSession(result.oplog.get(), range, _shardKeyPattern)) { + if (!result.oplog) { _sessionCatalogSource->fetchNextOplog(opCtx); continue; } diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp index 8882a4d09e7..cba5f1ec8a9 100644 --- a/src/mongo/db/s/session_catalog_migration_source.cpp +++ b/src/mongo/db/s/session_catalog_migration_source.cpp @@ -128,8 +128,13 @@ repl::OplogEntry makeSentinelOplogEntry(const LogicalSessionId& lsid, } // namespace SessionCatalogMigrationSource::SessionCatalogMigrationSource(OperationContext* opCtx, - NamespaceString ns) - : _ns(std::move(ns)), _rollbackIdAtInit(repl::ReplicationProcess::get(opCtx)->getRollbackID()) { + NamespaceString ns, + ChunkRange chunk, + KeyPattern shardKey) + : _ns(std::move(ns)), + _rollbackIdAtInit(repl::ReplicationProcess::get(opCtx)->getRollbackID()), + _chunkRange(std::move(chunk)), + _keyPattern(shardKey) { // Exclude entries for transaction. Query query; // Sort is not needed for correctness. 
This is just for making it easier to write deterministic @@ -248,18 +253,27 @@ std::shared_ptr> SessionCatalogMigrationSource::getNotificati } bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationContext* opCtx) { - if (_currentOplogIterator) { + while (_currentOplogIterator) { if (auto nextOplog = _currentOplogIterator->getNext(opCtx)) { auto nextStmtId = nextOplog->getStatementId(); - // Note: This is an optimization based on the assumption that it is not possible to be - // touching different namespaces in the same transaction. + // Skip the rest of the chain for this session since the ns is unrelated with the + // current one being migrated. It is ok to not check the rest of the chain because + // retryable writes doesn't allow touching different namespaces. if (!nextStmtId || (nextStmtId && *nextStmtId != kIncompleteHistoryStmtId && nextOplog->getNss() != _ns)) { _currentOplogIterator.reset(); return false; } + if (nextOplog->isCrudOpType()) { + auto shardKey = + _keyPattern.extractShardKeyFromDoc(nextOplog->getObjectContainingDocumentKey()); + if (!_chunkRange.containsKey(shardKey)) { + continue; + } + } + auto doc = fetchPrePostImageOplog(opCtx, *nextOplog); if (doc) { _lastFetchedOplogBuffer.push_back(*nextOplog); @@ -267,6 +281,7 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte } else { _lastFetchedOplog = *nextOplog; } + return true; } else { _currentOplogIterator.reset(); diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h index 89abe7d83cd..06093d4c8e8 100644 --- a/src/mongo/db/s/session_catalog_migration_source.h +++ b/src/mongo/db/s/session_catalog_migration_source.h @@ -37,6 +37,8 @@ #include "mongo/db/repl/optime.h" #include "mongo/db/session_txn_record_gen.h" #include "mongo/db/transaction_history_iterator.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/notification.h" #include "mongo/util/concurrency/with_lock.h" @@ -87,7 +89,10 @@ public: bool shouldWaitForMajority = false; }; - SessionCatalogMigrationSource(OperationContext* opCtx, NamespaceString ns); + SessionCatalogMigrationSource(OperationContext* opCtx, + NamespaceString ns, + ChunkRange chunk, + KeyPattern shardKey); /** * Returns true if there are more oplog entries to fetch at this moment. Note that new writes @@ -221,6 +226,9 @@ private: // followed by step-up situations can be discovered. 
const int _rollbackIdAtInit; + const ChunkRange _chunkRange; + const ShardKeyPattern _keyPattern; + // Protects _sessionCatalogCursor, _sessionOplogIterators, _currentOplogIterator, // _lastFetchedOplogBuffer, _lastFetchedOplog stdx::mutex _sessionCloneMutex; diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp index c4fabc0b443..16492fbb17c 100644 --- a/src/mongo/db/s/session_catalog_migration_source_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp @@ -46,6 +46,8 @@ namespace { using executor::RemoteCommandRequest; const NamespaceString kNs("a.b"); +const KeyPattern kShardKey(BSON("x" << 1)); +const ChunkRange kChunkRange(BSON("x" << 0), BSON("x" << 100)); class SessionCatalogMigrationSourceTest : public MockReplCoordServerFixture {}; @@ -107,7 +109,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, } TEST_F(SessionCatalogMigrationSourceTest, NoSessionsToTransferShouldNotHaveOplog) { - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); } @@ -126,7 +128,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) { auto entry2 = makeOplogEntry(repl::OpTime(Timestamp(67, 54801), 2), // optime repl::OpTypeEnum::kInsert, // op type - BSON("y" << 50), // o + BSON("x" << 50), // o boost::none, // o2 Date_t::now(), // wall clock time 1, // statement id @@ -142,7 +144,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) { DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); { @@ -179,7 +181,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { auto entry1b = makeOplogEntry(repl::OpTime(Timestamp(67, 54801), 2), // optime repl::OpTypeEnum::kInsert, // op type - BSON("y" << 50), // o + BSON("x" << 50), // o boost::none, // o2 Date_t::now(), // wall clock time 1, // statement id @@ -207,7 +209,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { auto entry2b = makeOplogEntry(repl::OpTime(Timestamp(789, 13), 2), // optime repl::OpTypeEnum::kDelete, // op type - BSON("y" << 50), // o + BSON("x" << 50), // o boost::none, // o2 Date_t::now(), // wall clock time 4, // statement id @@ -227,7 +229,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { insertOplogEntry(entry1b); insertOplogEntry(entry2b); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); auto checkNextBatch = [this, &migrationSource](const repl::OplogEntry& firstExpectedOplog, @@ -284,7 +286,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd auto entry2 = makeOplogEntry( repl::OpTime(Timestamp(52, 346), 2), // optime repl::OpTypeEnum::kDelete, // op type - BSON("y" << 50), // o + BSON("x" << 50), // o boost::none, // o2 Date_t::now(), // wall clock time 1, // statement id @@ -305,8 +307,8 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd auto entry4 = 
makeOplogEntry(repl::OpTime(Timestamp(73, 6), 2), // optime repl::OpTypeEnum::kUpdate, // op type - BSON("x" << 19), // o - BSON("$inc" << BSON("x" << 1)), // o2 + BSON("$inc" << BSON("x" << 1)), // o + BSON("x" << 19), // o2 Date_t::now(), // wall clock time 3, // statement id entry2.getOpTime(), // optime of previous write within same transaction @@ -323,7 +325,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); auto expectedSequece = {entry3, entry4, entry1, entry2}; @@ -385,7 +387,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) { client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord2.toBSON()); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); ASSERT_TRUE(migrationSource.hasMoreOplog()); @@ -433,14 +435,14 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) { auto entry3 = makeOplogEntry( repl::OpTime(Timestamp(55, 12), 2), // optime repl::OpTypeEnum::kInsert, // op type - BSON("z" << 40), // o + BSON("x" << 40), // o boost::none, // o2 Date_t::now(), // wall clock time 2, // statement id repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction insertOplogEntry(entry3); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); migrationSource.notifyNewWriteOpTime( @@ -477,7 +479,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) { } TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) { - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); migrationSource.notifyNewWriteOpTime( @@ -488,7 +490,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) { } TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBufferWasDepleted) { - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); { @@ -544,7 +546,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer auto entry = makeOplogEntry( repl::OpTime(Timestamp(55, 12), 2), // optime repl::OpTypeEnum::kInsert, // op type - BSON("z" << 40), // o + BSON("x" << 40), // o boost::none, // o2 Date_t::now(), // wall clock time 2, // statement id @@ -587,7 +589,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ReturnsDeadEndSentinelForIncompleteHis DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); 
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); { @@ -645,7 +647,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) { DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); { @@ -674,7 +676,7 @@ TEST_F(SessionCatalogMigrationSourceTest, DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); ASSERT_TRUE(migrationSource.hasMoreOplog()); @@ -700,7 +702,7 @@ TEST_F(SessionCatalogMigrationSourceTest, DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); ASSERT_TRUE(migrationSource.hasMoreOplog()); @@ -725,7 +727,7 @@ TEST_F(SessionCatalogMigrationSourceTest, InProgressTransactionEntriesShouldBeIg DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); @@ -742,7 +744,7 @@ TEST_F(SessionCatalogMigrationSourceTest, AbortedTransactionEntriesShouldBeIgnor DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); @@ -784,7 +786,7 @@ TEST_F(SessionCatalogMigrationSourceTest, // Insert the 'insert' oplog entry into the oplog. insertOplogEntry(insertOplog); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); // Function to verify the oplog entry corresponding to the retryable write. 
auto checkRetryableWriteEntry = [&] { @@ -823,5 +825,284 @@ TEST_F(SessionCatalogMigrationSourceTest, ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); } + +TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyDeleteNotTouchingChunkIsIgnored) { + auto entry1 = makeOplogEntry( + repl::OpTime(Timestamp(52, 345), 2), // optime + repl::OpTypeEnum::kNoop, // op type + BSON("x" << -50), // o + boost::none, // o2 + Date_t::now(), // wall clock time + 0, // statement id + repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction + insertOplogEntry(entry1); + + auto entry2 = makeOplogEntry( + repl::OpTime(Timestamp(52, 346), 2), // optime + repl::OpTypeEnum::kDelete, // op type + BSON("x" << -50), // o + boost::none, // o2 + Date_t::now(), // wall clock time + 1, // statement id + repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction + entry1.getOpTime()); // pre-image optime + insertOplogEntry(entry2); + + SessionTxnRecord sessionRecord; + sessionRecord.setSessionId(makeLogicalSessionIdForTest()); + sessionRecord.setTxnNum(1); + sessionRecord.setLastWriteOpTime(entry2.getOpTime()); + sessionRecord.setLastWriteDate(*entry2.getWallClockTime()); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); + + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); +} + +TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyUpdatePrePostNotTouchingChunkIsIgnored) { + auto entry1 = makeOplogEntry( + repl::OpTime(Timestamp(52, 345), 2), // optime + repl::OpTypeEnum::kNoop, // op type + BSON("x" << -5), // o + boost::none, // o2 + Date_t::now(), // wall clock time + 0, // statement id + repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction + insertOplogEntry(entry1); + + auto entry2 = makeOplogEntry( + repl::OpTime(Timestamp(52, 346), 2), // optime + repl::OpTypeEnum::kUpdate, // op type + BSON("$set" << BSON("y" << 1)), // o + BSON("x" << -5), // o2 + Date_t::now(), // wall clock time + 1, // statement id + repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction + entry1.getOpTime()); // pre-image optime + insertOplogEntry(entry2); + + SessionTxnRecord sessionRecord; + sessionRecord.setSessionId(makeLogicalSessionIdForTest()); + sessionRecord.setTxnNum(1); + sessionRecord.setLastWriteOpTime(entry2.getOpTime()); + sessionRecord.setLastWriteDate(*entry2.getWallClockTime()); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); + + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); +} + +TEST_F(SessionCatalogMigrationSourceTest, + UpdatePreImageTouchingPostNotTouchingChunkShouldNotBeIgnored) { + auto entry1 = makeOplogEntry( + repl::OpTime(Timestamp(52, 345), 2), // optime + repl::OpTypeEnum::kNoop, // op type + BSON("x" << -50), // o + boost::none, // o2 + Date_t::now(), // wall clock time + 0, // statement id + repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction + insertOplogEntry(entry1); + + auto entry2 = makeOplogEntry( + repl::OpTime(Timestamp(52, 346), 2), // optime + repl::OpTypeEnum::kUpdate, // op type + BSON("$set" << BSON("x" << -50)), // o + BSON("x" << 10), // o2 + Date_t::now(), // wall clock time + 1, 
// statement id + repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction + boost::none, // pre-image optime + entry1.getOpTime()); // post-image optime + insertOplogEntry(entry2); + + SessionTxnRecord sessionRecord; + sessionRecord.setSessionId(makeLogicalSessionIdForTest()); + sessionRecord.setTxnNum(1); + sessionRecord.setLastWriteOpTime(entry2.getOpTime()); + sessionRecord.setLastWriteDate(*entry2.getWallClockTime()); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); + + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + + auto expectedSequece = {entry1, entry2}; + + for (auto oplog : expectedSequece) { + ASSERT_TRUE(migrationSource.hasMoreOplog()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); + // Cannot compare directly because of SERVER-31356 + ASSERT_BSONOBJ_EQ(oplog.toBSON(), nextOplogResult.oplog->toBSON()); + migrationSource.fetchNextOplog(opCtx()); + } + + ASSERT_FALSE(migrationSource.hasMoreOplog()); +} + +TEST_F(SessionCatalogMigrationSourceTest, + UpdatePreImageNotTouchingPostTouchingChunkShouldBeIgnored) { + auto entry1 = makeOplogEntry( + repl::OpTime(Timestamp(52, 345), 2), // optime + repl::OpTypeEnum::kNoop, // op type + BSON("x" << 50), // o + boost::none, // o2 + Date_t::now(), // wall clock time + 0, // statement id + repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction + insertOplogEntry(entry1); + + auto entry2 = makeOplogEntry( + repl::OpTime(Timestamp(52, 346), 2), // optime + repl::OpTypeEnum::kUpdate, // op type + BSON("$set" << BSON("x" << 50)), // o + BSON("x" << -10), // o2 + Date_t::now(), // wall clock time + 1, // statement id + repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction + boost::none, // pre-image optime + entry1.getOpTime()); // post-image optime + insertOplogEntry(entry2); + + SessionTxnRecord sessionRecord; + sessionRecord.setSessionId(makeLogicalSessionIdForTest()); + sessionRecord.setTxnNum(1); + sessionRecord.setLastWriteOpTime(entry2.getOpTime()); + sessionRecord.setLastWriteDate(*entry2.getWallClockTime()); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); + + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); +} + +TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyUpdateNotTouchingChunkShouldBeIgnored) { + auto entry1 = makeOplogEntry( + repl::OpTime(Timestamp(52, 345), 2), // optime + repl::OpTypeEnum::kNoop, // op type + BSON("x" << -10 << "y" << 50), // o + boost::none, // o2 + Date_t::now(), // wall clock time + 0, // statement id + repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction + insertOplogEntry(entry1); + + auto entry2 = makeOplogEntry( + repl::OpTime(Timestamp(52, 346), 2), // optime + repl::OpTypeEnum::kUpdate, // op type + BSON("$set" << BSON("y" << 50)), // o + BSON("x" << -10), // o2 + Date_t::now(), // wall clock time + 1, // statement id + repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction + boost::none, // pre-image optime + entry1.getOpTime()); // post-image optime + insertOplogEntry(entry2); + + SessionTxnRecord sessionRecord; 
+ sessionRecord.setSessionId(makeLogicalSessionIdForTest()); + sessionRecord.setTxnNum(1); + sessionRecord.setLastWriteOpTime(entry2.getOpTime()); + sessionRecord.setLastWriteDate(*entry2.getWallClockTime()); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); + + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); +} + +TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWritesContainingWriteNotInChunk) { + auto sessionId1 = makeLogicalSessionIdForTest(); + auto sessionId2 = makeLogicalSessionIdForTest(); + + auto cmpResult = sessionId1.toBSON().woCompare(sessionId2.toBSON()); + auto lowerSessionId = (cmpResult < 0) ? sessionId1 : sessionId2; + auto higherSessionId = (cmpResult < 0) ? sessionId2 : sessionId1; + + auto entry1a = makeOplogEntry( + repl::OpTime(Timestamp(52, 345), 2), // optime + repl::OpTypeEnum::kInsert, // op type + BSON("x" << 30), // o + boost::none, // o2 + Date_t::now(), // wall clock time + 0, // statement id + repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction + + auto entry1b = + makeOplogEntry(repl::OpTime(Timestamp(67, 54801), 2), // optime + repl::OpTypeEnum::kInsert, // op type + BSON("x" << -50), // o + boost::none, // o2 + Date_t::now(), // wall clock time + 1, // statement id + entry1a.getOpTime()); // optime of previous write within same transaction + + SessionTxnRecord sessionRecord1; + sessionRecord1.setSessionId(higherSessionId); + sessionRecord1.setTxnNum(1); + sessionRecord1.setLastWriteOpTime(entry1b.getOpTime()); + sessionRecord1.setLastWriteDate(*entry1b.getWallClockTime()); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), + sessionRecord1.toBSON()); + + auto entry2a = makeOplogEntry( + repl::OpTime(Timestamp(43, 12), 2), // optime + repl::OpTypeEnum::kDelete, // op type + BSON("x" << 30), // o + boost::none, // o2 + Date_t::now(), // wall clock time + 3, // statement id + repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction + + auto entry2b = + makeOplogEntry(repl::OpTime(Timestamp(789, 13), 2), // optime + repl::OpTypeEnum::kDelete, // op type + BSON("x" << 50), // o + boost::none, // o2 + Date_t::now(), // wall clock time + 4, // statement id + entry2a.getOpTime()); // optime of previous write within same transaction + + SessionTxnRecord sessionRecord2; + sessionRecord2.setSessionId(lowerSessionId); + sessionRecord2.setTxnNum(1); + sessionRecord2.setLastWriteOpTime(entry2b.getOpTime()); + sessionRecord2.setLastWriteDate(*entry2b.getWallClockTime()); + + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), + sessionRecord2.toBSON()); + + insertOplogEntry(entry2a); + insertOplogEntry(entry1a); + insertOplogEntry(entry1b); + insertOplogEntry(entry2b); + + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + + auto expectedSequece = {entry1a, entry2b, entry2a}; + + for (auto oplog : expectedSequece) { + ASSERT_TRUE(migrationSource.hasMoreOplog()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); + // Cannot compare directly because of SERVER-31356 + ASSERT_BSONOBJ_EQ(oplog.toBSON(), nextOplogResult.oplog->toBSON()); + migrationSource.fetchNextOplog(opCtx()); + } + + 
ASSERT_FALSE(migrationSource.hasMoreOplog()); +} + } // namespace } // namespace mongo
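The behavioral change in this patch is concentrated in SessionCatalogMigrationSource::_handleWriteHistory: the chunk-range check that previously ran in the caller in migration_chunk_cloner_source_legacy.cpp (the removed shouldApplyOplogToSession helper) now runs while walking a session's retryable-write history, before fetchPrePostImageOplog is called. As a result, a findAndModify pre/post image noop is migrated exactly when its originating write is migrated, which is what the new tests assert. The standalone sketch below models only that decision rule; WriteEntry, ImageEntry, collectSessionEntries, and the integer shard-key value are hypothetical simplifications, not the real MongoDB types, and the sketch assumes the shard key has already been extracted from each entry's document key.

#include <iostream>
#include <optional>
#include <string>
#include <vector>

// Hypothetical, simplified stand-ins for the real oplog and chunk types.
struct ImageEntry {
    std::string description;  // models the pre/post image noop oplog entry
};

struct WriteEntry {
    int shardKeyValue;                // shard key already extracted from the document key
    std::optional<ImageEntry> image;  // set when the write recorded a pre/post image
    std::string description;
};

struct ChunkRange {
    int min, max;  // [min, max), mirroring the containsKey check used in the patch
    bool containsKey(int key) const {
        return key >= min && key < max;
    }
};

// Models the loop added to _handleWriteHistory: a write whose shard key falls
// outside the migrating chunk is skipped, and because the image is only fetched
// for writes that pass the filter, the image inherits the same decision.
std::vector<std::string> collectSessionEntries(const std::vector<WriteEntry>& chain,
                                               const ChunkRange& range) {
    std::vector<std::string> toMigrate;
    for (const auto& write : chain) {
        if (!range.containsKey(write.shardKeyValue)) {
            continue;  // originating write not migrated, so neither is its image
        }
        if (write.image) {
            toMigrate.push_back(write.image->description);  // image precedes its write
        }
        toMigrate.push_back(write.description);
    }
    return toMigrate;
}

int main() {
    ChunkRange range{0, 100};
    std::vector<WriteEntry> chain = {
        {-50, ImageEntry{"pre-image for out-of-chunk delete"}, "delete x:-50 (skipped)"},
        {10, ImageEntry{"post-image for in-chunk update"}, "update x:10 (migrated)"},
        {30, std::nullopt, "insert x:30 (migrated)"},
    };
    for (const auto& line : collectSessionEntries(chain, range)) {
        std::cout << line << "\n";
    }
    return 0;
}

Tying this back to the test additions: FindAndModifyDeleteNotTouchingChunkIsIgnored expects no oplog at all because both the delete and its pre-image are skipped, while UpdatePreImageTouchingPostNotTouchingChunkShouldNotBeIgnored still expects the image noop followed by the update. Note that the filter operates on the shard key extracted from the entry's document key (getObjectContainingDocumentKey), which is why UpdatePreImageNotTouchingPostTouchingChunkShouldBeIgnored is ignored even though its $set would move the document into the chunk.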