diff options
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination_test.cpp | 79 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 13 |
2 files changed, 71 insertions, 21 deletions
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index 8cbaa4108e3..49da1334126 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -33,6 +33,7 @@ #include "mongo/client/connection_string.h" #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/initialize_operation_session_info.h" #include "mongo/db/logical_session_cache_noop.h" @@ -1518,13 +1519,14 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory sessionInfo, // session info Date_t::now(), // wall clock time kIncompleteHistoryStmtId), // statement id - makeOplogEntry(OpTime(Timestamp(60, 2), 1), // optime - OpTypeEnum::kInsert, // op type - BSON("x" << 60), // o - boost::none, // o2 - sessionInfo, // session info - Date_t::now(), // wall clock time - 5)}; // statement id + // This will get ignored since previous entry will make the history 'incomplete'. + makeOplogEntry(OpTime(Timestamp(60, 2), 1), // optime + OpTypeEnum::kInsert, // op type + BSON("x" << 60), // o + boost::none, // o2 + sessionInfo, // session info + Date_t::now(), // wall clock time + 5)}; // statement id SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); sessionMigration.start(getServiceContext()); @@ -1545,19 +1547,12 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2)); ASSERT_TRUE(historyIter.hasNext()); - checkOplogWithNestedOplog(oplogEntries[2], historyIter.next(opCtx)); - - ASSERT_TRUE(historyIter.hasNext()); checkOplog(oplogEntries[1], historyIter.next(opCtx)); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplogEntries[0], historyIter.next(opCtx)); ASSERT_FALSE(historyIter.hasNext()); - - checkStatementExecuted(opCtx, session.get(), 2, 23, oplogEntries[0]); - checkStatementExecuted(opCtx, session.get(), 2, 5, oplogEntries[2]); - ASSERT_THROWS(session->checkStatementExecuted(opCtx, 2, 38), AssertionException); } TEST_F(SessionCatalogMigrationDestinationTest, @@ -1645,5 +1640,61 @@ TEST_F(SessionCatalogMigrationDestinationTest, } } +TEST_F(SessionCatalogMigrationDestinationTest, MigratingKnownStmtWhileOplogTruncated) { + auto opCtx = operationContext(); + const StmtId kStmtId = 45; + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(makeLogicalSessionIdForTest()); + sessionInfo.setTxnNumber(19); + + insertDocWithSessionInfo(sessionInfo, kNs, BSON("_id" << 46), kStmtId); + + auto getLastWriteOpTime = [&]() { + auto c1 = getServiceContext()->makeClient("c1"); + AlternativeClientRegion acr(c1); + auto innerOpCtx = cc().makeOperationContext(); + auto session = getSessionWithTxn(innerOpCtx.get(), *sessionInfo.getSessionId(), 19); + return session->getLastWriteOpTime(19); + }; + + auto lastOpTimeBeforeMigrate = getLastWriteOpTime(); + + { + AutoGetCollection oplogColl(opCtx, NamespaceString::kRsOplogNamespace, MODE_X); + ASSERT_OK(oplogColl.getCollection()->truncate(opCtx)); // Empties the oplog collection. + } + + { + // Confirm that oplog is indeed empty. + DBDirectClient client(opCtx); + auto result = client.findOne(NamespaceString::kRsOplogNamespace.ns(), {}); + ASSERT_TRUE(result.isEmpty()); + } + + auto sameStmtOplog = makeOplogEntry(OpTime(Timestamp(100, 2), 1), // optime + OpTypeEnum::kInsert, // op type + BSON("_id" << 46), // o + boost::none, // o2 + sessionInfo, // session info + Date_t::now(), // wall clock time + kStmtId); // statement id + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + returnOplog({sameStmtOplog}); + + // migration always fetches at least twice to transition from committing to done. + returnOplog({}); + returnOplog({}); + + sessionMigration.join(); + + ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); + + ASSERT_EQ(lastOpTimeBeforeMigrate, getLastWriteOpTime()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index a6ceba98b45..659ca1c3cc4 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -445,18 +445,17 @@ bool Session::onMigrateBeginOnPrimary(OperationContext* opCtx, TxnNumber txnNumb return false; } } catch (const DBException& ex) { - // If the transaction chain was truncated on the recipient shard, then we are most likely - // copying from a session that hasn't been touched on the recipient shard for a very long - // time but could be recent on the donor. - // - // We continue copying regardless to get the entire transaction from the donor. - if (ex.code() != ErrorCodes::IncompleteTransactionHistory) { - throw; + // If the transaction chain is incomplete because oplog was truncated, just ignore the + // incoming oplog and don't attempt to 'patch up' the missing pieces. + if (ex.code() == ErrorCodes::IncompleteTransactionHistory) { + return false; } if (stmtId == kIncompleteHistoryStmtId) { return false; } + + throw; } return true; |