summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorEsha Maharishi <esha.maharishi@mongodb.com>2018-12-05 16:04:25 -0500
committerEsha Maharishi <esha.maharishi@mongodb.com>2018-12-06 11:04:15 -0500
commit2e6e75cc78113485c6c82e695bccd91f7c3932e1 (patch)
tree24f8e6cfec854a10f2a4e229efe9afab78547019 /src/mongo/db
parente7302a09ff2b6a1906ccff76121f6eb7f8773c37 (diff)
downloadmongo-2e6e75cc78113485c6c82e695bccd91f7c3932e1.tar.gz
SERVER-37884 Coordinator should make its state durable before sending prepare and before sending the decision
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/SConscript4
-rw-r--r--src/mongo/db/namespace_string.cpp7
-rw-r--r--src/mongo/db/namespace_string.h3
-rw-r--r--src/mongo/db/transaction_coordinator.cpp91
-rw-r--r--src/mongo/db/transaction_coordinator.h13
-rw-r--r--src/mongo/db/transaction_coordinator_document.idl (renamed from src/mongo/db/transaction_commit_decision.idl)19
-rw-r--r--src/mongo/db/transaction_coordinator_test.cpp19
-rw-r--r--src/mongo/db/transaction_coordinator_util.cpp320
-rw-r--r--src/mongo/db/transaction_coordinator_util.h68
-rw-r--r--src/mongo/db/transaction_coordinator_util_test.cpp349
-rw-r--r--src/mongo/db/write_concern_options.cpp3
11 files changed, 800 insertions, 96 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index f3869401905..2bc852aa9a0 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -648,10 +648,12 @@ env.Library(
'transaction_coordinator_factory_mongod.cpp',
'transaction_coordinator_service.cpp',
'transaction_coordinator_util.cpp',
+ env.Idlc('transaction_coordinator_document.idl')[0],
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
'catalog_raii',
+ 'rw_concern_d',
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/commands/txn_cmd_request',
@@ -689,7 +691,6 @@ env.Library(
'transaction_participant.cpp',
's/recover_transaction_decision_from_local_participant.cpp',
env.Idlc('session_txn_record.idl')[0],
- env.Idlc('transaction_commit_decision.idl')[0],
env.Idlc('transactions_stats.idl')[0],
],
LIBDEPS=[
@@ -727,6 +728,7 @@ env.CppUnitTest(
'transaction_coordinator_catalog_test.cpp',
'transaction_coordinator_service_test.cpp',
'transaction_coordinator_test.cpp',
+ 'transaction_coordinator_util_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock',
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 85b28afd6e8..684bb917798 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -58,8 +58,15 @@ const NamespaceString NamespaceString::kServerConfigurationNamespace(NamespaceSt
"system.version");
const NamespaceString NamespaceString::kLogicalSessionsNamespace(NamespaceString::kConfigDb,
"system.sessions");
+
+// Persisted state for a shard participating in a transaction or retryable write.
const NamespaceString NamespaceString::kSessionTransactionsTableNamespace(
NamespaceString::kConfigDb, "transactions");
+
+// Persisted state for a shard coordinating a cross-shard transaction.
+const NamespaceString NamespaceString::kTransactionCoordinatorsNamespace(
+ NamespaceString::kConfigDb, "transaction_coordinators");
+
const NamespaceString NamespaceString::kShardConfigCollectionsNamespace(NamespaceString::kConfigDb,
"cache.collections");
const NamespaceString NamespaceString::kShardConfigDatabasesNamespace(NamespaceString::kConfigDb,
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index c56898a53b8..651d769544b 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -93,6 +93,9 @@ public:
// Namespace of the the oplog collection.
static const NamespaceString kRsOplogNamespace;
+ // Namespace for storing the persisted state of transaction coordinators.
+ static const NamespaceString kTransactionCoordinatorsNamespace;
+
/**
* Constructs an empty NamespaceString.
*/
diff --git a/src/mongo/db/transaction_coordinator.cpp b/src/mongo/db/transaction_coordinator.cpp
index 459a8cc7497..5735554ebbc 100644
--- a/src/mongo/db/transaction_coordinator.cpp
+++ b/src/mongo/db/transaction_coordinator.cpp
@@ -34,11 +34,39 @@
#include "mongo/db/transaction_coordinator.h"
+#include "mongo/db/logical_clock.h"
#include "mongo/db/transaction_coordinator_util.h"
#include "mongo/util/log.h"
namespace mongo {
+namespace {
+
+/**
+ * Turns the participants' prepare-vote consensus into the coordinator's decision.
+ *
+ * For any consensus other than commit, the returned decision has no commit timestamp. For a
+ * commit consensus, the commit timestamp is the maximum prepare timestamp with its increment
+ * bumped by one, and the logical clock is advanced with the maximum prepare timestamp; a failure
+ * to advance the clock is thrown. 'lsid' and 'txnNumber' are used for logging only.
+ */
+txn::CoordinatorCommitDecision makeDecisionFromPrepareVoteConsensus(
+    const txn::PrepareVoteConsensus& result,
+    const LogicalSessionId& lsid,
+    const TxnNumber& txnNumber) {
+    invariant(result.decision);
+    txn::CoordinatorCommitDecision decision{result.decision.get(), boost::none};
+
+    // Only a commit consensus needs a commit timestamp and a clock advance.
+    if (!(result.decision == TransactionCoordinator::CommitDecision::kCommit)) {
+        return decision;
+    }
+
+    invariant(result.maxPrepareTimestamp);
+    const auto& maxPrepareTimestamp = result.maxPrepareTimestamp.get();
+
+    decision.commitTimestamp =
+        Timestamp(maxPrepareTimestamp.getSecs(), maxPrepareTimestamp.getInc() + 1);
+
+    LOG(3) << "Advancing cluster time to commit Timestamp " << decision.commitTimestamp.get()
+           << " of transaction " << txnNumber << " on session " << lsid.toBSON();
+
+    // NOTE(review): the clock is advanced to the max prepare timestamp, not to the commit
+    // timestamp named in the log line above -- confirm this is intended.
+    uassertStatusOK(LogicalClock::get(getGlobalServiceContext())
+                        ->advanceClusterTime(LogicalTime(maxPrepareTimestamp)));
+
+    return decision;
+}
+
+} // namespace
+
TransactionCoordinator::~TransactionCoordinator() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(_state == TransactionCoordinator::CoordinatorState::kDone);
@@ -66,7 +94,12 @@ SharedSemiFuture<TransactionCoordinator::CommitDecision> TransactionCoordinator:
}
auto coordinator = shared_from_this();
- txn::async(_callbackPool, []() { txn::persistParticipantList(); })
+ txn::async(_callbackPool,
+ [coordinator, participantShards] {
+ auto opCtx = Client::getCurrent()->makeOperationContext();
+ txn::persistParticipantList(
+ opCtx.get(), coordinator->_lsid, coordinator->_txnNumber, participantShards);
+ })
.then([coordinator, participantShards]() {
return txn::sendPrepare(coordinator,
coordinator->_networkExecutor,
@@ -75,22 +108,38 @@ SharedSemiFuture<TransactionCoordinator::CommitDecision> TransactionCoordinator:
coordinator->_lsid,
coordinator->_txnNumber);
})
- .then([coordinator](txn::PrepareVoteConsensus response) {
- return coordinator->_persistDecision(response);
+ .then([coordinator, participantShards](txn::PrepareVoteConsensus result) {
+ invariant(coordinator->_state == CoordinatorState::kPreparing);
+ return txn::async(coordinator->_callbackPool, [coordinator, result, participantShards] {
+ auto opCtx = Client::getCurrent()->makeOperationContext();
+ const auto decision = makeDecisionFromPrepareVoteConsensus(
+ result, coordinator->_lsid, coordinator->_txnNumber);
+ txn::persistDecision(opCtx.get(),
+ coordinator->_lsid,
+ coordinator->_txnNumber,
+ participantShards,
+ decision.commitTimestamp);
+ return decision;
+ });
})
.then([coordinator, participantShards](txn::CoordinatorCommitDecision decision) {
- // Send the decision and then propagate it down the continuation chain.
return coordinator->_sendDecisionToParticipants(participantShards, decision)
.then([decision] { return decision.decision; });
})
.then([coordinator](CommitDecision finalDecision) {
stdx::unique_lock<stdx::mutex> lk(coordinator->_mutex);
- LOG(3) << "Two-phase commit completed successfully with decision " << finalDecision
- << " for session " << coordinator->_lsid.toBSON() << ", transaction number "
- << coordinator->_txnNumber;
+ LOG(3) << "Finished coordinating transaction " << coordinator->_txnNumber
+ << " on session " << coordinator->_lsid.toBSON() << " with decision "
+ << finalDecision;
coordinator->_transitionToDone(std::move(lk));
})
- .then([coordinator] { return coordinator->_deleteDecision(); })
+ .then([coordinator] {
+ return txn::async(coordinator->_callbackPool, [coordinator] {
+ auto opCtx = Client::getCurrent()->makeOperationContext();
+ return txn::deleteCoordinatorDoc(
+ opCtx.get(), coordinator->_lsid, coordinator->_txnNumber);
+ });
+ })
.onError([coordinator](Status s) {
stdx::unique_lock<stdx::mutex> lk(coordinator->_mutex);
LOG(3) << "Two-phase commit failed with error in state " << coordinator->_state
@@ -128,33 +177,23 @@ void TransactionCoordinator::cancelIfCommitNotYetStarted() {
}
}
-Future<txn::CoordinatorCommitDecision> TransactionCoordinator::_persistDecision(
- const txn::PrepareVoteConsensus& prepareResponse) {
- invariant(_state == CoordinatorState::kPreparing);
- // TODO (SERVER-36853): Implement persistence of decision.
- // TODO (SERVER-36853): Handle errors appropriately.
- return txn::async(_callbackPool,
- [prepareResponse]() { return persistDecision(prepareResponse); });
-}
-
Future<void> TransactionCoordinator::_sendDecisionToParticipants(
- const std::vector<ShardId>& participantShards,
- txn::CoordinatorCommitDecision coordinatorDecision) {
+ const std::vector<ShardId>& participantShards, txn::CoordinatorCommitDecision decision) {
invariant(_state == CoordinatorState::kPreparing);
- _finalDecisionPromise.emplaceValue(coordinatorDecision.decision);
+ _finalDecisionPromise.emplaceValue(decision.decision);
// Send the decision to all participants.
- switch (coordinatorDecision.decision) {
+ switch (decision.decision) {
case CommitDecision::kCommit:
_state = CoordinatorState::kCommitting;
- invariant(coordinatorDecision.commitTimestamp);
+ invariant(decision.commitTimestamp);
return txn::sendCommit(_networkExecutor,
_callbackPool,
participantShards,
_lsid,
_txnNumber,
- coordinatorDecision.commitTimestamp.get());
+ decision.commitTimestamp.get());
case CommitDecision::kAbort:
_state = CoordinatorState::kAborting;
return txn::sendAbort(
@@ -163,12 +202,6 @@ Future<void> TransactionCoordinator::_sendDecisionToParticipants(
MONGO_UNREACHABLE;
};
-Future<void> TransactionCoordinator::_deleteDecision() {
- invariant(_state == CoordinatorState::kDone);
- // TODO (SERVER-36853): Implement deletion of decision.
- return Future<void>::makeReady();
-}
-
void TransactionCoordinator::_transitionToDone(stdx::unique_lock<stdx::mutex> lk) noexcept {
_state = CoordinatorState::kDone;
diff --git a/src/mongo/db/transaction_coordinator.h b/src/mongo/db/transaction_coordinator.h
index 17dcf8f1715..7a96955d085 100644
--- a/src/mongo/db/transaction_coordinator.h
+++ b/src/mongo/db/transaction_coordinator.h
@@ -159,13 +159,6 @@ private:
// in continuations. Basically they just make the main code prettier.
/**
- * Rersists the commit decision and returns a response containing the decision and, if the
- * decision was to commit, the commit timestamp.
- */
- Future<txn::CoordinatorCommitDecision> _persistDecision(
- const txn::PrepareVoteConsensus& prepareResponse);
-
- /**
* Asynchronously sends the commit decision to all participants (commit or abort), resolving the
* returned future when all participants have acknowledged the decision.
*/
@@ -173,12 +166,6 @@ private:
txn::CoordinatorCommitDecision coordinatorDecision);
/**
- * Asynchronously deletes the commit decision from the config.transactionCommitDecisions
- * collection, resolving the returned future when the decision has been persisted.
- */
- Future<void> _deleteDecision();
-
- /**
* Notifies all callers of onCompletion that the commit process has completed by fulfilling
* their promises, and transitions the state to done.
*
diff --git a/src/mongo/db/transaction_commit_decision.idl b/src/mongo/db/transaction_coordinator_document.idl
index ba980e37b08..af6aaf36d7c 100644
--- a/src/mongo/db/transaction_commit_decision.idl
+++ b/src/mongo/db/transaction_coordinator_document.idl
@@ -37,16 +37,23 @@ imports:
- "mongo/s/sharding_types.idl"
structs:
- TransactionCommitDecision:
- description: "A document used for storing the coordinator commit decision."
+ TransactionCoordinatorDocument:
+ description: "A document used for majority confirming the coordinator's state changes"
strict: true
fields:
_id:
type: OperationSessionInfo
description: "The sessionId and txnNumber of this transaction."
- commitTimestamp:
- type: timestamp
- description: "The clusterTime at which the participants should make the transaction's writes visible."
+ cpp_name: id
participants:
type: array<shard_id>
- description: "List of transaction participants."
+ description: "The list of transaction participants."
+ decision:
+ type: string
+ description: "The coordinator's decision for the transaction, that is, 'commit' or 'abort'. Only set if the coordinator has made a decision."
+ optional: true
+ # TODO (SERVER-38324): Add validator that the string is 'abort' or 'commit'
+ commitTimestamp:
+ type: timestamp
+ description: "The clusterTime at which the participants should make the transaction's writes visible. Only set if the coordinator decided to commit."
+ optional: true
diff --git a/src/mongo/db/transaction_coordinator_test.cpp b/src/mongo/db/transaction_coordinator_test.cpp
index c1ba122e859..4c1efa1349b 100644
--- a/src/mongo/db/transaction_coordinator_test.cpp
+++ b/src/mongo/db/transaction_coordinator_test.cpp
@@ -689,25 +689,6 @@ TEST_F(TransactionCoordinatorTest,
static_cast<int>(TransactionCoordinator::CommitDecision::kAbort));
}
-TEST_F(TransactionCoordinatorTest, RunCommitAfterAbortVoteSubsequentPrepareRequestsDontRetry) {
- auto commitDecisionFuture = coordinator()->runCommit(kTwoShardIdList);
-
- // One participant votes abort after retry.
- assertPrepareSentAndRespondWithRetryableError();
- assertPrepareSentAndRespondWithNoSuchTransaction();
-
- // One participant cannot be reached
- assertPrepareSentAndRespondWithRetryableError();
- // Should not be a retry here since one participant already voted abort.
-
- assertAbortSentAndRespondWithSuccess();
- assertAbortSentAndRespondWithSuccess();
-
- auto commitDecision = commitDecisionFuture.get();
- ASSERT_EQ(static_cast<int>(commitDecision),
- static_cast<int>(TransactionCoordinator::CommitDecision::kAbort));
-}
-
TEST_F(TransactionCoordinatorTest,
RunCommitReturnsCorrectCommitDecisionOnCommitAfterNetworkRetries) {
auto commitDecisionFuture = coordinator()->runCommit(kTwoShardIdList);
diff --git a/src/mongo/db/transaction_coordinator_util.cpp b/src/mongo/db/transaction_coordinator_util.cpp
index bd50cd9bb86..53d0cbae0df 100644
--- a/src/mongo/db/transaction_coordinator_util.cpp
+++ b/src/mongo/db/transaction_coordinator_util.cpp
@@ -40,6 +40,10 @@
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/repl/repl_client_info.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/grid.h"
@@ -49,10 +53,19 @@
namespace mongo {
namespace {
+
+MONGO_FAIL_POINT_DEFINE(hangBeforeWaitingForParticipantListWriteConcern);
+MONGO_FAIL_POINT_DEFINE(hangBeforeWaitingForDecisionWriteConcern);
+
using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;
using ResponseStatus = executor::TaskExecutor::ResponseStatus;
using CommitDecision = TransactionCoordinator::CommitDecision;
+const WriteConcernOptions kInternalMajorityNoSnapshotWriteConcern(
+ WriteConcernOptions::kInternalMajorityNoSnapshot,
+ WriteConcernOptions::SyncMode::UNSET,
+ WriteConcernOptions::kNoTimeout);
+
/**
* Finds the host and port for a shard.
*/
@@ -163,6 +176,32 @@ bool isRetryableError(ErrorCodes::Error code) {
code == ErrorCodes::NetworkInterfaceExceededTimeLimit;
}
+/**
+ * Builds the two query conditions that together require a document's participants array to
+ * contain every shard in 'participantList' ($all) and to have the same number of entries ($size).
+ * The conditions are returned as an array so that callers can place them under "$and".
+ */
+BSONArray buildParticipantListMatchesConditions(const std::vector<ShardId>& participantList) {
+    // Condition 1: the stored array has as many entries as the expected list.
+    const long long expectedSize = participantList.size();
+    BSONObj sizeCondition = BSON(TransactionCoordinatorDocument::kParticipantsFieldName
+                                 << BSON("$size" << expectedSize));
+
+    // Condition 2: the stored array contains each expected shard id.
+    BSONArrayBuilder shardIds;
+    for (auto it = participantList.begin(); it != participantList.end(); ++it) {
+        shardIds.append(it->toString());
+    }
+    BSONObj containsCondition = BSON(TransactionCoordinatorDocument::kParticipantsFieldName
+                                     << BSON("$all" << shardIds.arr()));
+
+    return BSON_ARRAY(containsCondition << sizeCondition);
+}
+
+/**
+ * Formats 'participantList' for use in error messages as "[shardA shardB ]": each shard id is
+ * followed by a single space, and the whole list is wrapped in square brackets.
+ */
+std::string buildParticipantListString(const std::vector<ShardId>& participantList) {
+    StringBuilder out;
+    out << "[";
+    for (auto it = participantList.begin(); it != participantList.end(); ++it) {
+        out << *it << " ";
+    }
+    out << "]";
+    return out.str();
+}
+
} // namespace
@@ -418,33 +457,270 @@ Future<void> sendAbort(executor::TaskExecutor* executor,
return whenAll(responses);
}
-void persistParticipantList() {
- // TODO (SERVER-36853): Implement this.
+/**
+ * Upserts the participant list for (lsid, txnNumber) into config.transaction_coordinators and
+ * waits for the write to satisfy kInternalMajorityNoSnapshotWriteConcern.
+ *
+ * Throws if the upsert or the writeConcern wait fails. A DuplicateKey error from the upsert is
+ * rethrown as error 51025, since it means a document for (lsid, txnNumber) already exists with a
+ * different participant list.
+ */
+void persistParticipantList(OperationContext* opCtx,
+                            LogicalSessionId lsid,
+                            TxnNumber txnNumber,
+                            const std::vector<ShardId>& participantList) {
+    LOG(0) << "Going to write participant list for lsid: " << lsid.toBSON()
+           << ", txnNumber: " << txnNumber;
+
+    OperationSessionInfo sessionInfo;
+    sessionInfo.setSessionId(lsid);
+    sessionInfo.setTxnNumber(txnNumber);
+
+    DBDirectClient client(opCtx);
+
+    // Throws if serializing the request or deserializing the response fails.
+    const auto commandResponse = client.runCommand([&] {
+        write_ops::Update updateOp(NamespaceString::kTransactionCoordinatorsNamespace);
+        updateOp.setUpdates({[&] {
+            write_ops::UpdateOpEntry entry;
+
+            // Ensure that the document for the (lsid, txnNumber) either has no participant list or
+            // has the same participant list. The document may have the same participant list if an
+            // earlier attempt to write the participant list failed waiting for writeConcern.
+            BSONObj noParticipantList = BSON(TransactionCoordinatorDocument::kParticipantsFieldName
+                                             << BSON("$exists" << false));
+            BSONObj sameParticipantList =
+                BSON("$and" << buildParticipantListMatchesConditions(participantList));
+            entry.setQ(BSON(TransactionCoordinatorDocument::kIdFieldName
+                            << sessionInfo.toBSON()
+                            << "$or"
+                            << BSON_ARRAY(noParticipantList << sameParticipantList)));
+
+            // Update with participant list. 'sessionInfo' is copied rather than moved because it
+            // is captured by reference and read again on the DuplicateKey path below.
+            // 'participantList' is a const reference, so it can only be copied.
+            TransactionCoordinatorDocument doc;
+            doc.setId(sessionInfo);
+            doc.setParticipants(participantList);
+            entry.setU(doc.toBSON());
+
+            entry.setUpsert(true);
+            return entry;
+        }()});
+        return updateOp.serialize({});
+    }());
+
+    const auto upsertStatus = getStatusFromWriteCommandReply(commandResponse->getCommandReply());
+
+    // Convert a DuplicateKey error to an anonymous error.
+    if (upsertStatus.code() == ErrorCodes::DuplicateKey) {
+        // Attempt to include the document for this (lsid, txnNumber) in the error message, if one
+        // exists. Note that this is best-effort: the document may have been deleted or manually
+        // changed since the update above ran.
+        const auto doc = client.findOne(
+            NamespaceString::kTransactionCoordinatorsNamespace.toString(),
+            QUERY(TransactionCoordinatorDocument::kIdFieldName << sessionInfo.toBSON()));
+        uasserted(51025,
+                  str::stream() << "While attempting to write participant list "
+                                << buildParticipantListString(participantList)
+                                << " for lsid "
+                                << lsid.toBSON()
+                                << " and txnNumber "
+                                << txnNumber
+                                << ", found document for the (lsid, txnNumber) with a different "
+                                   "participant list. Current document for the (lsid, txnNumber): "
+                                << doc);
+    }
+
+    // Throw any other error.
+    uassertStatusOK(upsertStatus);
+
+    LOG(0) << "Wrote participant list for lsid: " << lsid.toBSON() << ", txnNumber: " << txnNumber;
+
+    if (MONGO_FAIL_POINT(hangBeforeWaitingForParticipantListWriteConcern)) {
+        LOG(0) << "Hit hangBeforeWaitingForParticipantListWriteConcern failpoint";
+    }
+    MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(
+        opCtx, hangBeforeWaitingForParticipantListWriteConcern);
+
+    // Wait on this client's last op so the participant list is durable before the caller proceeds.
+    WriteConcernResult unusedWCResult;
+    uassertStatusOK(
+        waitForWriteConcern(opCtx,
+                            repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
+                            kInternalMajorityNoSnapshotWriteConcern,
+                            &unusedWCResult));
 }
-CoordinatorCommitDecision persistDecision(PrepareVoteConsensus response) {
- invariant(response.decision);
- CoordinatorCommitDecision coordinatorDecision{response.decision.get(), boost::none};
- LOG(3) << "Coordinator shard persisting commit decision " << response.decision;
-
- if (response.decision == TransactionCoordinator::CommitDecision::kCommit) {
- invariant(response.maxPrepareTimestamp);
- coordinatorDecision.commitTimestamp = Timestamp(response.maxPrepareTimestamp->getSecs(),
- response.maxPrepareTimestamp->getInc() + 1);
- LOG(3) << "Coordinator shard adjusting cluster time to "
- << coordinatorDecision.commitTimestamp.get();
- Status s = LogicalClock::get(getGlobalServiceContext())
- ->advanceClusterTime(LogicalTime(response.maxPrepareTimestamp.get()));
- if (!s.isOK()) {
- log() << "Coordinator shard failed to advance cluster time to "
- "commitTimestamp "
- << causedBy(s);
- }
+/**
+ * Records the coordinator's decision for (lsid, txnNumber) in config.transaction_coordinators and
+ * waits for the write to satisfy kInternalMajorityNoSnapshotWriteConcern. The decision is "commit"
+ * when 'commitTimestamp' is set and "abort" when it is boost::none.
+ *
+ * Throws if the update or the writeConcern wait fails. Throws error 51026 if the update did not
+ * report exactly one affected document, i.e. no document exists for (lsid, txnNumber) or the
+ * existing one has a different participant list, decision, or commit timestamp.
+ */
+void persistDecision(OperationContext* opCtx,
+                     LogicalSessionId lsid,
+                     TxnNumber txnNumber,
+                     const std::vector<ShardId>& participantList,
+                     const boost::optional<Timestamp>& commitTimestamp) {
+    LOG(0) << "Going to write decision " << (commitTimestamp ? "commit" : "abort")
+           << " for lsid: " << lsid.toBSON() << ", txnNumber: " << txnNumber;
+
+    OperationSessionInfo sessionInfo;
+    sessionInfo.setSessionId(lsid);
+    sessionInfo.setTxnNumber(txnNumber);
+
+    DBDirectClient client(opCtx);
+
+    // Throws if serializing the request or deserializing the response fails.
+    const auto commandResponse = client.runCommand([&] {
+        write_ops::Update updateOp(NamespaceString::kTransactionCoordinatorsNamespace);
+        updateOp.setUpdates({[&] {
+            write_ops::UpdateOpEntry entry;
+
+            // Ensure that the document for the (lsid, txnNumber) has the same participant list and
+            // either has no decision or the same decision. The document may have the same decision
+            // if an earlier attempt to write the decision failed waiting for writeConcern.
+            BSONObj noDecision = BSON(TransactionCoordinatorDocument::kDecisionFieldName
+                                      << BSON("$exists" << false)
+                                      << TransactionCoordinatorDocument::kCommitTimestampFieldName
+                                      << BSON("$exists" << false));
+            BSONObj sameDecision;
+            if (commitTimestamp) {
+                sameDecision = BSON(TransactionCoordinatorDocument::kDecisionFieldName
+                                    << "commit"
+                                    << TransactionCoordinatorDocument::kCommitTimestampFieldName
+                                    << *commitTimestamp);
+            } else {
+                sameDecision = BSON(TransactionCoordinatorDocument::kDecisionFieldName
+                                    << "abort"
+                                    << TransactionCoordinatorDocument::kCommitTimestampFieldName
+                                    << BSON("$exists" << false));
+            }
+            entry.setQ(BSON(TransactionCoordinatorDocument::kIdFieldName
+                            << sessionInfo.toBSON()
+                            << "$and"
+                            << buildParticipantListMatchesConditions(participantList)
+                            << "$or"
+                            << BSON_ARRAY(noDecision << sameDecision)));
+
+            // Update with decision. 'participantList' is a const reference, so it is copied;
+            // 'sessionInfo' is copied because it is read again on the error path below.
+            TransactionCoordinatorDocument doc;
+            doc.setId(sessionInfo);
+            doc.setParticipants(participantList);
+            if (commitTimestamp) {
+                doc.setDecision("commit"_sd);
+                doc.setCommitTimestamp(commitTimestamp);
+            } else {
+                doc.setDecision("abort"_sd);
+            }
+            entry.setU(doc.toBSON());
+
+            // No upsert: the participant list document must already exist.
+            return entry;
+        }()});
+        return updateOp.serialize({});
+    }());
+
+    const auto commandReply = commandResponse->getCommandReply();
+    uassertStatusOK(getStatusFromWriteCommandReply(commandReply));
+
+    // If no document matched, throw an anonymous error. (The update itself will not have thrown an
+    // error, because it's legal for an update to match no documents.)
+    if (commandReply.getIntField("n") != 1) {
+        // Attempt to include the document for this (lsid, txnNumber) in the error message, if one
+        // exists. Note that this is best-effort: the document may have been deleted or manually
+        // changed since the update above ran.
+        const auto doc = client.findOne(
+            NamespaceString::kTransactionCoordinatorsNamespace.toString(),
+            QUERY(TransactionCoordinatorDocument::kIdFieldName << sessionInfo.toBSON()));
+        uasserted(51026,
+                  str::stream() << "While attempting to write decision "
+                                << (commitTimestamp ? "'commit'" : "'abort'")
+                                << " for lsid "
+                                << lsid.toBSON()
+                                << " and txnNumber "
+                                << txnNumber
+                                << ", either failed to find document for this (lsid, txnNumber) or "
+                                   "document existed with a different participant list, different "
+                                   "decision, or different commitTimestamp. Current document for "
+                                   "the (lsid, txnNumber): "
+                                << doc);
+    }
+
+    LOG(0) << "Wrote decision " << (commitTimestamp ? "commit" : "abort")
+           << " for lsid: " << lsid.toBSON() << ", txnNumber: " << txnNumber;
+
+    if (MONGO_FAIL_POINT(hangBeforeWaitingForDecisionWriteConcern)) {
+        LOG(0) << "Hit hangBeforeWaitingForDecisionWriteConcern failpoint";
+    }
+    MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx,
+                                                    hangBeforeWaitingForDecisionWriteConcern);
+
+    // Wait on this client's last op so the decision is durable before the caller proceeds.
+    WriteConcernResult unusedWCResult;
+    uassertStatusOK(
+        waitForWriteConcern(opCtx,
+                            repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
+                            kInternalMajorityNoSnapshotWriteConcern,
+                            &unusedWCResult));
+}
+
+/**
+ * Deletes the config.transaction_coordinators document for (lsid, txnNumber), but only if that
+ * document already records a decision. Does not wait for any writeConcern.
+ *
+ * Throws if the delete command fails. Throws error 51027 if the delete did not report exactly one
+ * deleted document, i.e. no document exists for (lsid, txnNumber) or it has no decision yet.
+ */
+void deleteCoordinatorDoc(OperationContext* opCtx, LogicalSessionId lsid, TxnNumber txnNumber) {
+    LOG(0) << "Going to delete coordinator doc for lsid: " << lsid.toBSON()
+           << ", txnNumber: " << txnNumber;
+
+    OperationSessionInfo sessionInfo;
+    sessionInfo.setSessionId(lsid);
+    sessionInfo.setTxnNumber(txnNumber);
+
+    DBDirectClient client(opCtx);
+
+    // Throws if serializing the request or deserializing the response fails.
+    const auto commandResponse = client.runCommand([&] {
+        write_ops::Delete deleteOp(NamespaceString::kTransactionCoordinatorsNamespace);
+        deleteOp.setDeletes({[&] {
+            write_ops::DeleteOpEntry entry;
+
+            // Ensure the document is only deleted after a decision has been made.
+            entry.setQ(BSON(TransactionCoordinatorDocument::kIdFieldName
+                            << sessionInfo.toBSON()
+                            << TransactionCoordinatorDocument::kDecisionFieldName
+                            << BSON("$exists" << true)));
+
+            entry.setMulti(false);
+            return entry;
+        }()});
+        return deleteOp.serialize({});
+    }());
+
+    const auto commandReply = commandResponse->getCommandReply();
+    uassertStatusOK(getStatusFromWriteCommandReply(commandReply));
+
+    // If no document matched, throw an anonymous error. (The delete itself will not have thrown an
+    // error, because it's legal for a delete to match no documents.)
+    if (commandReply.getIntField("n") != 1) {
+        // Attempt to include the document for this (lsid, txnNumber) in the error message, if one
+        // exists. Note that this is best-effort: the document may have been deleted or manually
+        // changed since the delete above ran.
+        const auto doc = client.findOne(
+            NamespaceString::kTransactionCoordinatorsNamespace.toString(),
+            QUERY(TransactionCoordinatorDocument::kIdFieldName << sessionInfo.toBSON()));
+        uasserted(51027,
+                  str::stream() << "While attempting to delete document for lsid " << lsid.toBSON()
+                                << " and txnNumber "
+                                << txnNumber
+                                << ", either failed to find document for this (lsid, txnNumber) or "
+                                   "document existed without a decision. Current document for the "
+                                   "(lsid, txnNumber): "
+                                << doc);
+    }
+
+    LOG(0) << "Deleted coordinator doc for lsid: " << lsid.toBSON() << ", txnNumber: " << txnNumber;
+}
+
+// Queries config.transaction_coordinators with a default-constructed Query, parses every returned
+// document as a TransactionCoordinatorDocument, and returns the parsed documents in cursor order.
+// NOTE(review): per the TODO below, a document that fails to parse is not skipped; presumably the
+// parse error propagates to the caller -- confirm.
+std::vector<TransactionCoordinatorDocument> readAllCoordinatorDocs(OperationContext* opCtx) {
+ std::vector<TransactionCoordinatorDocument> allCoordinatorDocs;
- // TODO (SERVER-36853): Actually persist decision.
+ Query query;
+ DBDirectClient client(opCtx);
+ auto coordinatorDocsCursor =
+ client.query(NamespaceString::kTransactionCoordinatorsNamespace, query);
+
+ // Drain the cursor, converting each raw document into its parsed form.
+ while (coordinatorDocsCursor->more()) {
+ // TODO (SERVER-38307): Try/catch around parsing the document and skip the document if it
+ // fails to parse.
+ auto nextDecision = TransactionCoordinatorDocument::parse(IDLParserErrorContext(""),
+ coordinatorDocsCursor->next());
+ allCoordinatorDocs.push_back(nextDecision);
+ }
- return coordinatorDecision;
+ return allCoordinatorDocs;
}
Future<void> whenAll(std::vector<Future<void>>& futures) {
diff --git a/src/mongo/db/transaction_coordinator_util.h b/src/mongo/db/transaction_coordinator_util.h
index fd6475139d1..43ad178a1c5 100644
--- a/src/mongo/db/transaction_coordinator_util.h
+++ b/src/mongo/db/transaction_coordinator_util.h
@@ -31,7 +31,9 @@
#pragma once
#include "mongo/db/logical_session_id.h"
+#include "mongo/db/operation_context.h"
#include "mongo/db/transaction_coordinator.h"
+#include "mongo/db/transaction_coordinator_document_gen.h"
#include "mongo/executor/task_executor.h"
#include "mongo/s/shard_id.h"
#include "mongo/util/concurrency/thread_pool.h"
@@ -124,6 +126,7 @@ Future<void> sendCommit(executor::TaskExecutor* executor,
const LogicalSessionId& lsid,
const TxnNumber& txnNumber,
Timestamp commitTimestamp);
+
/**
* Sends abort to all shards and returns a future that will be resolved when all participants have
* responded with success.
@@ -134,20 +137,73 @@ Future<void> sendAbort(executor::TaskExecutor* executor,
const LogicalSessionId& lsid,
const TxnNumber& txnNumber);
+/**
+ * Upserts a document of the form:
+ *
+ * {
+ * _id: {lsid: <lsid>, txnNumber: <txnNumber>}
+ * participants: ["shard0000", "shard0001"]
+ * }
+ *
+ * into config.transaction_coordinators and waits for the upsert to be majority-committed.
+ *
+ * Throws if the upsert fails or waiting for writeConcern fails.
+ * If the upsert returns a DuplicateKey error, converts it to an anonymous error, because it means
+ * a document for the (lsid, txnNumber) exists with a different participant list.
+ */
+void persistParticipantList(OperationContext* opCtx,
+ LogicalSessionId lsid,
+ TxnNumber txnNumber,
+ const std::vector<ShardId>& participantList);
/**
- * Persists the participant list to the config.transactionCommitDecisions collection.
+ * If 'commitTimestamp' is boost::none, updates the document in config.transaction_coordinators for
+ * (lsid, txnNumber) to be:
+ *
+ * {
+ * _id: {lsid: <lsid>, txnNumber: <txnNumber>}
+ * participants: ["shard0000", "shard0001"]
+ * decision: "abort"
+ * }
*
- * TODO (SERVER-36853): Implement this. It is currently a stub.
+ * else updates the document to be:
+ *
+ * {
+ * _id: {lsid: <lsid>, txnNumber: <txnNumber>}
+ * participants: ["shard0000", "shard0001"]
+ * decision: "commit"
+ * commitTimestamp: Timestamp(xxxxxxxx, x),
+ * }
+ *
+ * and waits for the update to be majority-committed.
+ *
+ * Throws if the update fails or waiting for writeConcern fails.
+ * If the update succeeds but did not update any document, throws an anonymous error, because it
+ * means either no document for (lsid, txnNumber) exists, or a document exists but has a different
+ * participant list, different decision, or different commit Timestamp.
*/
-void persistParticipantList();
+void persistDecision(OperationContext* opCtx,
+ LogicalSessionId lsid,
+ TxnNumber txnNumber,
+ const std::vector<ShardId>& participantList,
+ const boost::optional<Timestamp>& commitTimestamp);
/**
- * Persists the commit decision to the config.transactionCommitDecisions collection.
+ * Deletes the document in config.transaction_coordinators for (lsid, txnNumber).
*
- * TODO (SERVER-36853): Implement this. It is currently a stub.
+ * Does *not* wait for the delete to be majority-committed.
+ *
+ * Throws if the delete fails.
+ * If the delete succeeds but did not delete any document, throws an anonymous error, because it
+ * means either no document for (lsid, txnNumber) exists, or a document exists but without a
+ * decision.
+ */
+void deleteCoordinatorDoc(OperationContext* opCtx, LogicalSessionId lsid, TxnNumber txnNumber);
+
+/**
+ * Reads and returns all documents in config.transaction_coordinators.
*/
-CoordinatorCommitDecision persistDecision(PrepareVoteConsensus response);
+std::vector<TransactionCoordinatorDocument> readAllCoordinatorDocs(OperationContext* opCtx);
//
// BELOW THIS ARE FUTURES-RELATED UTILITIES.
diff --git a/src/mongo/db/transaction_coordinator_util_test.cpp b/src/mongo/db/transaction_coordinator_util_test.cpp
new file mode 100644
index 00000000000..e5ff56011eb
--- /dev/null
+++ b/src/mongo/db/transaction_coordinator_util_test.cpp
@@ -0,0 +1,349 @@
+
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/transaction_coordinator_util.h"
+#include "mongo/s/shard_server_test_fixture.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+const std::string kAbortDecision{"abort"};
+const std::string kCommitDecision{"commit"};
+
+/**
+ * Fixture for exercising the txn:: helpers that write, read and delete coordinator documents
+ * (persistParticipantList, persistDecision, deleteCoordinatorDoc, readAllCoordinatorDocs).
+ *
+ * setUp() assigns a newly made logical session id and a commit Timestamp derived from the current
+ * wall-clock second; the txnNumber and the participant list are fixed constants.
+ */
+class TransactionCoordinatorCollectionTest : public ShardServerTestFixture {
+protected:
+    void setUp() override {
+        ShardServerTestFixture::setUp();
+        _lsid = makeLogicalSessionIdForTest();
+        // Seconds since the epoch, increment 0.
+        _commitTimestamp = Timestamp(Date_t::now().toMillisSinceEpoch() / 1000, 0);
+    }
+
+    /** Returns the session id made in setUp(). */
+    LogicalSessionId lsid() const {
+        return _lsid;
+    }
+
+    /** Returns the fixture's fixed transaction number. */
+    TxnNumber txnNumber() const {
+        return _txnNumber;
+    }
+
+    /** Returns the fixture's fixed participant list. */
+    const std::vector<ShardId>& participants() const {
+        return _participants;
+    }
+
+    /** Returns the commit Timestamp computed in setUp(). */
+    const Timestamp& commitTimestamp() const {
+        return _commitTimestamp;
+    }
+
+    /**
+     * Asserts that 'doc' carries the expected session id, txnNumber and participant list.
+     *
+     * When 'expectedDecision' / 'expectedCommitTimestamp' are provided, the document must hold
+     * equal values; when they are boost::none, the document must not have the field set.
+     */
+    void assertDocumentMatches(
+        const TransactionCoordinatorDocument& doc,
+        const LogicalSessionId& expectedLsid,
+        TxnNumber expectedTxnNum,
+        const std::vector<ShardId>& expectedParticipants,
+        const boost::optional<std::string>& expectedDecision = boost::none,
+        const boost::optional<Timestamp>& expectedCommitTimestamp = boost::none) {
+        ASSERT_EQUALS(doc.getId().getSessionId(), expectedLsid);
+        ASSERT_EQUALS(doc.getId().getTxnNumber(), expectedTxnNum);
+
+        ASSERT(doc.getParticipants() == expectedParticipants);
+
+        if (expectedDecision) {
+            // Verify the decision is present before dereferencing it, so that a missing decision
+            // is reported as a test failure instead of dereferencing an unset value.
+            ASSERT(static_cast<bool>(doc.getDecision()));
+            ASSERT_EQUALS(expectedDecision, doc.getDecision()->toString());
+        } else {
+            ASSERT(!doc.getDecision());
+        }
+
+        if (expectedCommitTimestamp) {
+            ASSERT_EQUALS(expectedCommitTimestamp, doc.getCommitTimestamp());
+        } else {
+            ASSERT(!doc.getCommitTimestamp());
+        }
+    }
+
+    /**
+     * Persists the participant list, then asserts that readAllCoordinatorDocs() returns exactly
+     * one document and that it matches the arguments with no decision or commit Timestamp.
+     */
+    void persistParticipantListExpectSuccess(OperationContext* opCtx,
+                                             LogicalSessionId lsid,
+                                             TxnNumber txnNumber,
+                                             const std::vector<ShardId>& participants) {
+        txn::persistParticipantList(opCtx, lsid, txnNumber, participants);
+
+        auto allCoordinatorDocs = txn::readAllCoordinatorDocs(opCtx);
+        ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1));
+        assertDocumentMatches(allCoordinatorDocs[0], lsid, txnNumber, participants);
+    }
+
+    /**
+     * Asserts that persisting the participant list throws an AssertionException with code 51025
+     * (presumably the code persistParticipantList uses for a conflicting existing document --
+     * confirm against transaction_coordinator_util.cpp).
+     */
+    void persistParticipantListExpectDuplicateKeyError(OperationContext* opCtx,
+                                                       LogicalSessionId lsid,
+                                                       TxnNumber txnNumber,
+                                                       const std::vector<ShardId>& participants) {
+        ASSERT_THROWS_CODE(txn::persistParticipantList(opCtx, lsid, txnNumber, participants),
+                           AssertionException,
+                           51025);
+    }
+
+    /**
+     * Persists a decision, then asserts that readAllCoordinatorDocs() returns exactly one
+     * document. A set 'commitTimestamp' means the document must hold the "commit" decision with
+     * that Timestamp; boost::none means it must hold the "abort" decision and no Timestamp.
+     */
+    void persistDecisionExpectSuccess(OperationContext* opCtx,
+                                      LogicalSessionId lsid,
+                                      TxnNumber txnNumber,
+                                      const std::vector<ShardId>& participants,
+                                      const boost::optional<Timestamp>& commitTimestamp) {
+        txn::persistDecision(opCtx, lsid, txnNumber, participants, commitTimestamp);
+
+        auto allCoordinatorDocs = txn::readAllCoordinatorDocs(opCtx);
+        ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1));
+        if (commitTimestamp) {
+            assertDocumentMatches(allCoordinatorDocs[0],
+                                  lsid,
+                                  txnNumber,
+                                  participants,
+                                  kCommitDecision,
+                                  *commitTimestamp);
+        } else {
+            assertDocumentMatches(
+                allCoordinatorDocs[0], lsid, txnNumber, participants, kAbortDecision);
+        }
+    }
+
+    /**
+     * Asserts that persisting a decision throws an AssertionException with code 51026
+     * (presumably the "no matching document was updated" error -- confirm against
+     * transaction_coordinator_util.cpp).
+     */
+    void persistDecisionExpectNoMatchingDocuments(
+        OperationContext* opCtx,
+        LogicalSessionId lsid,
+        TxnNumber txnNumber,
+        const std::vector<ShardId>& participants,
+        const boost::optional<Timestamp>& commitTimestamp) {
+        ASSERT_THROWS_CODE(
+            txn::persistDecision(opCtx, lsid, txnNumber, participants, commitTimestamp),
+            AssertionException,
+            51026);
+    }
+
+    /**
+     * Deletes the coordinator document, then asserts that readAllCoordinatorDocs() returns no
+     * documents at all.
+     */
+    void deleteCoordinatorDocExpectSuccess(OperationContext* opCtx,
+                                           LogicalSessionId lsid,
+                                           TxnNumber txnNumber) {
+        txn::deleteCoordinatorDoc(opCtx, lsid, txnNumber);
+
+        auto allCoordinatorDocs = txn::readAllCoordinatorDocs(opCtx);
+        ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(0));
+    }
+
+    /**
+     * Asserts that deleting the coordinator document throws an AssertionException with code 51027
+     * (presumably the "no matching document was deleted" error -- confirm against
+     * transaction_coordinator_util.cpp).
+     */
+    void deleteCoordinatorDocExpectNoMatchingDocuments(OperationContext* opCtx,
+                                                       LogicalSessionId lsid,
+                                                       TxnNumber txnNumber) {
+        ASSERT_THROWS_CODE(
+            txn::deleteCoordinatorDoc(opCtx, lsid, txnNumber), AssertionException, 51027);
+    }
+
+    LogicalSessionId _lsid;
+    const TxnNumber _txnNumber{0};
+    const std::vector<ShardId> _participants{
+        ShardId("shard0001"), ShardId("shard0002"), ShardId("shard0003")};
+    Timestamp _commitTimestamp;
+};
+
+// With nothing persisted yet, writing the participant list must leave exactly one matching
+// coordinator document.
+TEST_F(TransactionCoordinatorCollectionTest,
+       PersistParticipantListWhenNoDocumentForTransactionExistsSucceeds) {
+    OperationContext* const opCtx = operationContext();
+    persistParticipantListExpectSuccess(opCtx, lsid(), txnNumber(), participants());
+}
+
+// Writing the identical participant list a second time must succeed and still leave exactly one
+// matching coordinator document.
+TEST_F(TransactionCoordinatorCollectionTest,
+       PersistParticipantListWhenMatchingDocumentForTransactionExistsSucceeds) {
+    OperationContext* const opCtx = operationContext();
+    for (int attempt = 0; attempt < 2; ++attempt) {
+        persistParticipantListExpectSuccess(opCtx, lsid(), txnNumber(), participants());
+    }
+}
+
+// Once a participant list is stored for (lsid, txnNumber), attempts to store a shorter, a longer,
+// or a same-sized-but-different list must all be rejected.
+TEST_F(TransactionCoordinatorCollectionTest,
+       PersistParticipantListWhenDocumentWithConflictingParticipantListExistsFails) {
+    OperationContext* const opCtx = operationContext();
+
+    const std::vector<ShardId> originalList{
+        ShardId("shard0001"), ShardId("shard0002"), ShardId("shard0003")};
+    persistParticipantListExpectSuccess(opCtx, lsid(), txnNumber(), originalList);
+
+    // One participant fewer than the stored list.
+    const std::vector<ShardId> shorterList{ShardId("shard0001"), ShardId("shard0002")};
+    persistParticipantListExpectDuplicateKeyError(opCtx, lsid(), txnNumber(), shorterList);
+
+    // One participant more than the stored list.
+    const std::vector<ShardId> longerList{
+        ShardId("shard0001"), ShardId("shard0002"), ShardId("shard0003"), ShardId("shard0004")};
+    persistParticipantListExpectDuplicateKeyError(opCtx, lsid(), txnNumber(), longerList);
+
+    // Same length as the stored list, but the last participant differs.
+    const std::vector<ShardId> swappedList{
+        ShardId("shard0001"), ShardId("shard0002"), ShardId("shard0004")};
+    persistParticipantListExpectDuplicateKeyError(opCtx, lsid(), txnNumber(), swappedList);
+}
+
+// Transactions 1..3 on the same session each get their own coordinator document, so the document
+// count grows by one per persisted transaction.
+TEST_F(TransactionCoordinatorCollectionTest,
+       PersistParticipantListForMultipleTransactionsOnSameSession) {
+    OperationContext* const opCtx = operationContext();
+
+    for (int docCount = 1; docCount <= 3; ++docCount) {
+        const TxnNumber currentTxn{docCount};
+        txn::persistParticipantList(opCtx, lsid(), currentTxn, participants());
+
+        const auto storedDocs = txn::readAllCoordinatorDocs(opCtx);
+        ASSERT_EQUALS(storedDocs.size(), size_t(docCount));
+    }
+}
+
+// Three newly made sessions using the same txnNumber each get their own coordinator document.
+TEST_F(TransactionCoordinatorCollectionTest, PersistParticipantListForMultipleSessions) {
+    OperationContext* const opCtx = operationContext();
+
+    for (int docCount = 1; docCount <= 3; ++docCount) {
+        const auto sessionId = makeLogicalSessionIdForTest();
+        txn::persistParticipantList(opCtx, sessionId, txnNumber(), participants());
+
+        const auto storedDocs = txn::readAllCoordinatorDocs(opCtx);
+        ASSERT_EQUALS(storedDocs.size(), size_t(docCount));
+    }
+}
+
+// An abort decision cannot be persisted when no participant list was persisted first.
+TEST_F(TransactionCoordinatorCollectionTest,
+       PersistAbortDecisionWhenNoDocumentForTransactionExistsFails) {
+    OperationContext* const opCtx = operationContext();
+    const boost::optional<Timestamp> abortDecision = boost::none;  // no Timestamp == abort
+    persistDecisionExpectNoMatchingDocuments(
+        opCtx, lsid(), txnNumber(), participants(), abortDecision);
+}
+
+// After the participant list is stored, an abort decision can be recorded on the same document.
+TEST_F(TransactionCoordinatorCollectionTest,
+       PersistAbortDecisionWhenDocumentExistsWithoutDecisionSucceeds) {
+    OperationContext* const opCtx = operationContext();
+    const boost::optional<Timestamp> abortDecision = boost::none;  // no Timestamp == abort
+
+    persistParticipantListExpectSuccess(opCtx, lsid(), txnNumber(), participants());
+    persistDecisionExpectSuccess(opCtx, lsid(), txnNumber(), participants(), abortDecision);
+}
+
+// Recording the same abort decision twice must succeed both times.
+TEST_F(TransactionCoordinatorCollectionTest,
+       PersistAbortDecisionWhenDocumentExistsWithSameDecisionSucceeds) {
+    OperationContext* const opCtx = operationContext();
+    const boost::optional<Timestamp> abortDecision = boost::none;  // no Timestamp == abort
+
+    persistParticipantListExpectSuccess(opCtx, lsid(), txnNumber(), participants());
+    for (int attempt = 0; attempt < 2; ++attempt) {
+        persistDecisionExpectSuccess(opCtx, lsid(), txnNumber(), participants(), abortDecision);
+    }
+}
+
+// A commit decision cannot be persisted when no participant list was persisted first.
+TEST_F(TransactionCoordinatorCollectionTest,
+       PersistCommitDecisionWhenNoDocumentForTransactionExistsFails) {
+    OperationContext* const opCtx = operationContext();
+    const boost::optional<Timestamp> commitDecision = commitTimestamp();  // Timestamp == commit
+    persistDecisionExpectNoMatchingDocuments(
+        opCtx, lsid(), txnNumber(), participants(), commitDecision);
+}
+
+// After the participant list is stored, a commit decision can be recorded on the same document.
+TEST_F(TransactionCoordinatorCollectionTest,
+       PersistCommitDecisionWhenDocumentExistsWithoutDecisionSucceeds) {
+    OperationContext* const opCtx = operationContext();
+    const boost::optional<Timestamp> commitDecision = commitTimestamp();  // Timestamp == commit
+
+    persistParticipantListExpectSuccess(opCtx, lsid(), txnNumber(), participants());
+    persistDecisionExpectSuccess(opCtx, lsid(), txnNumber(), participants(), commitDecision);
+}
+
+// Recording the same commit decision (same Timestamp) twice must succeed both times.
+TEST_F(TransactionCoordinatorCollectionTest,
+       PersistCommitDecisionWhenDocumentExistsWithSameDecisionSucceeds) {
+    OperationContext* const opCtx = operationContext();
+    const boost::optional<Timestamp> commitDecision = commitTimestamp();  // Timestamp == commit
+
+    persistParticipantListExpectSuccess(opCtx, lsid(), txnNumber(), participants());
+    for (int attempt = 0; attempt < 2; ++attempt) {
+        persistDecisionExpectSuccess(opCtx, lsid(), txnNumber(), participants(), commitDecision);
+    }
+}
+
+// Once a commit decision is stored, a commit decision carrying another Timestamp is rejected.
+TEST_F(TransactionCoordinatorCollectionTest,
+       PersistCommitDecisionWhenDocumentExistsWithDifferentCommitTimestampFails) {
+    OperationContext* const opCtx = operationContext();
+
+    // Uses increment 1, whereas the fixture's commit Timestamp is built with increment 0.
+    const auto secondsNow = Date_t::now().toMillisSinceEpoch() / 1000;
+    const boost::optional<Timestamp> conflictingCommit = Timestamp(secondsNow, 1);
+    const boost::optional<Timestamp> originalCommit = commitTimestamp();
+
+    persistParticipantListExpectSuccess(opCtx, lsid(), txnNumber(), participants());
+    persistDecisionExpectSuccess(opCtx, lsid(), txnNumber(), participants(), originalCommit);
+    persistDecisionExpectNoMatchingDocuments(
+        opCtx, lsid(), txnNumber(), participants(), conflictingCommit);
+}
+
+// Once a commit decision is stored, a later abort decision for the same transaction is rejected.
+TEST_F(TransactionCoordinatorCollectionTest,
+       PersistAbortDecisionWhenDocumentExistsWithDifferentDecisionFails) {
+    OperationContext* const opCtx = operationContext();
+    const boost::optional<Timestamp> commitDecision = commitTimestamp();  // Timestamp == commit
+    const boost::optional<Timestamp> abortDecision = boost::none;         // no Timestamp == abort
+
+    persistParticipantListExpectSuccess(opCtx, lsid(), txnNumber(), participants());
+    persistDecisionExpectSuccess(opCtx, lsid(), txnNumber(), participants(), commitDecision);
+    persistDecisionExpectNoMatchingDocuments(
+        opCtx, lsid(), txnNumber(), participants(), abortDecision);
+}
+
+// Once an abort decision is stored, a later commit decision for the same transaction is rejected.
+TEST_F(TransactionCoordinatorCollectionTest,
+       PersistCommitDecisionWhenDocumentExistsWithDifferentDecisionFails) {
+    OperationContext* const opCtx = operationContext();
+    const boost::optional<Timestamp> abortDecision = boost::none;         // no Timestamp == abort
+    const boost::optional<Timestamp> commitDecision = commitTimestamp();  // Timestamp == commit
+
+    persistParticipantListExpectSuccess(opCtx, lsid(), txnNumber(), participants());
+    persistDecisionExpectSuccess(opCtx, lsid(), txnNumber(), participants(), abortDecision);
+    persistDecisionExpectNoMatchingDocuments(
+        opCtx, lsid(), txnNumber(), participants(), commitDecision);
+}
+
+// Deleting when nothing has been persisted for the transaction must fail.
+TEST_F(TransactionCoordinatorCollectionTest, DeleteCoordinatorDocWhenNoDocumentExistsFails) {
+    OperationContext* const opCtx = operationContext();
+    deleteCoordinatorDocExpectNoMatchingDocuments(opCtx, lsid(), txnNumber());
+}
+
+// A coordinator document that has a participant list but no decision yet must not be deletable.
+TEST_F(TransactionCoordinatorCollectionTest,
+       DeleteCoordinatorDocWhenDocumentExistsWithoutDecisionFails) {
+    OperationContext* const opCtx = operationContext();
+
+    persistParticipantListExpectSuccess(opCtx, lsid(), txnNumber(), participants());
+    deleteCoordinatorDocExpectNoMatchingDocuments(opCtx, lsid(), txnNumber());
+}
+
+// A coordinator document holding an abort decision can be deleted, leaving no documents behind.
+TEST_F(TransactionCoordinatorCollectionTest,
+       DeleteCoordinatorDocWhenDocumentExistsWithAbortDecisionSucceeds) {
+    OperationContext* const opCtx = operationContext();
+    const boost::optional<Timestamp> abortDecision = boost::none;  // no Timestamp == abort
+
+    persistParticipantListExpectSuccess(opCtx, lsid(), txnNumber(), participants());
+    persistDecisionExpectSuccess(opCtx, lsid(), txnNumber(), participants(), abortDecision);
+    deleteCoordinatorDocExpectSuccess(opCtx, lsid(), txnNumber());
+}
+
+// A coordinator document holding a commit decision can be deleted, leaving no documents behind.
+TEST_F(TransactionCoordinatorCollectionTest,
+       DeleteCoordinatorDocWhenDocumentExistsWithCommitDecisionSucceeds) {
+    OperationContext* const opCtx = operationContext();
+    const boost::optional<Timestamp> commitDecision = commitTimestamp();  // Timestamp == commit
+
+    persistParticipantListExpectSuccess(opCtx, lsid(), txnNumber(), participants());
+    persistDecisionExpectSuccess(opCtx, lsid(), txnNumber(), participants(), commitDecision);
+    deleteCoordinatorDocExpectSuccess(opCtx, lsid(), txnNumber());
+}
+
+// With coordinator documents for two transactions on one session, deleting the first
+// transaction's document must leave only the second transaction's document behind.
+TEST_F(TransactionCoordinatorCollectionTest,
+       MultipleCommitDecisionsPersistedAndDeleteOneSuccessfullyRemovesCorrectDecision) {
+    OperationContext* const opCtx = operationContext();
+    const TxnNumber firstTxn{4};
+    const TxnNumber secondTxn{5};
+
+    // Store a participant list for each of the two transactions.
+    txn::persistParticipantList(opCtx, lsid(), firstTxn, participants());
+    txn::persistParticipantList(opCtx, lsid(), secondTxn, participants());
+
+    auto storedDocs = txn::readAllCoordinatorDocs(opCtx);
+    ASSERT_EQUALS(storedDocs.size(), size_t(2));
+
+    // Record an abort decision for the first transaction, then delete its document.
+    const boost::optional<Timestamp> abortDecision = boost::none;  // no Timestamp == abort
+    txn::persistDecision(opCtx, lsid(), firstTxn, participants(), abortDecision);
+    txn::deleteCoordinatorDoc(opCtx, lsid(), firstTxn);
+
+    // Only the second transaction's document, which still has no decision, should remain.
+    storedDocs = txn::readAllCoordinatorDocs(opCtx);
+    ASSERT_EQUALS(storedDocs.size(), size_t(1));
+    assertDocumentMatches(storedDocs[0], lsid(), secondTxn, participants());
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/write_concern_options.cpp b/src/mongo/db/write_concern_options.cpp
index e9bf4f4ba68..4f92515f26b 100644
--- a/src/mongo/db/write_concern_options.cpp
+++ b/src/mongo/db/write_concern_options.cpp
@@ -67,6 +67,9 @@ const int WriteConcernOptions::kNoWaiting(-1);
const StringData WriteConcernOptions::kWriteConcernField = "writeConcern"_sd;
const char WriteConcernOptions::kMajority[] = "majority";
+
+// TODO (PM-1301): Remove once the stable Timestamp is allowed to advance past the oldest prepare
+// Timestamp.
const char WriteConcernOptions::kInternalMajorityNoSnapshot[] = "internalMajorityNoSnapshot";
const BSONObj WriteConcernOptions::Default = BSONObj();