summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Maynard <eric.maynard@mongodb.com>2020-09-30 18:30:11 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-10-13 18:11:36 +0000
commitbbe604a5b3f94f78a30a76941ea89c0ca5db14d2 (patch)
treed1f35a44964e80541b02f1cc3ea18375be28b512
parentfb1675676670ad763a1e55ff685e346dc2cc6975 (diff)
downloadmongo-bbe604a5b3f94f78a30a76941ea89c0ca5db14d2.tar.gz
SERVER-49899 Create config.transactions cloner for resharding (merges txns)
-rw-r--r--src/mongo/db/s/SConscript29
-rw-r--r--src/mongo/db/s/resharding_txn_cloner.cpp171
-rw-r--r--src/mongo/db/s/resharding_txn_cloner.h26
-rw-r--r--src/mongo/db/s/resharding_txn_cloner_test.cpp312
-rw-r--r--src/mongo/db/s/resharding_util.cpp18
-rw-r--r--src/mongo/db/s/resharding_util.h17
-rw-r--r--src/mongo/db/s/resharding_util_test.cpp22
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp2
-rw-r--r--src/mongo/db/transaction_participant.cpp2
-rw-r--r--src/mongo/db/transaction_participant.h23
10 files changed, 523 insertions, 99 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index bcd10878025..ddbc7ab1f0b 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -126,6 +126,7 @@ env.Library(
'$BUILD_DIR/mongo/db/session_catalog',
'$BUILD_DIR/mongo/idl/server_parameter',
'$BUILD_DIR/mongo/util/future_util',
+ 'resharding_txn_cloner',
'resharding_util',
],
)
@@ -133,7 +134,6 @@ env.Library(
env.Library(
target='resharding_util',
source=[
- 'resharding_txn_cloner.cpp',
'resharding_util.cpp',
env.Idlc('resharding/coordinator_document.idl')[0],
env.Idlc('resharding/donor_document.idl')[0],
@@ -153,6 +153,30 @@ env.Library(
'$BUILD_DIR/mongo/s/common_s',
'$BUILD_DIR/mongo/s/grid',
'sharding_api_d',
+
+ ],
+)
+
+env.Library(
+ target='resharding_txn_cloner',
+ source=[
+ 'resharding_txn_cloner.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/common',
+ '$BUILD_DIR/mongo/db/concurrency/write_conflict_exception',
+ '$BUILD_DIR/mongo/db/curop',
+ '$BUILD_DIR/mongo/db/namespace_string',
+ '$BUILD_DIR/mongo/db/pipeline/expression_context',
+ '$BUILD_DIR/mongo/db/pipeline/pipeline',
+ '$BUILD_DIR/mongo/db/repl/oplog',
+ '$BUILD_DIR/mongo/db/storage/write_unit_of_work',
+ '$BUILD_DIR/mongo/db/transaction',
+ '$BUILD_DIR/mongo/s/async_requests_sender',
+ '$BUILD_DIR/mongo/s/common_s',
+ '$BUILD_DIR/mongo/s/grid',
+ 'sharding_api_d',
],
)
@@ -356,6 +380,7 @@ env.Library(
'$BUILD_DIR/mongo/s/sharding_initialization',
'$BUILD_DIR/mongo/s/sharding_router_api',
'balancer',
+ 'resharding_txn_cloner',
'resharding_util',
'sharding_runtime_d',
],
@@ -466,6 +491,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock',
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock',
'$BUILD_DIR/mongo/s/sharding_router_test_fixture',
+ 'resharding_txn_cloner',
'resharding_util',
'shard_server_test_fixture',
'sharding_logging',
@@ -559,6 +585,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/util/version_impl',
'balancer',
'config_server_test_fixture',
+ 'resharding_txn_cloner',
'resharding_util',
],
)
diff --git a/src/mongo/db/s/resharding_txn_cloner.cpp b/src/mongo/db/s/resharding_txn_cloner.cpp
index f5e4721d3b4..c50e4806039 100644
--- a/src/mongo/db/s/resharding_txn_cloner.cpp
+++ b/src/mongo/db/s/resharding_txn_cloner.cpp
@@ -41,22 +41,58 @@
#include "mongo/client/fetcher.h"
#include "mongo/client/read_preference.h"
#include "mongo/client/remote_command_targeter.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/sharded_agg_helpers.h"
-#include "mongo/db/query/query_request.h"
-#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/session_txn_record_gen.h"
+#include "mongo/db/transaction_participant.h"
+#include "mongo/logv2/redaction.h"
#include "mongo/s/shard_id.h"
namespace mongo {
using namespace fmt::literals;
+
+std::unique_ptr<Pipeline, PipelineDeleter> createConfigTxnCloningPipelineForResharding(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Timestamp fetchTimestamp,
+ boost::optional<LogicalSessionId> startAfter) {
+ invariant(!fetchTimestamp.isNull());
+
+ std::list<boost::intrusive_ptr<DocumentSource>> stages;
+ if (startAfter) {
+ stages.emplace_back(DocumentSourceMatch::create(BSON(SessionTxnRecord::kSessionIdFieldName
+ << BSON("$gt" << startAfter->toBSON())
+ << SessionTxnRecord::kStateFieldName
+ << BSON("$exists" << false)),
+ expCtx));
+ } else {
+ stages.emplace_back(DocumentSourceMatch::create(
+ BSON(SessionTxnRecord::kStateFieldName << BSON("$exists" << false)), expCtx));
+ }
+ stages.emplace_back(
+ DocumentSourceSort::create(expCtx, BSON(SessionTxnRecord::kSessionIdFieldName << 1)));
+ stages.emplace_back(DocumentSourceMatch::create(
+ BSON((SessionTxnRecord::kLastWriteOpTimeFieldName + "." + repl::OpTime::kTimestampFieldName)
+ << BSON("$lt" << fetchTimestamp)),
+ expCtx));
+
+ return Pipeline::create(std::move(stages), expCtx);
+}
+
+
std::unique_ptr<Fetcher> cloneConfigTxnsForResharding(
OperationContext* opCtx,
const ShardId& shardId,
Timestamp fetchTimestamp,
boost::optional<LogicalSessionId> startAfter,
- std::function<void(StatusWith<BSONObj>)> merge) {
+ std::function<void(OperationContext*, BSONObj)> merge,
+ Status* status) {
+
boost::intrusive_ptr<ExpressionContext> expCtx = make_intrusive<ExpressionContext>(
opCtx, nullptr, NamespaceString::kSessionTransactionsTableNamespace);
auto pipeline =
@@ -73,26 +109,35 @@ std::unique_ptr<Fetcher> cloneConfigTxnsForResharding(
auto shard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
const auto targetHost = uassertStatusOK(
shard->getTargeter()->findHost(opCtx, ReadPreferenceSetting{ReadPreference::Nearest}));
-
- auto fetcherCallback = [merge](const Fetcher::QueryResponseStatus& dataStatus,
- Fetcher::NextAction* nextAction,
- BSONObjBuilder* getMoreBob) {
- if (!dataStatus.isOK()) {
- merge(dataStatus.getStatus());
- return;
- }
-
- auto data = dataStatus.getValue();
- for (BSONObj doc : data.documents) {
- merge(doc);
- }
-
- if (!getMoreBob) {
- return;
- }
- getMoreBob->append("getMore", data.cursorId);
- getMoreBob->append("collection", data.nss.coll());
- };
+ auto serviceContext = opCtx->getServiceContext();
+ auto fetcherCallback =
+ [merge, status, serviceContext](const Fetcher::QueryResponseStatus& dataStatus,
+ Fetcher::NextAction* nextAction,
+ BSONObjBuilder* getMoreBob) {
+ if (!dataStatus.isOK()) {
+ *status = dataStatus.getStatus();
+ return;
+ }
+
+ ThreadClient threadClient(serviceContext);
+ auto uniqueOpCtx = cc().makeOperationContext();
+ auto fetcherOpCtx = uniqueOpCtx.get();
+ auto data = dataStatus.getValue();
+ for (BSONObj doc : data.documents) {
+ try {
+ merge(fetcherOpCtx, doc);
+ } catch (const DBException& ex) {
+ *status = ex.toStatus();
+ return;
+ }
+ }
+
+ if (!getMoreBob) {
+ return;
+ }
+ getMoreBob->append("getMore", data.cursorId);
+ getMoreBob->append("collection", data.nss.coll());
+ };
auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
@@ -107,4 +152,84 @@ std::unique_ptr<Fetcher> cloneConfigTxnsForResharding(
return fetcher;
}
+void configTxnsMergerForResharding(OperationContext* opCtx, BSONObj donorBsonTransaction) {
+ SessionTxnRecord donorTransaction;
+
+ donorTransaction = SessionTxnRecord::parse(
+ IDLParserErrorContext("resharding config transactions cloning"), donorBsonTransaction);
+
+ opCtx->setLogicalSessionId(donorTransaction.getSessionId());
+
+ while (true) {
+ auto ocs = std::make_unique<MongoDOperationContextSession>(opCtx);
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ // Which error code should this be? what message?
+ uassert(4989900, "Failed to get transaction Participant", txnParticipant);
+ try {
+ txnParticipant.beginOrContinue(
+ opCtx, donorTransaction.getTxnNum(), boost::none, boost::none);
+ } catch (const DBException& ex) {
+ if (ex.code() == ErrorCodes::TransactionTooOld) {
+ // donorTransaction.getTxnNum() < recipientTxnNumber
+ return;
+ } else if (ex.code() == ErrorCodes::IncompleteTransactionHistory) {
+ // donorTransaction.getTxnNum() == recipientTxnNumber &&
+ // !txnParticipant.transactionIsInRetryableWriteMode()
+ return;
+ } else if (ex.code() == ErrorCodes::PreparedTransactionInProgress) {
+ // txnParticipant.transactionIsPrepared()
+ ocs.reset();
+ // TODO SERVER-51493 Change to not block here.
+ txnParticipant.onExitPrepare().wait();
+ continue;
+ }
+ throw;
+ }
+
+ repl::MutableOplogEntry oplogEntry;
+ oplogEntry.setObject(BSON("$sessionMigrateInfo" << 1));
+ oplogEntry.setObject2(BSON("$incompleteOplogHistory" << 1));
+ oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
+ oplogEntry.setSessionId(donorTransaction.getSessionId());
+ oplogEntry.setTxnNumber(donorTransaction.getTxnNum());
+ oplogEntry.setStatementId(kIncompleteHistoryStmtId);
+ oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime());
+ oplogEntry.setNss({});
+ oplogEntry.setWallClockTime(Date_t::now());
+
+ writeConflictRetry(
+ opCtx,
+ "ReshardingTxnCloner",
+ NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ [&] {
+ // Need to take global lock here so repl::logOp will not unlock it and
+ // trigger the invariant that disallows unlocking global lock while
+ // inside a WUOW. Take the transaction table db lock to ensure the same
+ // lock ordering with normal replicated updates to the table.
+ Lock::DBLock lk(
+ opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IX);
+ WriteUnitOfWork wunit(opCtx);
+
+ repl::OpTime opTime = repl::logOp(opCtx, &oplogEntry);
+
+ uassert(4989901,
+ str::stream() << "Failed to create new oplog entry for oplog with opTime: "
+ << oplogEntry.getOpTime().toString() << ": "
+ << redact(oplogEntry.toBSON()),
+ !opTime.isNull());
+
+ SessionTxnRecord sessionTxnRecord(donorTransaction.getSessionId(),
+ donorTransaction.getTxnNum(),
+ opTime,
+ Date_t::now());
+ txnParticipant.onRetryableWriteCloningCompleted(
+ opCtx, {kIncompleteHistoryStmtId}, sessionTxnRecord);
+ wunit.commit();
+ });
+
+ return;
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/resharding_txn_cloner.h b/src/mongo/db/s/resharding_txn_cloner.h
index e7fe58360ef..497d9d8f810 100644
--- a/src/mongo/db/s/resharding_txn_cloner.h
+++ b/src/mongo/db/s/resharding_txn_cloner.h
@@ -37,6 +37,24 @@
namespace mongo {
+
+/**
+ * Create pipeline stages for iterating donor config.transactions. The pipeline has these stages:
+ * pipeline: [
+ * {$match: {_id: {$gt: <startAfter>}, state: {$exists: false}}},
+ * {$sort: {_id: 1}},
+ * {$match: {"lastWriteOpTime.ts": {$lt: <fetchTimestamp>}}},
+ * ],
+ * Note that the caller is responsible for making sure that the transactions ns is set in the
+ * expCtx.
+ *
+ * fetchTimestamp never isNull()
+ */
+std::unique_ptr<Pipeline, PipelineDeleter> createConfigTxnCloningPipelineForResharding(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Timestamp fetchTimestamp,
+ boost::optional<LogicalSessionId> startAfter);
+
/**
* Clone config.transactions from source and updates the config.transactions on itself.
* The parameter merge is a function called on every transaction received and should be used
@@ -49,6 +67,12 @@ std::unique_ptr<Fetcher> cloneConfigTxnsForResharding(
const ShardId& shardId,
Timestamp fetchTimestamp,
boost::optional<LogicalSessionId> startAfter,
- std::function<void(StatusWith<BSONObj>)> merge);
+ std::function<void(OperationContext*, BSONObj)> merge,
+ Status* status);
+
+/**
+ * Callback function to be used to merge transactions cloned by cloneConfigTxnsForResharding
+ */
+void configTxnsMergerForResharding(OperationContext* opCtx, BSONObj donorBsonTransaction);
} // namespace mongo
diff --git a/src/mongo/db/s/resharding_txn_cloner_test.cpp b/src/mongo/db/s/resharding_txn_cloner_test.cpp
index c5993ee3582..9277931b8d2 100644
--- a/src/mongo/db/s/resharding_txn_cloner_test.cpp
+++ b/src/mongo/db/s/resharding_txn_cloner_test.cpp
@@ -34,11 +34,16 @@
#include <vector>
#include "mongo/bson/bsonobj.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/logical_session_cache_noop.h"
#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/s/resharding_txn_cloner.h"
#include "mongo/db/s/shard_server_test_fixture.h"
+#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/session_txn_record_gen.h"
+#include "mongo/db/transaction_participant.h"
#include "mongo/s/catalog/sharding_catalog_client_mock.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/unittest/unittest.h"
@@ -57,6 +62,12 @@ class ReshardingTxnClonerTest : public ShardServerTestFixture {
}
WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
+
+ // onStepUp() relies on the storage interface to create the config.transactions table.
+ repl::StorageInterface::set(getServiceContext(),
+ std::make_unique<repl::StorageInterfaceImpl>());
+ MongoDSessionCatalog::onStepUp(operationContext());
+ LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>());
}
void tearDown() {
@@ -109,6 +120,87 @@ protected:
;
}
+ void onCommandReturnTxns(std::vector<BSONObj> firstBatch, std::vector<BSONObj> secondBatch) {
+ CursorId cursorId(0);
+ if (secondBatch.size() > 0) {
+ cursorId = 123;
+ }
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return CursorResponse(
+ NamespaceString::kSessionTransactionsTableNamespace, cursorId, firstBatch)
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ if (secondBatch.size() == 0) {
+ return;
+ }
+
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return CursorResponse(NamespaceString::kSessionTransactionsTableNamespace,
+ CursorId{0},
+ secondBatch)
+ .toBSON(CursorResponse::ResponseType::SubsequentResponse);
+ });
+ }
+
+ void seedTransactionOnRecipient(LogicalSessionId sessionId,
+ TxnNumber txnNum,
+ bool multiDocTxn) {
+ auto opCtx = operationContext();
+ opCtx->setLogicalSessionId(sessionId);
+ opCtx->setTxnNumber(txnNum);
+
+ if (multiDocTxn) {
+ opCtx->setInMultiDocumentTransaction();
+ }
+
+ MongoDOperationContextSession ocs(opCtx);
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ ASSERT(txnParticipant);
+ if (multiDocTxn) {
+ txnParticipant.beginOrContinue(opCtx, txnNum, false, true);
+ } else {
+ txnParticipant.beginOrContinue(opCtx, txnNum, boost::none, boost::none);
+ }
+ }
+
+ void checkTxnHasBeenUpdated(LogicalSessionId sessionId, TxnNumber txnNum) {
+ DBDirectClient client(operationContext());
+ auto bsonOplog =
+ client.findOne(NamespaceString::kRsOplogNamespace.ns(),
+ BSON(repl::OplogEntryBase::kSessionIdFieldName << sessionId.toBSON()));
+ ASSERT(!bsonOplog.isEmpty());
+ auto oplogEntry = repl::MutableOplogEntry::parse(bsonOplog).getValue();
+ ASSERT_EQ(oplogEntry.getTxnNumber().get(), txnNum);
+ ASSERT_BSONOBJ_EQ(oplogEntry.getObject(), BSON("$sessionMigrateInfo" << 1));
+ ASSERT_BSONOBJ_EQ(oplogEntry.getObject2().get(), BSON("$incompleteOplogHistory" << 1));
+ ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kNoop);
+
+ auto bsonTxn =
+ client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {BSON(SessionTxnRecord::kSessionIdFieldName << sessionId.toBSON())});
+ ASSERT(!bsonTxn.isEmpty());
+ auto txn = SessionTxnRecord::parse(
+ IDLParserErrorContext("resharding config transactions cloning test"), bsonTxn);
+ ASSERT_EQ(txn.getTxnNum(), txnNum);
+ ASSERT_EQ(txn.getLastWriteOpTime(), oplogEntry.getOpTime());
+ }
+ void checkTxnHasNotBeenUpdated(LogicalSessionId sessionId, TxnNumber txnNum) {
+ auto opCtx = operationContext();
+ opCtx->setLogicalSessionId(sessionId);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
+ DBDirectClient client(operationContext());
+ auto bsonOplog =
+ client.findOne(NamespaceString::kRsOplogNamespace.ns(),
+ BSON(repl::OplogEntryBase::kSessionIdFieldName << sessionId.toBSON()));
+
+ ASSERT_BSONOBJ_EQ(bsonOplog, {});
+ ASSERT_EQ(txnParticipant.getActiveTxnNumber(), txnNum);
+ }
+
private:
static HostAndPort makeHostAndPort(const ShardId& shardId) {
return HostAndPort(str::stream() << shardId << ":123");
@@ -126,30 +218,18 @@ TEST_F(ReshardingTxnClonerTest, TxnAggregation) {
kTwoShardIdList[1],
Timestamp::max(),
boost::none,
- [&](StatusWith<BSONObj> statusWithTransaction) {
- auto transaction =
- unittest::assertGet(statusWithTransaction);
+ [&](OperationContext* opCtx, BSONObj transaction) {
retrievedTransactions.push_back(transaction);
- });
+ },
+ nullptr);
fetcher->join();
});
- onCommand([&](const executor::RemoteCommandRequest& request) {
- return CursorResponse(NamespaceString::kSessionTransactionsTableNamespace,
- CursorId{123},
- std::vector<BSONObj>(expectedTransactions.begin(),
- expectedTransactions.begin() + 4))
- .toBSON(CursorResponse::ResponseType::InitialResponse);
- });
+ onCommandReturnTxns(
+ std::vector<BSONObj>(expectedTransactions.begin(), expectedTransactions.begin() + 4),
+ std::vector<BSONObj>(expectedTransactions.begin() + 4, expectedTransactions.end()));
- onCommand([&](const executor::RemoteCommandRequest& request) {
- return CursorResponse(NamespaceString::kSessionTransactionsTableNamespace,
- CursorId{0},
- std::vector<BSONObj>(expectedTransactions.begin() + 4,
- expectedTransactions.end()))
- .toBSON(CursorResponse::ResponseType::SubsequentResponse);
- });
future.default_timed_get();
@@ -163,7 +243,6 @@ TEST_F(ReshardingTxnClonerTest, CursorNotFoundError) {
std::vector<BSONObj> expectedTransactions{
makeTxn(), makeTxn(), makeTxn(), makeTxn(), makeTxn(), makeTxn(), makeTxn()};
std::vector<BSONObj> retrievedTransactions;
- int errorsReturned = 0;
Status error = Status::OK();
auto future = launchAsync([&, this] {
@@ -172,20 +251,14 @@ TEST_F(ReshardingTxnClonerTest, CursorNotFoundError) {
kTwoShardIdList[1],
Timestamp::max(),
boost::none,
- [&](StatusWith<BSONObj> statusWithTransaction) {
- if (statusWithTransaction.isOK()) {
- retrievedTransactions.push_back(statusWithTransaction.getValue());
- } else {
- errorsReturned++;
- error = statusWithTransaction.getStatus();
- }
- });
+ [&](auto opCtx, BSONObj transaction) { retrievedTransactions.push_back(transaction); },
+ &error);
fetcher->join();
});
onCommand([&](const executor::RemoteCommandRequest& request) {
return CursorResponse(NamespaceString::kSessionTransactionsTableNamespace,
- CursorId{124},
+ CursorId{123},
expectedTransactions)
.toBSON(CursorResponse::ResponseType::InitialResponse);
});
@@ -200,9 +273,190 @@ TEST_F(ReshardingTxnClonerTest, CursorNotFoundError) {
expectedTransactions.end(),
retrievedTransactions.begin(),
[](BSONObj a, BSONObj b) { return a.binaryEqual(b); }));
- ASSERT_EQ(errorsReturned, 1);
ASSERT_EQ(error, ErrorCodes::CursorNotFound);
}
+TEST_F(ReshardingTxnClonerTest, MergeTxnNotOnRecipient) {
+ Status status = Status::OK();
+
+ auto future = launchAsync([&, this] {
+ auto fetcher = cloneConfigTxnsForResharding(operationContext(),
+ kTwoShardIdList[1],
+ Timestamp::max(),
+ boost::none,
+ &configTxnsMergerForResharding,
+ &status);
+ fetcher->join();
+ });
+ const auto sessionId = makeLogicalSessionIdForTest();
+ TxnNumber txnNum = 3;
+
+ onCommandReturnTxns(
+ {SessionTxnRecord(sessionId, txnNum, repl::OpTime(), Date_t::now()).toBSON()}, {});
+
+ future.default_timed_get();
+
+ ASSERT(status.isOK());
+ checkTxnHasBeenUpdated(sessionId, txnNum);
+}
+
+TEST_F(ReshardingTxnClonerTest, MergeUnParsableTxn) {
+ Status status = Status::OK();
+
+ auto future = launchAsync([&, this] {
+ auto fetcher = cloneConfigTxnsForResharding(operationContext(),
+ kTwoShardIdList[1],
+ Timestamp::max(),
+ boost::none,
+ &configTxnsMergerForResharding,
+ &status);
+ fetcher->join();
+ });
+ const auto sessionId = makeLogicalSessionIdForTest();
+ TxnNumber txnNum = 3;
+ onCommandReturnTxns({SessionTxnRecord(sessionId, txnNum, repl::OpTime(), Date_t::now())
+ .toBSON()
+ .removeField(SessionTxnRecord::kSessionIdFieldName)},
+ {});
+
+ future.default_timed_get();
+
+ ASSERT_EQ(status.code(), 40414);
+}
+
+TEST_F(ReshardingTxnClonerTest, MergeNewTxnOverMultiDocTxn) {
+ Status status = Status::OK();
+ const auto sessionId = makeLogicalSessionIdForTest();
+ TxnNumber donorTxnNum = 3;
+ TxnNumber recipientTxnNum = donorTxnNum - 1;
+
+ seedTransactionOnRecipient(sessionId, recipientTxnNum, true);
+
+ auto future = launchAsync([&, this] {
+ auto fetcher = cloneConfigTxnsForResharding(operationContext(),
+ kTwoShardIdList[1],
+ Timestamp::max(),
+ boost::none,
+ &configTxnsMergerForResharding,
+ &status);
+ fetcher->join();
+ });
+
+ onCommandReturnTxns(
+ {SessionTxnRecord(sessionId, donorTxnNum, repl::OpTime(), Date_t::now()).toBSON()}, {});
+
+ future.default_timed_get();
+
+ ASSERT(status.isOK());
+ checkTxnHasBeenUpdated(sessionId, donorTxnNum);
+}
+
+TEST_F(ReshardingTxnClonerTest, MergeNewTxnOverRetryableWriteTxn) {
+ Status status = Status::OK();
+ const auto sessionId = makeLogicalSessionIdForTest();
+ TxnNumber donorTxnNum = 3;
+ TxnNumber recipientTxnNum = donorTxnNum - 1;
+
+ seedTransactionOnRecipient(sessionId, recipientTxnNum, false);
+
+ auto future = launchAsync([&, this] {
+ auto fetcher = cloneConfigTxnsForResharding(operationContext(),
+ kTwoShardIdList[1],
+ Timestamp::max(),
+ boost::none,
+ &configTxnsMergerForResharding,
+ &status);
+ fetcher->join();
+ });
+
+ onCommandReturnTxns(
+ {SessionTxnRecord(sessionId, donorTxnNum, repl::OpTime(), Date_t::now()).toBSON()}, {});
+
+ future.default_timed_get();
+
+ ASSERT(status.isOK());
+ checkTxnHasBeenUpdated(sessionId, donorTxnNum);
+}
+
+TEST_F(ReshardingTxnClonerTest, MergeCurrentTxnOverRetryableWriteTxn) {
+ Status status = Status::OK();
+ const auto sessionId = makeLogicalSessionIdForTest();
+ TxnNumber txnNum = 3;
+
+ seedTransactionOnRecipient(sessionId, txnNum, false);
+
+ auto future = launchAsync([&, this] {
+ auto fetcher = cloneConfigTxnsForResharding(operationContext(),
+ kTwoShardIdList[1],
+ Timestamp::max(),
+ boost::none,
+ &configTxnsMergerForResharding,
+ &status);
+ fetcher->join();
+ });
+
+ onCommandReturnTxns(
+ {SessionTxnRecord(sessionId, txnNum, repl::OpTime(), Date_t::now()).toBSON()}, {});
+
+ future.default_timed_get();
+
+ ASSERT(status.isOK());
+ checkTxnHasBeenUpdated(sessionId, txnNum);
+}
+
+TEST_F(ReshardingTxnClonerTest, MergeCurrentTxnOverMultiDocTxn) {
+ Status status = Status::OK();
+ const auto sessionId = makeLogicalSessionIdForTest();
+ TxnNumber txnNum = 3;
+
+ seedTransactionOnRecipient(sessionId, txnNum, true);
+
+ auto future = launchAsync([&, this] {
+ auto fetcher = cloneConfigTxnsForResharding(operationContext(),
+ kTwoShardIdList[1],
+ Timestamp::max(),
+ boost::none,
+ &configTxnsMergerForResharding,
+ &status);
+ fetcher->join();
+ });
+
+ onCommandReturnTxns(
+ {SessionTxnRecord(sessionId, txnNum, repl::OpTime(), Date_t::now()).toBSON()}, {});
+
+ future.default_timed_get();
+
+ ASSERT(status.isOK());
+ checkTxnHasNotBeenUpdated(sessionId, txnNum);
+}
+
+
+TEST_F(ReshardingTxnClonerTest, MergeOldTxnOverTxn) {
+ Status status = Status::OK();
+ const auto sessionId = makeLogicalSessionIdForTest();
+ TxnNumber recipientTxnNum = 3;
+ TxnNumber donorTxnNum = recipientTxnNum - 1;
+
+ seedTransactionOnRecipient(sessionId, recipientTxnNum, false);
+
+ auto future = launchAsync([&, this] {
+ auto fetcher = cloneConfigTxnsForResharding(operationContext(),
+ kTwoShardIdList[1],
+ Timestamp::max(),
+ boost::none,
+ &configTxnsMergerForResharding,
+ &status);
+ fetcher->join();
+ });
+
+ onCommandReturnTxns(
+ {SessionTxnRecord(sessionId, donorTxnNum, repl::OpTime(), Date_t::now()).toBSON()}, {});
+
+ future.default_timed_get();
+
+ ASSERT(status.isOK());
+ checkTxnHasNotBeenUpdated(sessionId, recipientTxnNum);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp
index 56d2018763c..8cc41078863 100644
--- a/src/mongo/db/s/resharding_util.cpp
+++ b/src/mongo/db/s/resharding_util.cpp
@@ -392,24 +392,6 @@ std::unique_ptr<Pipeline, PipelineDeleter> createAggForReshardingOplogBuffer(
return pipeline;
}
-std::unique_ptr<Pipeline, PipelineDeleter> createConfigTxnCloningPipelineForResharding(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Timestamp fetchTimestamp,
- boost::optional<LogicalSessionId> startAfter) {
- invariant(!fetchTimestamp.isNull());
-
- std::list<boost::intrusive_ptr<DocumentSource>> stages;
- if (startAfter) {
- stages.emplace_back(DocumentSourceMatch::create(
- BSON("_id" << BSON("$gt" << startAfter->toBSON())), expCtx));
- }
- stages.emplace_back(DocumentSourceSort::create(expCtx, BSON("_id" << 1)));
- stages.emplace_back(DocumentSourceMatch::create(
- BSON("lastWriteOpTime.ts" << BSON("$lt" << fetchTimestamp)), expCtx));
-
- return Pipeline::create(std::move(stages), expCtx);
-}
-
void createSlimOplogView(OperationContext* opCtx, Database* db) {
writeConflictRetry(
opCtx, "createReshardingSlimOplog", "local.system.resharding.slimOplogForGraphLookup", [&] {
diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h
index defc05c1e7f..0ef4d258815 100644
--- a/src/mongo/db/s/resharding_util.h
+++ b/src/mongo/db/s/resharding_util.h
@@ -161,23 +161,6 @@ std::unique_ptr<Pipeline, PipelineDeleter> createAggForReshardingOplogBuffer(
bool doAttachDocumentCursor);
/**
- * Create pipeline stages for iterating donor config.transactions. The pipeline has these stages:
- * pipeline: [
- * {$match: {_id: {$gt: <startAfter>}}},
- * {$sort: {_id: 1}},
- * {$match: {"lastWriteOpTime.ts": {$lt: <fetchTimestamp>}}},
- * ],
- * Note that the caller is responsible for making sure that the transactions ns is set in the
- * expCtx.
- *
- * fetchTimestamp never isNull()
- */
-std::unique_ptr<Pipeline, PipelineDeleter> createConfigTxnCloningPipelineForResharding(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Timestamp fetchTimestamp,
- boost::optional<LogicalSessionId> startAfter);
-
-/**
* Creates a view on the oplog that facilitates the specialized oplog tailing a resharding
* recipient performs on a donor.
*/
diff --git a/src/mongo/db/s/resharding_util_test.cpp b/src/mongo/db/s/resharding_util_test.cpp
index 35115010b63..d85a03b942f 100644
--- a/src/mongo/db/s/resharding_util_test.cpp
+++ b/src/mongo/db/s/resharding_util_test.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/pipeline/expression_context_for_test.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/s/config/config_server_test_fixture.h"
+#include "mongo/db/s/resharding_txn_cloner.h"
#include "mongo/db/s/resharding_util.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/s/catalog/type_shard.h"
@@ -1098,7 +1099,9 @@ class ReshardingTxnCloningPipelineTest : public AggregationContextFixture {
protected:
std::pair<std::deque<DocumentSource::GetNextResult>, std::deque<SessionTxnRecord>>
- makeTransactions(size_t numTransactions, std::function<Timestamp(size_t)> getTimestamp) {
+ makeTransactions(size_t numTransactions,
+ std::function<Timestamp(size_t)> getTimestamp,
+ bool includeMultiDocTransaction = false) {
std::deque<DocumentSource::GetNextResult> mockResults;
std::deque<SessionTxnRecord>
expectedTransactions; // this will hold the expected result for this test
@@ -1108,6 +1111,14 @@ protected:
mockResults.emplace_back(Document(transaction.toBSON()));
expectedTransactions.emplace_back(transaction);
}
+ if (includeMultiDocTransaction) {
+ auto transaction = SessionTxnRecord(makeLogicalSessionIdForTest(),
+ 0,
+ repl::OpTime(getTimestamp(numTransactions), 0),
+ Date_t());
+ transaction.setState(DurableTxnStateEnum::kInProgress);
+ mockResults.emplace_back(Document(transaction.toBSON()));
+ }
std::sort(expectedTransactions.begin(),
expectedTransactions.end(),
[](SessionTxnRecord a, SessionTxnRecord b) {
@@ -1191,6 +1202,15 @@ TEST_F(ReshardingTxnCloningPipelineTest, TxnPipelineAfterID) {
ASSERT(pipelineMatchesDeque(pipeline, expectedTransactions));
}
+TEST_F(ReshardingTxnCloningPipelineTest, TxnPipelineOnlyRetryableWrites) {
+ auto [mockResults, expectedTransactions] =
+ makeTransactions(10, [](size_t) { return Timestamp::min(); }, true);
+
+ auto pipeline = constructPipeline(mockResults, Timestamp::max(), boost::none);
+
+ ASSERT(pipelineMatchesDeque(pipeline, expectedTransactions));
+}
+
class ReshardingCollectionCloneTest : public AggregationContextFixture {
protected:
const NamespaceString& sourceNss() {
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index 437379cff1d..153a745fc28 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -288,7 +288,7 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
sessionTxnRecord.setLastWriteOpTime(oplogOpTime);
sessionTxnRecord.setLastWriteDate(oplogEntry.getWallClockTime());
// We do not migrate transaction oplog entries so don't set the txn state.
- txnParticipant.onMigrateCompletedOnPrimary(opCtx, {stmtId}, sessionTxnRecord);
+ txnParticipant.onRetryableWriteCloningCompleted(opCtx, {stmtId}, sessionTxnRecord);
}
wunit.commit();
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index a4c59160c87..0158adeeda5 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -2250,7 +2250,7 @@ void TransactionParticipant::Participant::onWriteOpCompletedOnPrimary(
opCtx, std::move(stmtIdsWritten), sessionTxnRecord.getLastWriteOpTime());
}
-void TransactionParticipant::Participant::onMigrateCompletedOnPrimary(
+void TransactionParticipant::Participant::onRetryableWriteCloningCompleted(
OperationContext* opCtx,
std::vector<StmtId> stmtIdsWritten,
const SessionTxnRecord& sessionTxnRecord) {
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index 37b71ce8589..0bda26495db 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -319,6 +319,10 @@ public:
return o().txnState.isInProgress();
}
+ bool transactionIsInRetryableWriteMode() const {
+ return o().txnState.isInRetryableWriteMode();
+ }
+
/**
* If this session is holding stashed locks in txnResourceStash, reports the current state
* of the session using the provided builder.
@@ -425,13 +429,14 @@ public:
TxnNumber txnNumber);
/**
- * If the participant is in prepare, returns a future whose promise is fulfilled when the
- * participant transitions out of prepare.
+ * If the participant is in prepare, returns a future whose promise is fulfilled when
+ * the participant transitions out of prepare.
*
* If the participant is not in prepare, returns an immediately ready future.
*
- * The caller should not wait on the future with the session checked out, since that will
- * prevent the promise from being able to be fulfilled, i.e., will cause a deadlock.
+ * The caller should not wait on the future with the session checked out, since that
+ * will prevent the promise from being able to be fulfilled, i.e., will cause a
+ * deadlock.
*/
SharedSemiFuture<void> onExitPrepare() const;
@@ -554,9 +559,13 @@ public:
* Throws if the session has been invalidated or the active transaction number is newer than
* the one specified.
*/
- void onMigrateCompletedOnPrimary(OperationContext* opCtx,
- std::vector<StmtId> stmtIdsWritten,
- const SessionTxnRecord& sessionTxnRecord);
+ void onRetryableWriteCloningCompleted(OperationContext* opCtx,
+ std::vector<StmtId> stmtIdsWritten,
+ const SessionTxnRecord& sessionTxnRecord);
+
+ void onTxnMigrateCompletedOnPrimary(OperationContext* opCtx,
+ std::vector<StmtId> stmtIdsWritten,
+ const SessionTxnRecord& sessionTxnRecord);
/**
* Checks whether the given statementId for the specified transaction has already executed