From bbe604a5b3f94f78a30a76941ea89c0ca5db14d2 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 30 Sep 2020 18:30:11 +0000 Subject: SERVER-49899 Create config.transactions cloner for resharding (merges txns) --- src/mongo/db/s/SConscript | 29 +- src/mongo/db/s/resharding_txn_cloner.cpp | 171 +++++++++-- src/mongo/db/s/resharding_txn_cloner.h | 26 +- src/mongo/db/s/resharding_txn_cloner_test.cpp | 312 +++++++++++++++++++-- src/mongo/db/s/resharding_util.cpp | 18 -- src/mongo/db/s/resharding_util.h | 17 -- src/mongo/db/s/resharding_util_test.cpp | 22 +- .../db/s/session_catalog_migration_destination.cpp | 2 +- src/mongo/db/transaction_participant.cpp | 2 +- src/mongo/db/transaction_participant.h | 23 +- 10 files changed, 523 insertions(+), 99 deletions(-) diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index bcd10878025..ddbc7ab1f0b 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -126,6 +126,7 @@ env.Library( '$BUILD_DIR/mongo/db/session_catalog', '$BUILD_DIR/mongo/idl/server_parameter', '$BUILD_DIR/mongo/util/future_util', + 'resharding_txn_cloner', 'resharding_util', ], ) @@ -133,7 +134,6 @@ env.Library( env.Library( target='resharding_util', source=[ - 'resharding_txn_cloner.cpp', 'resharding_util.cpp', env.Idlc('resharding/coordinator_document.idl')[0], env.Idlc('resharding/donor_document.idl')[0], @@ -153,6 +153,30 @@ env.Library( '$BUILD_DIR/mongo/s/common_s', '$BUILD_DIR/mongo/s/grid', 'sharding_api_d', + + ], +) + +env.Library( + target='resharding_txn_cloner', + source=[ + 'resharding_txn_cloner.cpp', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/common', + '$BUILD_DIR/mongo/db/concurrency/write_conflict_exception', + '$BUILD_DIR/mongo/db/curop', + '$BUILD_DIR/mongo/db/namespace_string', + '$BUILD_DIR/mongo/db/pipeline/expression_context', + '$BUILD_DIR/mongo/db/pipeline/pipeline', + '$BUILD_DIR/mongo/db/repl/oplog', + '$BUILD_DIR/mongo/db/storage/write_unit_of_work', + '$BUILD_DIR/mongo/db/transaction', + '$BUILD_DIR/mongo/s/async_requests_sender', + '$BUILD_DIR/mongo/s/common_s', + '$BUILD_DIR/mongo/s/grid', + 'sharding_api_d', ], ) @@ -356,6 +380,7 @@ env.Library( '$BUILD_DIR/mongo/s/sharding_initialization', '$BUILD_DIR/mongo/s/sharding_router_api', 'balancer', + 'resharding_txn_cloner', 'resharding_util', 'sharding_runtime_d', ], @@ -466,6 +491,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock', '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock', '$BUILD_DIR/mongo/s/sharding_router_test_fixture', + 'resharding_txn_cloner', 'resharding_util', 'shard_server_test_fixture', 'sharding_logging', @@ -559,6 +585,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/util/version_impl', 'balancer', 'config_server_test_fixture', + 'resharding_txn_cloner', 'resharding_util', ], ) diff --git a/src/mongo/db/s/resharding_txn_cloner.cpp b/src/mongo/db/s/resharding_txn_cloner.cpp index f5e4721d3b4..c50e4806039 100644 --- a/src/mongo/db/s/resharding_txn_cloner.cpp +++ b/src/mongo/db/s/resharding_txn_cloner.cpp @@ -41,22 +41,58 @@ #include "mongo/client/fetcher.h" #include "mongo/client/read_preference.h" #include "mongo/client/remote_command_targeter.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/operation_context.h" +#include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/sharded_agg_helpers.h" -#include "mongo/db/query/query_request.h" -#include "mongo/db/s/resharding_util.h" +#include "mongo/db/session_catalog_mongod.h" +#include "mongo/db/session_txn_record_gen.h" +#include "mongo/db/transaction_participant.h" +#include "mongo/logv2/redaction.h" #include "mongo/s/shard_id.h" namespace mongo { using namespace fmt::literals; + +std::unique_ptr createConfigTxnCloningPipelineForResharding( + const boost::intrusive_ptr& expCtx, + Timestamp fetchTimestamp, + boost::optional startAfter) { + invariant(!fetchTimestamp.isNull()); + + std::list> stages; + if (startAfter) { + stages.emplace_back(DocumentSourceMatch::create(BSON(SessionTxnRecord::kSessionIdFieldName + << BSON("$gt" << startAfter->toBSON()) + << SessionTxnRecord::kStateFieldName + << BSON("$exists" << false)), + expCtx)); + } else { + stages.emplace_back(DocumentSourceMatch::create( + BSON(SessionTxnRecord::kStateFieldName << BSON("$exists" << false)), expCtx)); + } + stages.emplace_back( + DocumentSourceSort::create(expCtx, BSON(SessionTxnRecord::kSessionIdFieldName << 1))); + stages.emplace_back(DocumentSourceMatch::create( + BSON((SessionTxnRecord::kLastWriteOpTimeFieldName + "." + repl::OpTime::kTimestampFieldName) + << BSON("$lt" << fetchTimestamp)), + expCtx)); + + return Pipeline::create(std::move(stages), expCtx); +} + + std::unique_ptr cloneConfigTxnsForResharding( OperationContext* opCtx, const ShardId& shardId, Timestamp fetchTimestamp, boost::optional startAfter, - std::function)> merge) { + std::function merge, + Status* status) { + boost::intrusive_ptr expCtx = make_intrusive( opCtx, nullptr, NamespaceString::kSessionTransactionsTableNamespace); auto pipeline = @@ -73,26 +109,35 @@ std::unique_ptr cloneConfigTxnsForResharding( auto shard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)); const auto targetHost = uassertStatusOK( shard->getTargeter()->findHost(opCtx, ReadPreferenceSetting{ReadPreference::Nearest})); - - auto fetcherCallback = [merge](const Fetcher::QueryResponseStatus& dataStatus, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob) { - if (!dataStatus.isOK()) { - merge(dataStatus.getStatus()); - return; - } - - auto data = dataStatus.getValue(); - for (BSONObj doc : data.documents) { - merge(doc); - } - - if (!getMoreBob) { - return; - } - getMoreBob->append("getMore", data.cursorId); - getMoreBob->append("collection", data.nss.coll()); - }; + auto serviceContext = opCtx->getServiceContext(); + auto fetcherCallback = + [merge, status, serviceContext](const Fetcher::QueryResponseStatus& dataStatus, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob) { + if (!dataStatus.isOK()) { + *status = dataStatus.getStatus(); + return; + } + + ThreadClient threadClient(serviceContext); + auto uniqueOpCtx = cc().makeOperationContext(); + auto fetcherOpCtx = uniqueOpCtx.get(); + auto data = dataStatus.getValue(); + for (BSONObj doc : data.documents) { + try { + merge(fetcherOpCtx, doc); + } catch (const DBException& ex) { + *status = ex.toStatus(); + return; + } + } + + if (!getMoreBob) { + return; + } + getMoreBob->append("getMore", data.cursorId); + getMoreBob->append("collection", data.nss.coll()); + }; auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); @@ -107,4 +152,84 @@ std::unique_ptr cloneConfigTxnsForResharding( return fetcher; } +void configTxnsMergerForResharding(OperationContext* opCtx, BSONObj donorBsonTransaction) { + SessionTxnRecord donorTransaction; + + donorTransaction = SessionTxnRecord::parse( + IDLParserErrorContext("resharding config transactions cloning"), donorBsonTransaction); + + opCtx->setLogicalSessionId(donorTransaction.getSessionId()); + + while (true) { + auto ocs = std::make_unique(opCtx); + + auto txnParticipant = TransactionParticipant::get(opCtx); + // Which error code should this be? what message? + uassert(4989900, "Failed to get transaction Participant", txnParticipant); + try { + txnParticipant.beginOrContinue( + opCtx, donorTransaction.getTxnNum(), boost::none, boost::none); + } catch (const DBException& ex) { + if (ex.code() == ErrorCodes::TransactionTooOld) { + // donorTransaction.getTxnNum() < recipientTxnNumber + return; + } else if (ex.code() == ErrorCodes::IncompleteTransactionHistory) { + // donorTransaction.getTxnNum() == recipientTxnNumber && + // !txnParticipant.transactionIsInRetryableWriteMode() + return; + } else if (ex.code() == ErrorCodes::PreparedTransactionInProgress) { + // txnParticipant.transactionIsPrepared() + ocs.reset(); + // TODO SERVER-51493 Change to not block here. + txnParticipant.onExitPrepare().wait(); + continue; + } + throw; + } + + repl::MutableOplogEntry oplogEntry; + oplogEntry.setObject(BSON("$sessionMigrateInfo" << 1)); + oplogEntry.setObject2(BSON("$incompleteOplogHistory" << 1)); + oplogEntry.setOpType(repl::OpTypeEnum::kNoop); + oplogEntry.setSessionId(donorTransaction.getSessionId()); + oplogEntry.setTxnNumber(donorTransaction.getTxnNum()); + oplogEntry.setStatementId(kIncompleteHistoryStmtId); + oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime()); + oplogEntry.setNss({}); + oplogEntry.setWallClockTime(Date_t::now()); + + writeConflictRetry( + opCtx, + "ReshardingTxnCloner", + NamespaceString::kSessionTransactionsTableNamespace.ns(), + [&] { + // Need to take global lock here so repl::logOp will not unlock it and + // trigger the invariant that disallows unlocking global lock while + // inside a WUOW. Take the transaction table db lock to ensure the same + // lock ordering with normal replicated updates to the table. + Lock::DBLock lk( + opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IX); + WriteUnitOfWork wunit(opCtx); + + repl::OpTime opTime = repl::logOp(opCtx, &oplogEntry); + + uassert(4989901, + str::stream() << "Failed to create new oplog entry for oplog with opTime: " + << oplogEntry.getOpTime().toString() << ": " + << redact(oplogEntry.toBSON()), + !opTime.isNull()); + + SessionTxnRecord sessionTxnRecord(donorTransaction.getSessionId(), + donorTransaction.getTxnNum(), + opTime, + Date_t::now()); + txnParticipant.onRetryableWriteCloningCompleted( + opCtx, {kIncompleteHistoryStmtId}, sessionTxnRecord); + wunit.commit(); + }); + + return; + } +} + } // namespace mongo diff --git a/src/mongo/db/s/resharding_txn_cloner.h b/src/mongo/db/s/resharding_txn_cloner.h index e7fe58360ef..497d9d8f810 100644 --- a/src/mongo/db/s/resharding_txn_cloner.h +++ b/src/mongo/db/s/resharding_txn_cloner.h @@ -37,6 +37,24 @@ namespace mongo { + +/** + * Create pipeline stages for iterating donor config.transactions. The pipeline has these stages: + * pipeline: [ + * {$match: {_id: {$gt: }, state: {$exists: false}}}, + * {$sort: {_id: 1}}, + * {$match: {"lastWriteOpTime.ts": {$lt: }}}, + * ], + * Note that the caller is responsible for making sure that the transactions ns is set in the + * expCtx. + * + * fetchTimestamp never isNull() + */ +std::unique_ptr createConfigTxnCloningPipelineForResharding( + const boost::intrusive_ptr& expCtx, + Timestamp fetchTimestamp, + boost::optional startAfter); + /** * Clone config.transactions from source and updates the config.transactions on itself. * The parameter merge is a function called on every transaction received and should be used @@ -49,6 +67,12 @@ std::unique_ptr cloneConfigTxnsForResharding( const ShardId& shardId, Timestamp fetchTimestamp, boost::optional startAfter, - std::function)> merge); + std::function merge, + Status* status); + +/** + * Callback function to be used to merge transactions cloned by cloneConfigTxnsForResharding + */ +void configTxnsMergerForResharding(OperationContext* opCtx, BSONObj donorBsonTransaction); } // namespace mongo diff --git a/src/mongo/db/s/resharding_txn_cloner_test.cpp b/src/mongo/db/s/resharding_txn_cloner_test.cpp index c5993ee3582..9277931b8d2 100644 --- a/src/mongo/db/s/resharding_txn_cloner_test.cpp +++ b/src/mongo/db/s/resharding_txn_cloner_test.cpp @@ -34,11 +34,16 @@ #include #include "mongo/bson/bsonobj.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/logical_session_cache_noop.h" #include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/resharding_txn_cloner.h" #include "mongo/db/s/shard_server_test_fixture.h" +#include "mongo/db/session_catalog_mongod.h" #include "mongo/db/session_txn_record_gen.h" +#include "mongo/db/transaction_participant.h" #include "mongo/s/catalog/sharding_catalog_client_mock.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/unittest/unittest.h" @@ -57,6 +62,12 @@ class ReshardingTxnClonerTest : public ShardServerTestFixture { } WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); + + // onStepUp() relies on the storage interface to create the config.transactions table. + repl::StorageInterface::set(getServiceContext(), + std::make_unique()); + MongoDSessionCatalog::onStepUp(operationContext()); + LogicalSessionCache::set(getServiceContext(), std::make_unique()); } void tearDown() { @@ -109,6 +120,87 @@ protected: ; } + void onCommandReturnTxns(std::vector firstBatch, std::vector secondBatch) { + CursorId cursorId(0); + if (secondBatch.size() > 0) { + cursorId = 123; + } + onCommand([&](const executor::RemoteCommandRequest& request) { + return CursorResponse( + NamespaceString::kSessionTransactionsTableNamespace, cursorId, firstBatch) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + + if (secondBatch.size() == 0) { + return; + } + + onCommand([&](const executor::RemoteCommandRequest& request) { + return CursorResponse(NamespaceString::kSessionTransactionsTableNamespace, + CursorId{0}, + secondBatch) + .toBSON(CursorResponse::ResponseType::SubsequentResponse); + }); + } + + void seedTransactionOnRecipient(LogicalSessionId sessionId, + TxnNumber txnNum, + bool multiDocTxn) { + auto opCtx = operationContext(); + opCtx->setLogicalSessionId(sessionId); + opCtx->setTxnNumber(txnNum); + + if (multiDocTxn) { + opCtx->setInMultiDocumentTransaction(); + } + + MongoDOperationContextSession ocs(opCtx); + + auto txnParticipant = TransactionParticipant::get(opCtx); + ASSERT(txnParticipant); + if (multiDocTxn) { + txnParticipant.beginOrContinue(opCtx, txnNum, false, true); + } else { + txnParticipant.beginOrContinue(opCtx, txnNum, boost::none, boost::none); + } + } + + void checkTxnHasBeenUpdated(LogicalSessionId sessionId, TxnNumber txnNum) { + DBDirectClient client(operationContext()); + auto bsonOplog = + client.findOne(NamespaceString::kRsOplogNamespace.ns(), + BSON(repl::OplogEntryBase::kSessionIdFieldName << sessionId.toBSON())); + ASSERT(!bsonOplog.isEmpty()); + auto oplogEntry = repl::MutableOplogEntry::parse(bsonOplog).getValue(); + ASSERT_EQ(oplogEntry.getTxnNumber().get(), txnNum); + ASSERT_BSONOBJ_EQ(oplogEntry.getObject(), BSON("$sessionMigrateInfo" << 1)); + ASSERT_BSONOBJ_EQ(oplogEntry.getObject2().get(), BSON("$incompleteOplogHistory" << 1)); + ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kNoop); + + auto bsonTxn = + client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), + {BSON(SessionTxnRecord::kSessionIdFieldName << sessionId.toBSON())}); + ASSERT(!bsonTxn.isEmpty()); + auto txn = SessionTxnRecord::parse( + IDLParserErrorContext("resharding config transactions cloning test"), bsonTxn); + ASSERT_EQ(txn.getTxnNum(), txnNum); + ASSERT_EQ(txn.getLastWriteOpTime(), oplogEntry.getOpTime()); + } + void checkTxnHasNotBeenUpdated(LogicalSessionId sessionId, TxnNumber txnNum) { + auto opCtx = operationContext(); + opCtx->setLogicalSessionId(sessionId); + MongoDOperationContextSession ocs(opCtx); + auto txnParticipant = TransactionParticipant::get(opCtx); + + DBDirectClient client(operationContext()); + auto bsonOplog = + client.findOne(NamespaceString::kRsOplogNamespace.ns(), + BSON(repl::OplogEntryBase::kSessionIdFieldName << sessionId.toBSON())); + + ASSERT_BSONOBJ_EQ(bsonOplog, {}); + ASSERT_EQ(txnParticipant.getActiveTxnNumber(), txnNum); + } + private: static HostAndPort makeHostAndPort(const ShardId& shardId) { return HostAndPort(str::stream() << shardId << ":123"); @@ -126,30 +218,18 @@ TEST_F(ReshardingTxnClonerTest, TxnAggregation) { kTwoShardIdList[1], Timestamp::max(), boost::none, - [&](StatusWith statusWithTransaction) { - auto transaction = - unittest::assertGet(statusWithTransaction); + [&](OperationContext* opCtx, BSONObj transaction) { retrievedTransactions.push_back(transaction); - }); + }, + nullptr); fetcher->join(); }); - onCommand([&](const executor::RemoteCommandRequest& request) { - return CursorResponse(NamespaceString::kSessionTransactionsTableNamespace, - CursorId{123}, - std::vector(expectedTransactions.begin(), - expectedTransactions.begin() + 4)) - .toBSON(CursorResponse::ResponseType::InitialResponse); - }); + onCommandReturnTxns( + std::vector(expectedTransactions.begin(), expectedTransactions.begin() + 4), + std::vector(expectedTransactions.begin() + 4, expectedTransactions.end())); - onCommand([&](const executor::RemoteCommandRequest& request) { - return CursorResponse(NamespaceString::kSessionTransactionsTableNamespace, - CursorId{0}, - std::vector(expectedTransactions.begin() + 4, - expectedTransactions.end())) - .toBSON(CursorResponse::ResponseType::SubsequentResponse); - }); future.default_timed_get(); @@ -163,7 +243,6 @@ TEST_F(ReshardingTxnClonerTest, CursorNotFoundError) { std::vector expectedTransactions{ makeTxn(), makeTxn(), makeTxn(), makeTxn(), makeTxn(), makeTxn(), makeTxn()}; std::vector retrievedTransactions; - int errorsReturned = 0; Status error = Status::OK(); auto future = launchAsync([&, this] { @@ -172,20 +251,14 @@ TEST_F(ReshardingTxnClonerTest, CursorNotFoundError) { kTwoShardIdList[1], Timestamp::max(), boost::none, - [&](StatusWith statusWithTransaction) { - if (statusWithTransaction.isOK()) { - retrievedTransactions.push_back(statusWithTransaction.getValue()); - } else { - errorsReturned++; - error = statusWithTransaction.getStatus(); - } - }); + [&](auto opCtx, BSONObj transaction) { retrievedTransactions.push_back(transaction); }, + &error); fetcher->join(); }); onCommand([&](const executor::RemoteCommandRequest& request) { return CursorResponse(NamespaceString::kSessionTransactionsTableNamespace, - CursorId{124}, + CursorId{123}, expectedTransactions) .toBSON(CursorResponse::ResponseType::InitialResponse); }); @@ -200,9 +273,190 @@ TEST_F(ReshardingTxnClonerTest, CursorNotFoundError) { expectedTransactions.end(), retrievedTransactions.begin(), [](BSONObj a, BSONObj b) { return a.binaryEqual(b); })); - ASSERT_EQ(errorsReturned, 1); ASSERT_EQ(error, ErrorCodes::CursorNotFound); } +TEST_F(ReshardingTxnClonerTest, MergeTxnNotOnRecipient) { + Status status = Status::OK(); + + auto future = launchAsync([&, this] { + auto fetcher = cloneConfigTxnsForResharding(operationContext(), + kTwoShardIdList[1], + Timestamp::max(), + boost::none, + &configTxnsMergerForResharding, + &status); + fetcher->join(); + }); + const auto sessionId = makeLogicalSessionIdForTest(); + TxnNumber txnNum = 3; + + onCommandReturnTxns( + {SessionTxnRecord(sessionId, txnNum, repl::OpTime(), Date_t::now()).toBSON()}, {}); + + future.default_timed_get(); + + ASSERT(status.isOK()); + checkTxnHasBeenUpdated(sessionId, txnNum); +} + +TEST_F(ReshardingTxnClonerTest, MergeUnParsableTxn) { + Status status = Status::OK(); + + auto future = launchAsync([&, this] { + auto fetcher = cloneConfigTxnsForResharding(operationContext(), + kTwoShardIdList[1], + Timestamp::max(), + boost::none, + &configTxnsMergerForResharding, + &status); + fetcher->join(); + }); + const auto sessionId = makeLogicalSessionIdForTest(); + TxnNumber txnNum = 3; + onCommandReturnTxns({SessionTxnRecord(sessionId, txnNum, repl::OpTime(), Date_t::now()) + .toBSON() + .removeField(SessionTxnRecord::kSessionIdFieldName)}, + {}); + + future.default_timed_get(); + + ASSERT_EQ(status.code(), 40414); +} + +TEST_F(ReshardingTxnClonerTest, MergeNewTxnOverMultiDocTxn) { + Status status = Status::OK(); + const auto sessionId = makeLogicalSessionIdForTest(); + TxnNumber donorTxnNum = 3; + TxnNumber recipientTxnNum = donorTxnNum - 1; + + seedTransactionOnRecipient(sessionId, recipientTxnNum, true); + + auto future = launchAsync([&, this] { + auto fetcher = cloneConfigTxnsForResharding(operationContext(), + kTwoShardIdList[1], + Timestamp::max(), + boost::none, + &configTxnsMergerForResharding, + &status); + fetcher->join(); + }); + + onCommandReturnTxns( + {SessionTxnRecord(sessionId, donorTxnNum, repl::OpTime(), Date_t::now()).toBSON()}, {}); + + future.default_timed_get(); + + ASSERT(status.isOK()); + checkTxnHasBeenUpdated(sessionId, donorTxnNum); +} + +TEST_F(ReshardingTxnClonerTest, MergeNewTxnOverRetryableWriteTxn) { + Status status = Status::OK(); + const auto sessionId = makeLogicalSessionIdForTest(); + TxnNumber donorTxnNum = 3; + TxnNumber recipientTxnNum = donorTxnNum - 1; + + seedTransactionOnRecipient(sessionId, recipientTxnNum, false); + + auto future = launchAsync([&, this] { + auto fetcher = cloneConfigTxnsForResharding(operationContext(), + kTwoShardIdList[1], + Timestamp::max(), + boost::none, + &configTxnsMergerForResharding, + &status); + fetcher->join(); + }); + + onCommandReturnTxns( + {SessionTxnRecord(sessionId, donorTxnNum, repl::OpTime(), Date_t::now()).toBSON()}, {}); + + future.default_timed_get(); + + ASSERT(status.isOK()); + checkTxnHasBeenUpdated(sessionId, donorTxnNum); +} + +TEST_F(ReshardingTxnClonerTest, MergeCurrentTxnOverRetryableWriteTxn) { + Status status = Status::OK(); + const auto sessionId = makeLogicalSessionIdForTest(); + TxnNumber txnNum = 3; + + seedTransactionOnRecipient(sessionId, txnNum, false); + + auto future = launchAsync([&, this] { + auto fetcher = cloneConfigTxnsForResharding(operationContext(), + kTwoShardIdList[1], + Timestamp::max(), + boost::none, + &configTxnsMergerForResharding, + &status); + fetcher->join(); + }); + + onCommandReturnTxns( + {SessionTxnRecord(sessionId, txnNum, repl::OpTime(), Date_t::now()).toBSON()}, {}); + + future.default_timed_get(); + + ASSERT(status.isOK()); + checkTxnHasBeenUpdated(sessionId, txnNum); +} + +TEST_F(ReshardingTxnClonerTest, MergeCurrentTxnOverMultiDocTxn) { + Status status = Status::OK(); + const auto sessionId = makeLogicalSessionIdForTest(); + TxnNumber txnNum = 3; + + seedTransactionOnRecipient(sessionId, txnNum, true); + + auto future = launchAsync([&, this] { + auto fetcher = cloneConfigTxnsForResharding(operationContext(), + kTwoShardIdList[1], + Timestamp::max(), + boost::none, + &configTxnsMergerForResharding, + &status); + fetcher->join(); + }); + + onCommandReturnTxns( + {SessionTxnRecord(sessionId, txnNum, repl::OpTime(), Date_t::now()).toBSON()}, {}); + + future.default_timed_get(); + + ASSERT(status.isOK()); + checkTxnHasNotBeenUpdated(sessionId, txnNum); +} + + +TEST_F(ReshardingTxnClonerTest, MergeOldTxnOverTxn) { + Status status = Status::OK(); + const auto sessionId = makeLogicalSessionIdForTest(); + TxnNumber recipientTxnNum = 3; + TxnNumber donorTxnNum = recipientTxnNum - 1; + + seedTransactionOnRecipient(sessionId, recipientTxnNum, false); + + auto future = launchAsync([&, this] { + auto fetcher = cloneConfigTxnsForResharding(operationContext(), + kTwoShardIdList[1], + Timestamp::max(), + boost::none, + &configTxnsMergerForResharding, + &status); + fetcher->join(); + }); + + onCommandReturnTxns( + {SessionTxnRecord(sessionId, donorTxnNum, repl::OpTime(), Date_t::now()).toBSON()}, {}); + + future.default_timed_get(); + + ASSERT(status.isOK()); + checkTxnHasNotBeenUpdated(sessionId, recipientTxnNum); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp index 56d2018763c..8cc41078863 100644 --- a/src/mongo/db/s/resharding_util.cpp +++ b/src/mongo/db/s/resharding_util.cpp @@ -392,24 +392,6 @@ std::unique_ptr createAggForReshardingOplogBuffer( return pipeline; } -std::unique_ptr createConfigTxnCloningPipelineForResharding( - const boost::intrusive_ptr& expCtx, - Timestamp fetchTimestamp, - boost::optional startAfter) { - invariant(!fetchTimestamp.isNull()); - - std::list> stages; - if (startAfter) { - stages.emplace_back(DocumentSourceMatch::create( - BSON("_id" << BSON("$gt" << startAfter->toBSON())), expCtx)); - } - stages.emplace_back(DocumentSourceSort::create(expCtx, BSON("_id" << 1))); - stages.emplace_back(DocumentSourceMatch::create( - BSON("lastWriteOpTime.ts" << BSON("$lt" << fetchTimestamp)), expCtx)); - - return Pipeline::create(std::move(stages), expCtx); -} - void createSlimOplogView(OperationContext* opCtx, Database* db) { writeConflictRetry( opCtx, "createReshardingSlimOplog", "local.system.resharding.slimOplogForGraphLookup", [&] { diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h index defc05c1e7f..0ef4d258815 100644 --- a/src/mongo/db/s/resharding_util.h +++ b/src/mongo/db/s/resharding_util.h @@ -160,23 +160,6 @@ std::unique_ptr createAggForReshardingOplogBuffer( const boost::optional& resumeToken, bool doAttachDocumentCursor); -/** - * Create pipeline stages for iterating donor config.transactions. The pipeline has these stages: - * pipeline: [ - * {$match: {_id: {$gt: }}}, - * {$sort: {_id: 1}}, - * {$match: {"lastWriteOpTime.ts": {$lt: }}}, - * ], - * Note that the caller is responsible for making sure that the transactions ns is set in the - * expCtx. - * - * fetchTimestamp never isNull() - */ -std::unique_ptr createConfigTxnCloningPipelineForResharding( - const boost::intrusive_ptr& expCtx, - Timestamp fetchTimestamp, - boost::optional startAfter); - /** * Creates a view on the oplog that facilitates the specialized oplog tailing a resharding * recipient performs on a donor. diff --git a/src/mongo/db/s/resharding_util_test.cpp b/src/mongo/db/s/resharding_util_test.cpp index 35115010b63..d85a03b942f 100644 --- a/src/mongo/db/s/resharding_util_test.cpp +++ b/src/mongo/db/s/resharding_util_test.cpp @@ -42,6 +42,7 @@ #include "mongo/db/pipeline/expression_context_for_test.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/s/config/config_server_test_fixture.h" +#include "mongo/db/s/resharding_txn_cloner.h" #include "mongo/db/s/resharding_util.h" #include "mongo/db/session_txn_record_gen.h" #include "mongo/s/catalog/type_shard.h" @@ -1098,7 +1099,9 @@ class ReshardingTxnCloningPipelineTest : public AggregationContextFixture { protected: std::pair, std::deque> - makeTransactions(size_t numTransactions, std::function getTimestamp) { + makeTransactions(size_t numTransactions, + std::function getTimestamp, + bool includeMultiDocTransaction = false) { std::deque mockResults; std::deque expectedTransactions; // this will hold the expected result for this test @@ -1108,6 +1111,14 @@ protected: mockResults.emplace_back(Document(transaction.toBSON())); expectedTransactions.emplace_back(transaction); } + if (includeMultiDocTransaction) { + auto transaction = SessionTxnRecord(makeLogicalSessionIdForTest(), + 0, + repl::OpTime(getTimestamp(numTransactions), 0), + Date_t()); + transaction.setState(DurableTxnStateEnum::kInProgress); + mockResults.emplace_back(Document(transaction.toBSON())); + } std::sort(expectedTransactions.begin(), expectedTransactions.end(), [](SessionTxnRecord a, SessionTxnRecord b) { @@ -1191,6 +1202,15 @@ TEST_F(ReshardingTxnCloningPipelineTest, TxnPipelineAfterID) { ASSERT(pipelineMatchesDeque(pipeline, expectedTransactions)); } +TEST_F(ReshardingTxnCloningPipelineTest, TxnPipelineOnlyRetryableWrites) { + auto [mockResults, expectedTransactions] = + makeTransactions(10, [](size_t) { return Timestamp::min(); }, true); + + auto pipeline = constructPipeline(mockResults, Timestamp::max(), boost::none); + + ASSERT(pipelineMatchesDeque(pipeline, expectedTransactions)); +} + class ReshardingCollectionCloneTest : public AggregationContextFixture { protected: const NamespaceString& sourceNss() { diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index 437379cff1d..153a745fc28 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -288,7 +288,7 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, sessionTxnRecord.setLastWriteOpTime(oplogOpTime); sessionTxnRecord.setLastWriteDate(oplogEntry.getWallClockTime()); // We do not migrate transaction oplog entries so don't set the txn state. - txnParticipant.onMigrateCompletedOnPrimary(opCtx, {stmtId}, sessionTxnRecord); + txnParticipant.onRetryableWriteCloningCompleted(opCtx, {stmtId}, sessionTxnRecord); } wunit.commit(); diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index a4c59160c87..0158adeeda5 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -2250,7 +2250,7 @@ void TransactionParticipant::Participant::onWriteOpCompletedOnPrimary( opCtx, std::move(stmtIdsWritten), sessionTxnRecord.getLastWriteOpTime()); } -void TransactionParticipant::Participant::onMigrateCompletedOnPrimary( +void TransactionParticipant::Participant::onRetryableWriteCloningCompleted( OperationContext* opCtx, std::vector stmtIdsWritten, const SessionTxnRecord& sessionTxnRecord) { diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index 37b71ce8589..0bda26495db 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -319,6 +319,10 @@ public: return o().txnState.isInProgress(); } + bool transactionIsInRetryableWriteMode() const { + return o().txnState.isInRetryableWriteMode(); + } + /** * If this session is holding stashed locks in txnResourceStash, reports the current state * of the session using the provided builder. @@ -425,13 +429,14 @@ public: TxnNumber txnNumber); /** - * If the participant is in prepare, returns a future whose promise is fulfilled when the - * participant transitions out of prepare. + * If the participant is in prepare, returns a future whose promise is fulfilled when + * the participant transitions out of prepare. * * If the participant is not in prepare, returns an immediately ready future. * - * The caller should not wait on the future with the session checked out, since that will - * prevent the promise from being able to be fulfilled, i.e., will cause a deadlock. + * The caller should not wait on the future with the session checked out, since that + * will prevent the promise from being able to be fulfilled, i.e., will cause a + * deadlock. */ SharedSemiFuture onExitPrepare() const; @@ -554,9 +559,13 @@ public: * Throws if the session has been invalidated or the active transaction number is newer than * the one specified. */ - void onMigrateCompletedOnPrimary(OperationContext* opCtx, - std::vector stmtIdsWritten, - const SessionTxnRecord& sessionTxnRecord); + void onRetryableWriteCloningCompleted(OperationContext* opCtx, + std::vector stmtIdsWritten, + const SessionTxnRecord& sessionTxnRecord); + + void onTxnMigrateCompletedOnPrimary(OperationContext* opCtx, + std::vector stmtIdsWritten, + const SessionTxnRecord& sessionTxnRecord); /** * Checks whether the given statementId for the specified transaction has already executed -- cgit v1.2.1