summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2019-05-15 16:54:35 -0400
committerRandolph Tan <randolph@10gen.com>2019-05-17 16:49:22 -0400
commit32ade4aa6b6a73c3620486117388908ad9ad438c (patch)
tree757054f3a8c8bb898aeef89a9af8feabe7c9d215 /src
parent57c127b9040a1203a86214cf0bf896aa069afbbe (diff)
downloadmongo-32ade4aa6b6a73c3620486117388908ad9ad438c.tar.gz
SERVER-41074 Make decision to send pre/post image oplog the same as the decision made for the originating oplog write.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp24
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.cpp25
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.h10
-rw-r--r--src/mongo/db/s/session_catalog_migration_source_test.cpp327
4 files changed, 339 insertions, 47 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 9e6e44388fd..bcb5cd266e6 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -87,20 +87,6 @@ BSONObj createRequestWithSessionId(StringData commandName,
return builder.obj();
}
-bool shouldApplyOplogToSession(const repl::OplogEntry& oplog,
- const ChunkRange& range,
- const ShardKeyPattern& keyPattern) {
- // Skip appending CRUD operations that don't pertain to the ChunkRange being migrated.
- if (oplog.isCrudOpType()) {
- auto shardKey = keyPattern.extractShardKeyFromDoc(oplog.getObjectContainingDocumentKey());
- if (!range.containsKey(shardKey)) {
- return false;
- }
- }
-
- return true;
-}
-
} // namespace
/**
@@ -187,8 +173,11 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) {
auto const replCoord = repl::ReplicationCoordinator::get(opCtx);
if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) {
- _sessionCatalogSource =
- stdx::make_unique<SessionCatalogMigrationSource>(opCtx, _args.getNss());
+ _sessionCatalogSource = stdx::make_unique<SessionCatalogMigrationSource>(
+ opCtx,
+ _args.getNss(),
+ ChunkRange(_args.getMinKey(), _args.getMaxKey()),
+ _shardKeyPattern.getKeyPattern());
// Prime up the session migration source if there are oplog entries to migrate.
_sessionCatalogSource->fetchNextOplog(opCtx);
@@ -859,8 +848,7 @@ boost::optional<repl::OpTime> MigrationChunkClonerSourceLegacy::nextSessionMigra
while (_sessionCatalogSource->hasMoreOplog()) {
auto result = _sessionCatalogSource->getLastFetchedOplog();
- if (!result.oplog ||
- !shouldApplyOplogToSession(result.oplog.get(), range, _shardKeyPattern)) {
+ if (!result.oplog) {
_sessionCatalogSource->fetchNextOplog(opCtx);
continue;
}
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index 8882a4d09e7..cba5f1ec8a9 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -128,8 +128,13 @@ repl::OplogEntry makeSentinelOplogEntry(const LogicalSessionId& lsid,
} // namespace
SessionCatalogMigrationSource::SessionCatalogMigrationSource(OperationContext* opCtx,
- NamespaceString ns)
- : _ns(std::move(ns)), _rollbackIdAtInit(repl::ReplicationProcess::get(opCtx)->getRollbackID()) {
+ NamespaceString ns,
+ ChunkRange chunk,
+ KeyPattern shardKey)
+ : _ns(std::move(ns)),
+ _rollbackIdAtInit(repl::ReplicationProcess::get(opCtx)->getRollbackID()),
+ _chunkRange(std::move(chunk)),
+ _keyPattern(shardKey) {
// Exclude entries for transaction.
Query query;
// Sort is not needed for correctness. This is just for making it easier to write deterministic
@@ -248,18 +253,27 @@ std::shared_ptr<Notification<bool>> SessionCatalogMigrationSource::getNotificati
}
bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationContext* opCtx) {
- if (_currentOplogIterator) {
+ while (_currentOplogIterator) {
if (auto nextOplog = _currentOplogIterator->getNext(opCtx)) {
auto nextStmtId = nextOplog->getStatementId();
- // Note: This is an optimization based on the assumption that it is not possible to be
- // touching different namespaces in the same transaction.
+ // Skip the rest of the chain for this session since the ns is unrelated with the
+ // current one being migrated. It is ok to not check the rest of the chain because
+ // retryable writes doesn't allow touching different namespaces.
if (!nextStmtId || (nextStmtId && *nextStmtId != kIncompleteHistoryStmtId &&
nextOplog->getNss() != _ns)) {
_currentOplogIterator.reset();
return false;
}
+ if (nextOplog->isCrudOpType()) {
+ auto shardKey =
+ _keyPattern.extractShardKeyFromDoc(nextOplog->getObjectContainingDocumentKey());
+ if (!_chunkRange.containsKey(shardKey)) {
+ continue;
+ }
+ }
+
auto doc = fetchPrePostImageOplog(opCtx, *nextOplog);
if (doc) {
_lastFetchedOplogBuffer.push_back(*nextOplog);
@@ -267,6 +281,7 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte
} else {
_lastFetchedOplog = *nextOplog;
}
+
return true;
} else {
_currentOplogIterator.reset();
diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h
index 89abe7d83cd..06093d4c8e8 100644
--- a/src/mongo/db/s/session_catalog_migration_source.h
+++ b/src/mongo/db/s/session_catalog_migration_source.h
@@ -37,6 +37,8 @@
#include "mongo/db/repl/optime.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/transaction_history_iterator.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/shard_key_pattern.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/notification.h"
#include "mongo/util/concurrency/with_lock.h"
@@ -87,7 +89,10 @@ public:
bool shouldWaitForMajority = false;
};
- SessionCatalogMigrationSource(OperationContext* opCtx, NamespaceString ns);
+ SessionCatalogMigrationSource(OperationContext* opCtx,
+ NamespaceString ns,
+ ChunkRange chunk,
+ KeyPattern shardKey);
/**
* Returns true if there are more oplog entries to fetch at this moment. Note that new writes
@@ -221,6 +226,9 @@ private:
// followed by step-up situations can be discovered.
const int _rollbackIdAtInit;
+ const ChunkRange _chunkRange;
+ const ShardKeyPattern _keyPattern;
+
// Protects _sessionCatalogCursor, _sessionOplogIterators, _currentOplogIterator,
// _lastFetchedOplogBuffer, _lastFetchedOplog
stdx::mutex _sessionCloneMutex;
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index c4fabc0b443..16492fbb17c 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -46,6 +46,8 @@ namespace {
using executor::RemoteCommandRequest;
const NamespaceString kNs("a.b");
+const KeyPattern kShardKey(BSON("x" << 1));
+const ChunkRange kChunkRange(BSON("x" << 0), BSON("x" << 100));
class SessionCatalogMigrationSourceTest : public MockReplCoordServerFixture {};
@@ -107,7 +109,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
}
TEST_F(SessionCatalogMigrationSourceTest, NoSessionsToTransferShouldNotHaveOplog) {
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
}
@@ -126,7 +128,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) {
auto entry2 =
makeOplogEntry(repl::OpTime(Timestamp(67, 54801), 2), // optime
repl::OpTypeEnum::kInsert, // op type
- BSON("y" << 50), // o
+ BSON("x" << 50), // o
boost::none, // o2
Date_t::now(), // wall clock time
1, // statement id
@@ -142,7 +144,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) {
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
{
@@ -179,7 +181,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
auto entry1b =
makeOplogEntry(repl::OpTime(Timestamp(67, 54801), 2), // optime
repl::OpTypeEnum::kInsert, // op type
- BSON("y" << 50), // o
+ BSON("x" << 50), // o
boost::none, // o2
Date_t::now(), // wall clock time
1, // statement id
@@ -207,7 +209,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
auto entry2b =
makeOplogEntry(repl::OpTime(Timestamp(789, 13), 2), // optime
repl::OpTypeEnum::kDelete, // op type
- BSON("y" << 50), // o
+ BSON("x" << 50), // o
boost::none, // o2
Date_t::now(), // wall clock time
4, // statement id
@@ -227,7 +229,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
insertOplogEntry(entry1b);
insertOplogEntry(entry2b);
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
auto checkNextBatch = [this, &migrationSource](const repl::OplogEntry& firstExpectedOplog,
@@ -284,7 +286,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
auto entry2 = makeOplogEntry(
repl::OpTime(Timestamp(52, 346), 2), // optime
repl::OpTypeEnum::kDelete, // op type
- BSON("y" << 50), // o
+ BSON("x" << 50), // o
boost::none, // o2
Date_t::now(), // wall clock time
1, // statement id
@@ -305,8 +307,8 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
auto entry4 =
makeOplogEntry(repl::OpTime(Timestamp(73, 6), 2), // optime
repl::OpTypeEnum::kUpdate, // op type
- BSON("x" << 19), // o
- BSON("$inc" << BSON("x" << 1)), // o2
+ BSON("$inc" << BSON("x" << 1)), // o
+ BSON("x" << 19), // o2
Date_t::now(), // wall clock time
3, // statement id
entry2.getOpTime(), // optime of previous write within same transaction
@@ -323,7 +325,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
auto expectedSequece = {entry3, entry4, entry1, entry2};
@@ -385,7 +387,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) {
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
sessionRecord2.toBSON());
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_TRUE(migrationSource.hasMoreOplog());
@@ -433,14 +435,14 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
auto entry3 = makeOplogEntry(
repl::OpTime(Timestamp(55, 12), 2), // optime
repl::OpTypeEnum::kInsert, // op type
- BSON("z" << 40), // o
+ BSON("x" << 40), // o
boost::none, // o2
Date_t::now(), // wall clock time
2, // statement id
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
insertOplogEntry(entry3);
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
migrationSource.notifyNewWriteOpTime(
@@ -477,7 +479,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
}
TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) {
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
migrationSource.notifyNewWriteOpTime(
@@ -488,7 +490,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) {
}
TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBufferWasDepleted) {
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
{
@@ -544,7 +546,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
auto entry = makeOplogEntry(
repl::OpTime(Timestamp(55, 12), 2), // optime
repl::OpTypeEnum::kInsert, // op type
- BSON("z" << 40), // o
+ BSON("x" << 40), // o
boost::none, // o2
Date_t::now(), // wall clock time
2, // statement id
@@ -587,7 +589,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ReturnsDeadEndSentinelForIncompleteHis
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
{
@@ -645,7 +647,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) {
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
{
@@ -674,7 +676,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_TRUE(migrationSource.hasMoreOplog());
@@ -700,7 +702,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_TRUE(migrationSource.hasMoreOplog());
@@ -725,7 +727,7 @@ TEST_F(SessionCatalogMigrationSourceTest, InProgressTransactionEntriesShouldBeIg
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
@@ -742,7 +744,7 @@ TEST_F(SessionCatalogMigrationSourceTest, AbortedTransactionEntriesShouldBeIgnor
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
@@ -784,7 +786,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
// Insert the 'insert' oplog entry into the oplog.
insertOplogEntry(insertOplog);
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
// Function to verify the oplog entry corresponding to the retryable write.
auto checkRetryableWriteEntry = [&] {
@@ -823,5 +825,284 @@ TEST_F(SessionCatalogMigrationSourceTest,
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
}
+
+TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyDeleteNotTouchingChunkIsIgnored) {
+ auto entry1 = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 345), 2), // optime
+ repl::OpTypeEnum::kNoop, // op type
+ BSON("x" << -50), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 0, // statement id
+ repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
+ insertOplogEntry(entry1);
+
+ auto entry2 = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 346), 2), // optime
+ repl::OpTypeEnum::kDelete, // op type
+ BSON("x" << -50), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 1, // statement id
+ repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction
+ entry1.getOpTime()); // pre-image optime
+ insertOplogEntry(entry2);
+
+ SessionTxnRecord sessionRecord;
+ sessionRecord.setSessionId(makeLogicalSessionIdForTest());
+ sessionRecord.setTxnNum(1);
+ sessionRecord.setLastWriteOpTime(entry2.getOpTime());
+ sessionRecord.setLastWriteDate(*entry2.getWallClockTime());
+
+ DBDirectClient client(opCtx());
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
+
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+}
+
+TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyUpdatePrePostNotTouchingChunkIsIgnored) {
+ auto entry1 = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 345), 2), // optime
+ repl::OpTypeEnum::kNoop, // op type
+ BSON("x" << -5), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 0, // statement id
+ repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
+ insertOplogEntry(entry1);
+
+ auto entry2 = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 346), 2), // optime
+ repl::OpTypeEnum::kUpdate, // op type
+ BSON("$set" << BSON("y" << 1)), // o
+ BSON("x" << -5), // o2
+ Date_t::now(), // wall clock time
+ 1, // statement id
+ repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction
+ entry1.getOpTime()); // pre-image optime
+ insertOplogEntry(entry2);
+
+ SessionTxnRecord sessionRecord;
+ sessionRecord.setSessionId(makeLogicalSessionIdForTest());
+ sessionRecord.setTxnNum(1);
+ sessionRecord.setLastWriteOpTime(entry2.getOpTime());
+ sessionRecord.setLastWriteDate(*entry2.getWallClockTime());
+
+ DBDirectClient client(opCtx());
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
+
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+}
+
+TEST_F(SessionCatalogMigrationSourceTest,
+ UpdatePreImageTouchingPostNotTouchingChunkShouldNotBeIgnored) {
+ auto entry1 = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 345), 2), // optime
+ repl::OpTypeEnum::kNoop, // op type
+ BSON("x" << -50), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 0, // statement id
+ repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
+ insertOplogEntry(entry1);
+
+ auto entry2 = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 346), 2), // optime
+ repl::OpTypeEnum::kUpdate, // op type
+ BSON("$set" << BSON("x" << -50)), // o
+ BSON("x" << 10), // o2
+ Date_t::now(), // wall clock time
+ 1, // statement id
+ repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ entry1.getOpTime()); // post-image optime
+ insertOplogEntry(entry2);
+
+ SessionTxnRecord sessionRecord;
+ sessionRecord.setSessionId(makeLogicalSessionIdForTest());
+ sessionRecord.setTxnNum(1);
+ sessionRecord.setLastWriteOpTime(entry2.getOpTime());
+ sessionRecord.setLastWriteDate(*entry2.getWallClockTime());
+
+ DBDirectClient client(opCtx());
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
+
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+
+ auto expectedSequece = {entry1, entry2};
+
+ for (auto oplog : expectedSequece) {
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_FALSE(nextOplogResult.shouldWaitForMajority);
+ // Cannot compare directly because of SERVER-31356
+ ASSERT_BSONOBJ_EQ(oplog.toBSON(), nextOplogResult.oplog->toBSON());
+ migrationSource.fetchNextOplog(opCtx());
+ }
+
+ ASSERT_FALSE(migrationSource.hasMoreOplog());
+}
+
+TEST_F(SessionCatalogMigrationSourceTest,
+ UpdatePreImageNotTouchingPostTouchingChunkShouldBeIgnored) {
+ auto entry1 = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 345), 2), // optime
+ repl::OpTypeEnum::kNoop, // op type
+ BSON("x" << 50), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 0, // statement id
+ repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
+ insertOplogEntry(entry1);
+
+ auto entry2 = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 346), 2), // optime
+ repl::OpTypeEnum::kUpdate, // op type
+ BSON("$set" << BSON("x" << 50)), // o
+ BSON("x" << -10), // o2
+ Date_t::now(), // wall clock time
+ 1, // statement id
+ repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ entry1.getOpTime()); // post-image optime
+ insertOplogEntry(entry2);
+
+ SessionTxnRecord sessionRecord;
+ sessionRecord.setSessionId(makeLogicalSessionIdForTest());
+ sessionRecord.setTxnNum(1);
+ sessionRecord.setLastWriteOpTime(entry2.getOpTime());
+ sessionRecord.setLastWriteDate(*entry2.getWallClockTime());
+
+ DBDirectClient client(opCtx());
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
+
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+}
+
+TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyUpdateNotTouchingChunkShouldBeIgnored) {
+ auto entry1 = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 345), 2), // optime
+ repl::OpTypeEnum::kNoop, // op type
+ BSON("x" << -10 << "y" << 50), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 0, // statement id
+ repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
+ insertOplogEntry(entry1);
+
+ auto entry2 = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 346), 2), // optime
+ repl::OpTypeEnum::kUpdate, // op type
+ BSON("$set" << BSON("y" << 50)), // o
+ BSON("x" << -10), // o2
+ Date_t::now(), // wall clock time
+ 1, // statement id
+ repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ entry1.getOpTime()); // post-image optime
+ insertOplogEntry(entry2);
+
+ SessionTxnRecord sessionRecord;
+ sessionRecord.setSessionId(makeLogicalSessionIdForTest());
+ sessionRecord.setTxnNum(1);
+ sessionRecord.setLastWriteOpTime(entry2.getOpTime());
+ sessionRecord.setLastWriteDate(*entry2.getWallClockTime());
+
+ DBDirectClient client(opCtx());
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
+
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+}
+
+TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWritesContainingWriteNotInChunk) {
+ auto sessionId1 = makeLogicalSessionIdForTest();
+ auto sessionId2 = makeLogicalSessionIdForTest();
+
+ auto cmpResult = sessionId1.toBSON().woCompare(sessionId2.toBSON());
+ auto lowerSessionId = (cmpResult < 0) ? sessionId1 : sessionId2;
+ auto higherSessionId = (cmpResult < 0) ? sessionId2 : sessionId1;
+
+ auto entry1a = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 345), 2), // optime
+ repl::OpTypeEnum::kInsert, // op type
+ BSON("x" << 30), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 0, // statement id
+ repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
+
+ auto entry1b =
+ makeOplogEntry(repl::OpTime(Timestamp(67, 54801), 2), // optime
+ repl::OpTypeEnum::kInsert, // op type
+ BSON("x" << -50), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 1, // statement id
+ entry1a.getOpTime()); // optime of previous write within same transaction
+
+ SessionTxnRecord sessionRecord1;
+ sessionRecord1.setSessionId(higherSessionId);
+ sessionRecord1.setTxnNum(1);
+ sessionRecord1.setLastWriteOpTime(entry1b.getOpTime());
+ sessionRecord1.setLastWriteDate(*entry1b.getWallClockTime());
+
+ DBDirectClient client(opCtx());
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ sessionRecord1.toBSON());
+
+ auto entry2a = makeOplogEntry(
+ repl::OpTime(Timestamp(43, 12), 2), // optime
+ repl::OpTypeEnum::kDelete, // op type
+ BSON("x" << 30), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 3, // statement id
+ repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
+
+ auto entry2b =
+ makeOplogEntry(repl::OpTime(Timestamp(789, 13), 2), // optime
+ repl::OpTypeEnum::kDelete, // op type
+ BSON("x" << 50), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 4, // statement id
+ entry2a.getOpTime()); // optime of previous write within same transaction
+
+ SessionTxnRecord sessionRecord2;
+ sessionRecord2.setSessionId(lowerSessionId);
+ sessionRecord2.setTxnNum(1);
+ sessionRecord2.setLastWriteOpTime(entry2b.getOpTime());
+ sessionRecord2.setLastWriteDate(*entry2b.getWallClockTime());
+
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ sessionRecord2.toBSON());
+
+ insertOplogEntry(entry2a);
+ insertOplogEntry(entry1a);
+ insertOplogEntry(entry1b);
+ insertOplogEntry(entry2b);
+
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+
+ auto expectedSequece = {entry1a, entry2b, entry2a};
+
+ for (auto oplog : expectedSequece) {
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_FALSE(nextOplogResult.shouldWaitForMajority);
+ // Cannot compare directly because of SERVER-31356
+ ASSERT_BSONOBJ_EQ(oplog.toBSON(), nextOplogResult.oplog->toBSON());
+ migrationSource.fetchNextOplog(opCtx());
+ }
+
+ ASSERT_FALSE(migrationSource.hasMoreOplog());
+}
+
} // namespace
} // namespace mongo