From 2e6e75cc78113485c6c82e695bccd91f7c3932e1 Mon Sep 17 00:00:00 2001 From: Esha Maharishi Date: Wed, 5 Dec 2018 16:04:25 -0500 Subject: SERVER-37884 Coordinator should make its state durable before sending prepare and before sending the decision --- src/mongo/db/SConscript | 4 +- src/mongo/db/namespace_string.cpp | 7 + src/mongo/db/namespace_string.h | 3 + src/mongo/db/transaction_commit_decision.idl | 52 --- src/mongo/db/transaction_coordinator.cpp | 91 ++++-- src/mongo/db/transaction_coordinator.h | 13 - src/mongo/db/transaction_coordinator_document.idl | 59 ++++ src/mongo/db/transaction_coordinator_test.cpp | 19 -- src/mongo/db/transaction_coordinator_util.cpp | 320 +++++++++++++++++-- src/mongo/db/transaction_coordinator_util.h | 68 +++- src/mongo/db/transaction_coordinator_util_test.cpp | 349 +++++++++++++++++++++ src/mongo/db/write_concern_options.cpp | 3 + 12 files changed, 846 insertions(+), 142 deletions(-) delete mode 100644 src/mongo/db/transaction_commit_decision.idl create mode 100644 src/mongo/db/transaction_coordinator_document.idl create mode 100644 src/mongo/db/transaction_coordinator_util_test.cpp (limited to 'src/mongo/db') diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index f3869401905..2bc852aa9a0 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -648,10 +648,12 @@ env.Library( 'transaction_coordinator_factory_mongod.cpp', 'transaction_coordinator_service.cpp', 'transaction_coordinator_util.cpp', + env.Idlc('transaction_coordinator_document.idl')[0], ], LIBDEPS=[ '$BUILD_DIR/mongo/base', 'catalog_raii', + 'rw_concern_d', ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/commands/txn_cmd_request', @@ -689,7 +691,6 @@ env.Library( 'transaction_participant.cpp', 's/recover_transaction_decision_from_local_participant.cpp', env.Idlc('session_txn_record.idl')[0], - env.Idlc('transaction_commit_decision.idl')[0], env.Idlc('transactions_stats.idl')[0], ], LIBDEPS=[ @@ -727,6 +728,7 @@ env.CppUnitTest( 'transaction_coordinator_catalog_test.cpp', 'transaction_coordinator_service_test.cpp', 'transaction_coordinator_test.cpp', + 'transaction_coordinator_util_test.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock', diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index 85b28afd6e8..684bb917798 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -58,8 +58,15 @@ const NamespaceString NamespaceString::kServerConfigurationNamespace(NamespaceSt "system.version"); const NamespaceString NamespaceString::kLogicalSessionsNamespace(NamespaceString::kConfigDb, "system.sessions"); + +// Persisted state for a shard participating in a transaction or retryable write. const NamespaceString NamespaceString::kSessionTransactionsTableNamespace( NamespaceString::kConfigDb, "transactions"); + +// Persisted state for a shard coordinating a cross-shard transaction. +const NamespaceString NamespaceString::kTransactionCoordinatorsNamespace( + NamespaceString::kConfigDb, "transaction_coordinators"); + const NamespaceString NamespaceString::kShardConfigCollectionsNamespace(NamespaceString::kConfigDb, "cache.collections"); const NamespaceString NamespaceString::kShardConfigDatabasesNamespace(NamespaceString::kConfigDb, diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index c56898a53b8..651d769544b 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -93,6 +93,9 @@ public: // Namespace of the the oplog collection. static const NamespaceString kRsOplogNamespace; + // Namespace for storing the persisted state of transaction coordinators. + static const NamespaceString kTransactionCoordinatorsNamespace; + /** * Constructs an empty NamespaceString. */ diff --git a/src/mongo/db/transaction_commit_decision.idl b/src/mongo/db/transaction_commit_decision.idl deleted file mode 100644 index ba980e37b08..00000000000 --- a/src/mongo/db/transaction_commit_decision.idl +++ /dev/null @@ -1,52 +0,0 @@ -# Copyright (C) 2018-present MongoDB, Inc. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the Server Side Public License, version 1, -# as published by MongoDB, Inc. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# Server Side Public License for more details. -# -# You should have received a copy of the Server Side Public License -# along with this program. If not, see -# . -# -# As a special exception, the copyright holders give permission to link the -# code of portions of this program with the OpenSSL library under certain -# conditions as described in each individual source file and distribute -# linked combinations including the program with the OpenSSL library. You -# must comply with the Server Side Public License in all respects for -# all of the code used other than as permitted herein. If you modify file(s) -# with this exception, you may extend this exception to your version of the -# file(s), but you are not obligated to do so. If you do not wish to do so, -# delete this exception statement from your version. If you delete this -# exception statement from all source files in the program, then also delete -# it in the license file. -# - -global: - cpp_namespace: "mongo" - cpp_includes: - - "mongo/db/logical_session_id.h" - -imports: - - "mongo/idl/basic_types.idl" - - "mongo/db/logical_session_id.idl" - - "mongo/s/sharding_types.idl" - -structs: - TransactionCommitDecision: - description: "A document used for storing the coordinator commit decision." - strict: true - fields: - _id: - type: OperationSessionInfo - description: "The sessionId and txnNumber of this transaction." - commitTimestamp: - type: timestamp - description: "The clusterTime at which the participants should make the transaction's writes visible." - participants: - type: array - description: "List of transaction participants." diff --git a/src/mongo/db/transaction_coordinator.cpp b/src/mongo/db/transaction_coordinator.cpp index 459a8cc7497..5735554ebbc 100644 --- a/src/mongo/db/transaction_coordinator.cpp +++ b/src/mongo/db/transaction_coordinator.cpp @@ -34,11 +34,39 @@ #include "mongo/db/transaction_coordinator.h" +#include "mongo/db/logical_clock.h" #include "mongo/db/transaction_coordinator_util.h" #include "mongo/util/log.h" namespace mongo { +namespace { + +txn::CoordinatorCommitDecision makeDecisionFromPrepareVoteConsensus( + const txn::PrepareVoteConsensus& result, + const LogicalSessionId& lsid, + const TxnNumber& txnNumber) { + invariant(result.decision); + txn::CoordinatorCommitDecision decision{result.decision.get(), boost::none}; + + if (result.decision == TransactionCoordinator::CommitDecision::kCommit) { + invariant(result.maxPrepareTimestamp); + + decision.commitTimestamp = Timestamp(result.maxPrepareTimestamp->getSecs(), + result.maxPrepareTimestamp->getInc() + 1); + + LOG(3) << "Advancing cluster time to commit Timestamp " << decision.commitTimestamp.get() + << " of transaction " << txnNumber << " on session " << lsid.toBSON(); + + uassertStatusOK(LogicalClock::get(getGlobalServiceContext()) + ->advanceClusterTime(LogicalTime(result.maxPrepareTimestamp.get()))); + } + + return decision; +} + +} // namespace + TransactionCoordinator::~TransactionCoordinator() { stdx::lock_guard lk(_mutex); invariant(_state == TransactionCoordinator::CoordinatorState::kDone); @@ -66,7 +94,12 @@ SharedSemiFuture TransactionCoordinator: } auto coordinator = shared_from_this(); - txn::async(_callbackPool, []() { txn::persistParticipantList(); }) + txn::async(_callbackPool, + [coordinator, participantShards] { + auto opCtx = Client::getCurrent()->makeOperationContext(); + txn::persistParticipantList( + opCtx.get(), coordinator->_lsid, coordinator->_txnNumber, participantShards); + }) .then([coordinator, participantShards]() { return txn::sendPrepare(coordinator, coordinator->_networkExecutor, @@ -75,22 +108,38 @@ SharedSemiFuture TransactionCoordinator: coordinator->_lsid, coordinator->_txnNumber); }) - .then([coordinator](txn::PrepareVoteConsensus response) { - return coordinator->_persistDecision(response); + .then([coordinator, participantShards](txn::PrepareVoteConsensus result) { + invariant(coordinator->_state == CoordinatorState::kPreparing); + return txn::async(coordinator->_callbackPool, [coordinator, result, participantShards] { + auto opCtx = Client::getCurrent()->makeOperationContext(); + const auto decision = makeDecisionFromPrepareVoteConsensus( + result, coordinator->_lsid, coordinator->_txnNumber); + txn::persistDecision(opCtx.get(), + coordinator->_lsid, + coordinator->_txnNumber, + participantShards, + decision.commitTimestamp); + return decision; + }); }) .then([coordinator, participantShards](txn::CoordinatorCommitDecision decision) { - // Send the decision and then propagate it down the continuation chain. return coordinator->_sendDecisionToParticipants(participantShards, decision) .then([decision] { return decision.decision; }); }) .then([coordinator](CommitDecision finalDecision) { stdx::unique_lock lk(coordinator->_mutex); - LOG(3) << "Two-phase commit completed successfully with decision " << finalDecision - << " for session " << coordinator->_lsid.toBSON() << ", transaction number " - << coordinator->_txnNumber; + LOG(3) << "Finished coordinating transaction " << coordinator->_txnNumber + << " on session " << coordinator->_lsid.toBSON() << " with decision " + << finalDecision; coordinator->_transitionToDone(std::move(lk)); }) - .then([coordinator] { return coordinator->_deleteDecision(); }) + .then([coordinator] { + return txn::async(coordinator->_callbackPool, [coordinator] { + auto opCtx = Client::getCurrent()->makeOperationContext(); + return txn::deleteCoordinatorDoc( + opCtx.get(), coordinator->_lsid, coordinator->_txnNumber); + }); + }) .onError([coordinator](Status s) { stdx::unique_lock lk(coordinator->_mutex); LOG(3) << "Two-phase commit failed with error in state " << coordinator->_state @@ -128,33 +177,23 @@ void TransactionCoordinator::cancelIfCommitNotYetStarted() { } } -Future TransactionCoordinator::_persistDecision( - const txn::PrepareVoteConsensus& prepareResponse) { - invariant(_state == CoordinatorState::kPreparing); - // TODO (SERVER-36853): Implement persistence of decision. - // TODO (SERVER-36853): Handle errors appropriately. - return txn::async(_callbackPool, - [prepareResponse]() { return persistDecision(prepareResponse); }); -} - Future TransactionCoordinator::_sendDecisionToParticipants( - const std::vector& participantShards, - txn::CoordinatorCommitDecision coordinatorDecision) { + const std::vector& participantShards, txn::CoordinatorCommitDecision decision) { invariant(_state == CoordinatorState::kPreparing); - _finalDecisionPromise.emplaceValue(coordinatorDecision.decision); + _finalDecisionPromise.emplaceValue(decision.decision); // Send the decision to all participants. - switch (coordinatorDecision.decision) { + switch (decision.decision) { case CommitDecision::kCommit: _state = CoordinatorState::kCommitting; - invariant(coordinatorDecision.commitTimestamp); + invariant(decision.commitTimestamp); return txn::sendCommit(_networkExecutor, _callbackPool, participantShards, _lsid, _txnNumber, - coordinatorDecision.commitTimestamp.get()); + decision.commitTimestamp.get()); case CommitDecision::kAbort: _state = CoordinatorState::kAborting; return txn::sendAbort( @@ -163,12 +202,6 @@ Future TransactionCoordinator::_sendDecisionToParticipants( MONGO_UNREACHABLE; }; -Future TransactionCoordinator::_deleteDecision() { - invariant(_state == CoordinatorState::kDone); - // TODO (SERVER-36853): Implement deletion of decision. - return Future::makeReady(); -} - void TransactionCoordinator::_transitionToDone(stdx::unique_lock lk) noexcept { _state = CoordinatorState::kDone; diff --git a/src/mongo/db/transaction_coordinator.h b/src/mongo/db/transaction_coordinator.h index 17dcf8f1715..7a96955d085 100644 --- a/src/mongo/db/transaction_coordinator.h +++ b/src/mongo/db/transaction_coordinator.h @@ -158,13 +158,6 @@ private: // Below are several helper functions that return functions, which will be used as the callbacks // in continuations. Basically they just make the main code prettier. - /** - * Rersists the commit decision and returns a response containing the decision and, if the - * decision was to commit, the commit timestamp. - */ - Future _persistDecision( - const txn::PrepareVoteConsensus& prepareResponse); - /** * Asynchronously sends the commit decision to all participants (commit or abort), resolving the * returned future when all participants have acknowledged the decision. @@ -172,12 +165,6 @@ private: Future _sendDecisionToParticipants(const std::vector& participantShards, txn::CoordinatorCommitDecision coordinatorDecision); - /** - * Asynchronously deletes the commit decision from the config.transactionCommitDecisions - * collection, resolving the returned future when the decision has been persisted. - */ - Future _deleteDecision(); - /** * Notifies all callers of onCompletion that the commit process has completed by fulfilling * their promises, and transitions the state to done. diff --git a/src/mongo/db/transaction_coordinator_document.idl b/src/mongo/db/transaction_coordinator_document.idl new file mode 100644 index 00000000000..af6aaf36d7c --- /dev/null +++ b/src/mongo/db/transaction_coordinator_document.idl @@ -0,0 +1,59 @@ +# Copyright (C) 2018-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# . +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +global: + cpp_namespace: "mongo" + cpp_includes: + - "mongo/db/logical_session_id.h" + +imports: + - "mongo/idl/basic_types.idl" + - "mongo/db/logical_session_id.idl" + - "mongo/s/sharding_types.idl" + +structs: + TransactionCoordinatorDocument: + description: "A document used for majority confirming the coordinator's state changes" + strict: true + fields: + _id: + type: OperationSessionInfo + description: "The sessionId and txnNumber of this transaction." + cpp_name: id + participants: + type: array + description: "The list of transaction participants." + decision: + type: string + description: "The coordinator's decision for the transaction, that is, 'commit' or 'abort'. Only set if the coordinator has made a decision." + optional: true + # TODO (SERVER-38324): Add validator that the string is 'abort' or 'commit' + commitTimestamp: + type: timestamp + description: "The clusterTime at which the participants should make the transaction's writes visible. Only set if the coordinator decided to commit." + optional: true diff --git a/src/mongo/db/transaction_coordinator_test.cpp b/src/mongo/db/transaction_coordinator_test.cpp index c1ba122e859..4c1efa1349b 100644 --- a/src/mongo/db/transaction_coordinator_test.cpp +++ b/src/mongo/db/transaction_coordinator_test.cpp @@ -689,25 +689,6 @@ TEST_F(TransactionCoordinatorTest, static_cast(TransactionCoordinator::CommitDecision::kAbort)); } -TEST_F(TransactionCoordinatorTest, RunCommitAfterAbortVoteSubsequentPrepareRequestsDontRetry) { - auto commitDecisionFuture = coordinator()->runCommit(kTwoShardIdList); - - // One participant votes abort after retry. - assertPrepareSentAndRespondWithRetryableError(); - assertPrepareSentAndRespondWithNoSuchTransaction(); - - // One participant cannot be reached - assertPrepareSentAndRespondWithRetryableError(); - // Should not be a retry here since one participant already voted abort. - - assertAbortSentAndRespondWithSuccess(); - assertAbortSentAndRespondWithSuccess(); - - auto commitDecision = commitDecisionFuture.get(); - ASSERT_EQ(static_cast(commitDecision), - static_cast(TransactionCoordinator::CommitDecision::kAbort)); -} - TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnCommitAfterNetworkRetries) { auto commitDecisionFuture = coordinator()->runCommit(kTwoShardIdList); diff --git a/src/mongo/db/transaction_coordinator_util.cpp b/src/mongo/db/transaction_coordinator_util.cpp index bd50cd9bb86..53d0cbae0df 100644 --- a/src/mongo/db/transaction_coordinator_util.cpp +++ b/src/mongo/db/transaction_coordinator_util.cpp @@ -40,6 +40,10 @@ #include "mongo/client/remote_command_targeter.h" #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/ops/write_ops.h" +#include "mongo/db/repl/repl_client_info.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/grid.h" @@ -49,10 +53,19 @@ namespace mongo { namespace { + +MONGO_FAIL_POINT_DEFINE(hangBeforeWaitingForParticipantListWriteConcern); +MONGO_FAIL_POINT_DEFINE(hangBeforeWaitingForDecisionWriteConcern); + using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs; using ResponseStatus = executor::TaskExecutor::ResponseStatus; using CommitDecision = TransactionCoordinator::CommitDecision; +const WriteConcernOptions kInternalMajorityNoSnapshotWriteConcern( + WriteConcernOptions::kInternalMajorityNoSnapshot, + WriteConcernOptions::SyncMode::UNSET, + WriteConcernOptions::kNoTimeout); + /** * Finds the host and port for a shard. */ @@ -163,6 +176,32 @@ bool isRetryableError(ErrorCodes::Error code) { code == ErrorCodes::NetworkInterfaceExceededTimeLimit; } +BSONArray buildParticipantListMatchesConditions(const std::vector& participantList) { + BSONArrayBuilder barr; + for (const auto& participant : participantList) { + barr.append(participant.toString()); + } + + const long long participantListLength = participantList.size(); + BSONObj participantListHasSize = BSON(TransactionCoordinatorDocument::kParticipantsFieldName + << BSON("$size" << participantListLength)); + + BSONObj participantListContains = + BSON(TransactionCoordinatorDocument::kParticipantsFieldName << BSON("$all" << barr.arr())); + + return BSON_ARRAY(participantListContains << participantListHasSize); +} + +std::string buildParticipantListString(const std::vector& participantList) { + StringBuilder ss; + ss << "["; + for (const auto& participant : participantList) { + ss << participant << " "; + } + ss << "]"; + return ss.str(); +} + } // namespace @@ -418,33 +457,270 @@ Future sendAbort(executor::TaskExecutor* executor, return whenAll(responses); } -void persistParticipantList() { - // TODO (SERVER-36853): Implement this. +void persistParticipantList(OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber, + const std::vector& participantList) { + LOG(0) << "Going to write participant list for lsid: " << lsid.toBSON() + << ", txnNumber: " << txnNumber; + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(lsid); + sessionInfo.setTxnNumber(txnNumber); + + DBDirectClient client(opCtx); + + // Throws if serializing the request or deserializing the response fails. + const auto commandResponse = client.runCommand([&] { + write_ops::Update updateOp(NamespaceString::kTransactionCoordinatorsNamespace); + updateOp.setUpdates({[&] { + write_ops::UpdateOpEntry entry; + + // Ensure that the document for the (lsid, txnNumber) either has no participant list or + // has the same participant list. The document may have the same participant list if an + // earlier attempt to write the participant list failed waiting for writeConcern. + BSONObj noParticipantList = BSON(TransactionCoordinatorDocument::kParticipantsFieldName + << BSON("$exists" << false)); + BSONObj sameParticipantList = + BSON("$and" << buildParticipantListMatchesConditions(participantList)); + entry.setQ(BSON(TransactionCoordinatorDocument::kIdFieldName + << sessionInfo.toBSON() + << "$or" + << BSON_ARRAY(noParticipantList << sameParticipantList))); + + // Update with participant list. + TransactionCoordinatorDocument doc; + doc.setId(std::move(sessionInfo)); + doc.setParticipants(std::move(participantList)); + entry.setU(doc.toBSON()); + + entry.setUpsert(true); + return entry; + }()}); + return updateOp.serialize({}); + }()); + + const auto upsertStatus = getStatusFromWriteCommandReply(commandResponse->getCommandReply()); + + // Convert a DuplicateKey error to an anonymous error. + if (upsertStatus.code() == ErrorCodes::DuplicateKey) { + // Attempt to include the document for this (lsid, txnNumber) in the error message, if one + // exists. Note that this is best-effort: the document may have been deleted or manually + // changed since the update above ran. + const auto doc = client.findOne( + NamespaceString::kTransactionCoordinatorsNamespace.toString(), + QUERY(TransactionCoordinatorDocument::kIdFieldName << sessionInfo.toBSON())); + uasserted(51025, + str::stream() << "While attempting to write participant list " + << buildParticipantListString(participantList) + << " for lsid " + << lsid.toBSON() + << " and txnNumber " + << txnNumber + << ", found document for the (lsid, txnNumber) with a different " + "participant list. Current document for the (lsid, txnNumber): " + << doc); + } + + // Throw any other error. + uassertStatusOK(upsertStatus); + + LOG(0) << "Wrote participant list for lsid: " << lsid.toBSON() << ", txnNumber: " << txnNumber; + + if (MONGO_FAIL_POINT(hangBeforeWaitingForParticipantListWriteConcern)) { + LOG(0) << "Hit hangBeforeWaitingForParticipantListWriteConcern failpoint"; + } + MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED( + opCtx, hangBeforeWaitingForParticipantListWriteConcern); + + WriteConcernResult unusedWCResult; + uassertStatusOK( + waitForWriteConcern(opCtx, + repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), + kInternalMajorityNoSnapshotWriteConcern, + &unusedWCResult)); } -CoordinatorCommitDecision persistDecision(PrepareVoteConsensus response) { - invariant(response.decision); - CoordinatorCommitDecision coordinatorDecision{response.decision.get(), boost::none}; - LOG(3) << "Coordinator shard persisting commit decision " << response.decision; - - if (response.decision == TransactionCoordinator::CommitDecision::kCommit) { - invariant(response.maxPrepareTimestamp); - coordinatorDecision.commitTimestamp = Timestamp(response.maxPrepareTimestamp->getSecs(), - response.maxPrepareTimestamp->getInc() + 1); - LOG(3) << "Coordinator shard adjusting cluster time to " - << coordinatorDecision.commitTimestamp.get(); - Status s = LogicalClock::get(getGlobalServiceContext()) - ->advanceClusterTime(LogicalTime(response.maxPrepareTimestamp.get())); - if (!s.isOK()) { - log() << "Coordinator shard failed to advance cluster time to " - "commitTimestamp " - << causedBy(s); - } +void persistDecision(OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber, + const std::vector& participantList, + const boost::optional& commitTimestamp) { + LOG(0) << "Going to write decision " << (commitTimestamp ? "commit" : "abort") + << " for lsid: " << lsid.toBSON() << ", txnNumber: " << txnNumber; + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(lsid); + sessionInfo.setTxnNumber(txnNumber); + + DBDirectClient client(opCtx); + + // Throws if serializing the request or deserializing the response fails. + const auto commandResponse = client.runCommand([&] { + write_ops::Update updateOp(NamespaceString::kTransactionCoordinatorsNamespace); + updateOp.setUpdates({[&] { + write_ops::UpdateOpEntry entry; + + // Ensure that the document for the (lsid, txnNumber) has the same participant list and + // either has no decision or the same decision. The document may have the same decision + // if an earlier attempt to write the decision failed waiting for writeConcern. + BSONObj noDecision = BSON(TransactionCoordinatorDocument::kDecisionFieldName + << BSON("$exists" << false) + << "commitTimestamp" + << BSON("$exists" << false)); + BSONObj sameDecision; + if (commitTimestamp) { + sameDecision = BSON(TransactionCoordinatorDocument::kDecisionFieldName + << "commit" + << TransactionCoordinatorDocument::kCommitTimestampFieldName + << *commitTimestamp); + } else { + sameDecision = BSON(TransactionCoordinatorDocument::kDecisionFieldName + << "abort" + << TransactionCoordinatorDocument::kCommitTimestampFieldName + << BSON("$exists" << false)); + } + entry.setQ(BSON(TransactionCoordinatorDocument::kIdFieldName + << sessionInfo.toBSON() + << "$and" + << buildParticipantListMatchesConditions(participantList) + << "$or" + << BSON_ARRAY(noDecision << sameDecision))); + + // Update with decision. + TransactionCoordinatorDocument doc; + doc.setId(sessionInfo); + doc.setParticipants(std::move(participantList)); + if (commitTimestamp) { + doc.setDecision("commit"_sd); + doc.setCommitTimestamp(commitTimestamp); + } else { + doc.setDecision("abort"_sd); + } + entry.setU(doc.toBSON()); + + return entry; + }()}); + return updateOp.serialize({}); + }()); + + const auto commandReply = commandResponse->getCommandReply(); + uassertStatusOK(getStatusFromWriteCommandReply(commandReply)); + + // If no document matched, throw an anonymous error. (The update itself will not have thrown an + // error, because it's legal for an update to match no documents.) + if (commandReply.getIntField("n") != 1) { + // Attempt to include the document for this (lsid, txnNumber) in the error message, if one + // exists. Note that this is best-effort: the document may have been deleted or manually + // changed since the update above ran. + const auto doc = client.findOne( + NamespaceString::kTransactionCoordinatorsNamespace.toString(), + QUERY(TransactionCoordinatorDocument::kIdFieldName << sessionInfo.toBSON())); + uasserted(51026, + str::stream() << "While attempting to write decision " + << (commitTimestamp ? "'commit'" : "'abort'") + << " for lsid " + << lsid.toBSON() + << " and txnNumber " + << txnNumber + << ", either failed to find document for this (lsid, txnNumber) or " + "document existed with a different participant list, different " + "decision, or different commitTimestamp. Current document for " + "the (lsid, txnNumber): " + << doc); + } + + LOG(0) << "Wrote decision " << (commitTimestamp ? "commit" : "abort") + << " for lsid: " << lsid.toBSON() << ", txnNumber: " << txnNumber; + + if (MONGO_FAIL_POINT(hangBeforeWaitingForDecisionWriteConcern)) { + LOG(0) << "Hit hangBeforeWaitingForDecisionWriteConcern failpoint"; } + MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx, + hangBeforeWaitingForDecisionWriteConcern); + + WriteConcernResult unusedWCResult; + uassertStatusOK( + waitForWriteConcern(opCtx, + repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), + kInternalMajorityNoSnapshotWriteConcern, + &unusedWCResult)); +} + +void deleteCoordinatorDoc(OperationContext* opCtx, LogicalSessionId lsid, TxnNumber txnNumber) { + LOG(0) << "Going to delete coordinator doc for lsid: " << lsid.toBSON() + << ", txnNumber: " << txnNumber; + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(lsid); + sessionInfo.setTxnNumber(txnNumber); + + DBDirectClient client(opCtx); + + // Throws if serializing the request or deserializing the response fails. + auto commandResponse = client.runCommand([&] { + write_ops::Delete deleteOp(NamespaceString::kTransactionCoordinatorsNamespace); + deleteOp.setDeletes({[&] { + write_ops::DeleteOpEntry entry; + + // Ensure the document is only deleted after a decision has been made. + BSONObj abortDecision = + BSON(TransactionCoordinatorDocument::kDecisionFieldName << "abort"); + BSONObj commitDecision = + BSON(TransactionCoordinatorDocument::kDecisionFieldName << "commit"); + entry.setQ(BSON(TransactionCoordinatorDocument::kIdFieldName + << sessionInfo.toBSON() + << TransactionCoordinatorDocument::kDecisionFieldName + << BSON("$exists" << true))); + + entry.setMulti(false); + return entry; + }()}); + return deleteOp.serialize({}); + }()); + + const auto commandReply = commandResponse->getCommandReply(); + uassertStatusOK(getStatusFromWriteCommandReply(commandReply)); + + // If no document matched, throw an anonymous error. (The delete itself will not have thrown an + // error, because it's legal for a delete to match no documents.) + if (commandReply.getIntField("n") != 1) { + // Attempt to include the document for this (lsid, txnNumber) in the error message, if one + // exists. Note that this is best-effort: the document may have been deleted or manually + // changed since the update above ran. + const auto doc = client.findOne( + NamespaceString::kTransactionCoordinatorsNamespace.toString(), + QUERY(TransactionCoordinatorDocument::kIdFieldName << sessionInfo.toBSON())); + uasserted(51027, + str::stream() << "While attempting to delete document for lsid " << lsid.toBSON() + << " and txnNumber " + << txnNumber + << ", either failed to find document for this (lsid, txnNumber) or " + "document existed without a decision. Current document for the " + "(lsid, txnNumber): " + << doc); + } + + LOG(0) << "Deleted coordinator doc for lsid: " << lsid.toBSON() << ", txnNumber: " << txnNumber; +} + +std::vector readAllCoordinatorDocs(OperationContext* opCtx) { + std::vector allCoordinatorDocs; - // TODO (SERVER-36853): Actually persist decision. + Query query; + DBDirectClient client(opCtx); + auto coordinatorDocsCursor = + client.query(NamespaceString::kTransactionCoordinatorsNamespace, query); + + while (coordinatorDocsCursor->more()) { + // TODO (SERVER-38307): Try/catch around parsing the document and skip the document if it + // fails to parse. + auto nextDecision = TransactionCoordinatorDocument::parse(IDLParserErrorContext(""), + coordinatorDocsCursor->next()); + allCoordinatorDocs.push_back(nextDecision); + } - return coordinatorDecision; + return allCoordinatorDocs; } Future whenAll(std::vector>& futures) { diff --git a/src/mongo/db/transaction_coordinator_util.h b/src/mongo/db/transaction_coordinator_util.h index fd6475139d1..43ad178a1c5 100644 --- a/src/mongo/db/transaction_coordinator_util.h +++ b/src/mongo/db/transaction_coordinator_util.h @@ -31,7 +31,9 @@ #pragma once #include "mongo/db/logical_session_id.h" +#include "mongo/db/operation_context.h" #include "mongo/db/transaction_coordinator.h" +#include "mongo/db/transaction_coordinator_document_gen.h" #include "mongo/executor/task_executor.h" #include "mongo/s/shard_id.h" #include "mongo/util/concurrency/thread_pool.h" @@ -124,6 +126,7 @@ Future sendCommit(executor::TaskExecutor* executor, const LogicalSessionId& lsid, const TxnNumber& txnNumber, Timestamp commitTimestamp); + /** * Sends abort to all shards and returns a future that will be resolved when all participants have * responded with success. @@ -134,20 +137,73 @@ Future sendAbort(executor::TaskExecutor* executor, const LogicalSessionId& lsid, const TxnNumber& txnNumber); +/** + * Upserts a document of the form: + * + * { + * _id: {lsid: , txnNumber: } + * participants: ["shard0000", "shard0001"] + * } + * + * into config.transaction_coordinators and waits for the upsert to be majority-committed. + * + * Throws if the upsert fails or waiting for writeConcern fails. + * If the upsert returns a DuplicateKey error, converts it to an anonymous error, because it means + * a document for the (lsid, txnNumber) exists with a different participant list. + */ +void persistParticipantList(OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber, + const std::vector& participantList); /** - * Persists the participant list to the config.transactionCommitDecisions collection. + * If 'commitTimestamp' is boost::none, updates the document in config.transaction_coordinators for + * (lsid, txnNumber) to be: + * + * { + * _id: {lsid: , txnNumber: } + * participants: ["shard0000", "shard0001"] + * decision: "abort" + * } * - * TODO (SERVER-36853): Implement this. It is currently a stub. + * else updates the document to be: + * + * { + * _id: {lsid: , txnNumber: } + * participants: ["shard0000", "shard0001"] + * decision: "commit" + * commitTimestamp: Timestamp(xxxxxxxx, x), + * } + * + * and waits for the update to be majority-committed. + * + * Throws if the update fails or waiting for writeConcern fails. + * If the update succeeds but did not update any document, throws an anonymous error, because it + * means either no document for (lsid, txnNumber) exists, or a document exists but has a different + * participant list, different decision, or different commit Timestamp. */ -void persistParticipantList(); +void persistDecision(OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber, + const std::vector& participantList, + const boost::optional& commitTimestamp); /** - * Persists the commit decision to the config.transactionCommitDecisions collection. + * Deletes the document in config.transaction_coordinators for (lsid, txnNumber). * - * TODO (SERVER-36853): Implement this. It is currently a stub. + * Does *not* wait for the delete to be majority-committed. + * + * Throws if the update fails. + * If the update succeeds but did not update any document, throws an anonymous error, because it + * means either no document for (lsid, txnNumber) exists, or a document exists but without a + * decision. + */ +void deleteCoordinatorDoc(OperationContext* opCtx, LogicalSessionId lsid, TxnNumber txnNumber); + +/** + * Reads and returns all documents in config.transaction_coordinators. */ -CoordinatorCommitDecision persistDecision(PrepareVoteConsensus response); +std::vector readAllCoordinatorDocs(OperationContext* opCtx); // // BELOW THIS ARE FUTURES-RELATED UTILITIES. diff --git a/src/mongo/db/transaction_coordinator_util_test.cpp b/src/mongo/db/transaction_coordinator_util_test.cpp new file mode 100644 index 00000000000..e5ff56011eb --- /dev/null +++ b/src/mongo/db/transaction_coordinator_util_test.cpp @@ -0,0 +1,349 @@ + +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/transaction_coordinator_util.h" +#include "mongo/s/shard_server_test_fixture.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +const std::string kAbortDecision{"abort"}; +const std::string kCommitDecision{"commit"}; + +class TransactionCoordinatorCollectionTest : public ShardServerTestFixture { +protected: + void setUp() override { + ShardServerTestFixture::setUp(); + _lsid = makeLogicalSessionIdForTest(); + _commitTimestamp = Timestamp(Date_t::now().toMillisSinceEpoch() / 1000, 0); + } + + LogicalSessionId lsid() const { + return _lsid; + } + + TxnNumber txnNumber() const { + return _txnNumber; + } + + const std::vector& participants() const { + return _participants; + } + + const Timestamp& commitTimestamp() const { + return _commitTimestamp; + } + + void assertDocumentMatches(TransactionCoordinatorDocument doc, + LogicalSessionId expectedLsid, + TxnNumber expectedTxnNum, + std::vector expectedParticipants, + boost::optional expectedDecision = boost::none, + boost::optional expectedCommitTimestamp = boost::none) { + ASSERT_EQUALS(doc.getId().getSessionId(), expectedLsid); + ASSERT_EQUALS(doc.getId().getTxnNumber(), expectedTxnNum); + + ASSERT(doc.getParticipants() == expectedParticipants); + + if (expectedDecision) { + ASSERT_EQUALS(expectedDecision, doc.getDecision()->toString()); + } else { + ASSERT(!doc.getDecision()); + } + + if (expectedCommitTimestamp) { + ASSERT_EQUALS(expectedCommitTimestamp, doc.getCommitTimestamp()); + } else { + ASSERT(!doc.getCommitTimestamp()); + } + } + + void persistParticipantListExpectSuccess(OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber, + const std::vector& participants) { + txn::persistParticipantList(opCtx, lsid, txnNumber, participants); + + auto allCoordinatorDocs = txn::readAllCoordinatorDocs(opCtx); + ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1)); + assertDocumentMatches(allCoordinatorDocs[0], lsid, txnNumber, participants); + } + + void persistParticipantListExpectDuplicateKeyError(OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber, + const std::vector& participants) { + ASSERT_THROWS_CODE(txn::persistParticipantList(opCtx, lsid, txnNumber, participants), + AssertionException, + 51025); + } + + void persistDecisionExpectSuccess(OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber, + const std::vector& participants, + const boost::optional& commitTimestamp) { + txn::persistDecision(opCtx, lsid, txnNumber, participants, commitTimestamp); + + auto allCoordinatorDocs = txn::readAllCoordinatorDocs(opCtx); + ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1)); + if (commitTimestamp) { + assertDocumentMatches(allCoordinatorDocs[0], + lsid, + txnNumber, + participants, + kCommitDecision, + *commitTimestamp); + } else { + assertDocumentMatches( + allCoordinatorDocs[0], lsid, txnNumber, participants, kAbortDecision); + } + } + + void persistDecisionExpectNoMatchingDocuments( + OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber, + const std::vector& participants, + const boost::optional& commitTimestamp) { + ASSERT_THROWS_CODE( + txn::persistDecision(opCtx, lsid, txnNumber, participants, commitTimestamp), + AssertionException, + 51026); + } + + void deleteCoordinatorDocExpectSuccess(OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber) { + txn::deleteCoordinatorDoc(opCtx, lsid, txnNumber); + + auto allCoordinatorDocs = txn::readAllCoordinatorDocs(opCtx); + ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(0)); + } + + void deleteCoordinatorDocExpectNoMatchingDocuments(OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber) { + ASSERT_THROWS_CODE( + txn::deleteCoordinatorDoc(opCtx, lsid, txnNumber), AssertionException, 51027); + } + + LogicalSessionId _lsid; + const TxnNumber _txnNumber{0}; + const std::vector _participants{ + ShardId("shard0001"), ShardId("shard0002"), ShardId("shard0003")}; + Timestamp _commitTimestamp; +}; + +TEST_F(TransactionCoordinatorCollectionTest, + PersistParticipantListWhenNoDocumentForTransactionExistsSucceeds) { + persistParticipantListExpectSuccess(operationContext(), lsid(), txnNumber(), participants()); +} + +TEST_F(TransactionCoordinatorCollectionTest, + PersistParticipantListWhenMatchingDocumentForTransactionExistsSucceeds) { + persistParticipantListExpectSuccess(operationContext(), lsid(), txnNumber(), participants()); + persistParticipantListExpectSuccess(operationContext(), lsid(), txnNumber(), participants()); +} + +TEST_F(TransactionCoordinatorCollectionTest, + PersistParticipantListWhenDocumentWithConflictingParticipantListExistsFails) { + std::vector participants{ + ShardId("shard0001"), ShardId("shard0002"), ShardId("shard0003")}; + persistParticipantListExpectSuccess(operationContext(), lsid(), txnNumber(), participants); + + std::vector smallerParticipantList{ShardId("shard0001"), ShardId("shard0002")}; + persistParticipantListExpectDuplicateKeyError( + operationContext(), lsid(), txnNumber(), smallerParticipantList); + + std::vector largerParticipantList{ + ShardId("shard0001"), ShardId("shard0002"), ShardId("shard0003"), ShardId("shard0004")}; + persistParticipantListExpectDuplicateKeyError( + operationContext(), lsid(), txnNumber(), largerParticipantList); + + std::vector differentSameSizeParticipantList{ + ShardId("shard0001"), ShardId("shard0002"), ShardId("shard0004")}; + persistParticipantListExpectDuplicateKeyError( + operationContext(), lsid(), txnNumber(), differentSameSizeParticipantList); +} + +TEST_F(TransactionCoordinatorCollectionTest, + PersistParticipantListForMultipleTransactionsOnSameSession) { + for (int i = 1; i <= 3; i++) { + auto txnNumber = TxnNumber{i}; + + txn::persistParticipantList(operationContext(), lsid(), txnNumber, participants()); + + auto allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext()); + ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(i)); + } +} + +TEST_F(TransactionCoordinatorCollectionTest, PersistParticipantListForMultipleSessions) { + for (int i = 1; i <= 3; i++) { + auto lsid = makeLogicalSessionIdForTest(); + txn::persistParticipantList(operationContext(), lsid, txnNumber(), participants()); + + auto allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext()); + ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(i)); + } +} + +TEST_F(TransactionCoordinatorCollectionTest, + PersistAbortDecisionWhenNoDocumentForTransactionExistsFails) { + persistDecisionExpectNoMatchingDocuments( + operationContext(), lsid(), txnNumber(), participants(), boost::none /* abort */); +} + +TEST_F(TransactionCoordinatorCollectionTest, + PersistAbortDecisionWhenDocumentExistsWithoutDecisionSucceeds) { + persistParticipantListExpectSuccess(operationContext(), lsid(), txnNumber(), participants()); + persistDecisionExpectSuccess( + operationContext(), lsid(), txnNumber(), participants(), boost::none /* abort */); +} + +TEST_F(TransactionCoordinatorCollectionTest, + PersistAbortDecisionWhenDocumentExistsWithSameDecisionSucceeds) { + + persistParticipantListExpectSuccess(operationContext(), lsid(), txnNumber(), participants()); + persistDecisionExpectSuccess( + operationContext(), lsid(), txnNumber(), participants(), boost::none /* abort */); + persistDecisionExpectSuccess( + operationContext(), lsid(), txnNumber(), participants(), boost::none /* abort */); +} + +TEST_F(TransactionCoordinatorCollectionTest, + PersistCommitDecisionWhenNoDocumentForTransactionExistsFails) { + persistDecisionExpectNoMatchingDocuments( + operationContext(), lsid(), txnNumber(), participants(), commitTimestamp() /* commit */); +} + +TEST_F(TransactionCoordinatorCollectionTest, + PersistCommitDecisionWhenDocumentExistsWithoutDecisionSucceeds) { + persistParticipantListExpectSuccess(operationContext(), lsid(), txnNumber(), participants()); + persistDecisionExpectSuccess( + operationContext(), lsid(), txnNumber(), participants(), commitTimestamp() /* commit */); +} + +TEST_F(TransactionCoordinatorCollectionTest, + PersistCommitDecisionWhenDocumentExistsWithSameDecisionSucceeds) { + persistParticipantListExpectSuccess(operationContext(), lsid(), txnNumber(), participants()); + persistDecisionExpectSuccess( + operationContext(), lsid(), txnNumber(), participants(), commitTimestamp() /* commit */); + persistDecisionExpectSuccess( + operationContext(), lsid(), txnNumber(), participants(), commitTimestamp() /* commit */); +} + +TEST_F(TransactionCoordinatorCollectionTest, + PersistCommitDecisionWhenDocumentExistsWithDifferentCommitTimestampFails) { + auto differentCommitTimestamp = Timestamp(Date_t::now().toMillisSinceEpoch() / 1000, 1); + + persistParticipantListExpectSuccess(operationContext(), lsid(), txnNumber(), participants()); + persistDecisionExpectSuccess( + operationContext(), lsid(), txnNumber(), participants(), commitTimestamp() /* commit */); + persistDecisionExpectNoMatchingDocuments(operationContext(), + lsid(), + txnNumber(), + participants(), + differentCommitTimestamp /* commit */); +} + +TEST_F(TransactionCoordinatorCollectionTest, + PersistAbortDecisionWhenDocumentExistsWithDifferentDecisionFails) { + persistParticipantListExpectSuccess(operationContext(), lsid(), txnNumber(), participants()); + persistDecisionExpectSuccess( + operationContext(), lsid(), txnNumber(), participants(), commitTimestamp() /* commit */); + persistDecisionExpectNoMatchingDocuments( + operationContext(), lsid(), txnNumber(), participants(), boost::none /* abort */); +} + +TEST_F(TransactionCoordinatorCollectionTest, + PersistCommitDecisionWhenDocumentExistsWithDifferentDecisionFails) { + persistParticipantListExpectSuccess(operationContext(), lsid(), txnNumber(), participants()); + persistDecisionExpectSuccess( + operationContext(), lsid(), txnNumber(), participants(), boost::none /* abort */); + persistDecisionExpectNoMatchingDocuments( + operationContext(), lsid(), txnNumber(), participants(), commitTimestamp() /* commit */); +} + +TEST_F(TransactionCoordinatorCollectionTest, DeleteCoordinatorDocWhenNoDocumentExistsFails) { + deleteCoordinatorDocExpectNoMatchingDocuments(operationContext(), lsid(), txnNumber()); +} + +TEST_F(TransactionCoordinatorCollectionTest, + DeleteCoordinatorDocWhenDocumentExistsWithoutDecisionFails) { + persistParticipantListExpectSuccess(operationContext(), lsid(), txnNumber(), participants()); + deleteCoordinatorDocExpectNoMatchingDocuments(operationContext(), lsid(), txnNumber()); +} + +TEST_F(TransactionCoordinatorCollectionTest, + DeleteCoordinatorDocWhenDocumentExistsWithAbortDecisionSucceeds) { + persistParticipantListExpectSuccess(operationContext(), lsid(), txnNumber(), participants()); + persistDecisionExpectSuccess( + operationContext(), lsid(), txnNumber(), participants(), boost::none /* abort */); + deleteCoordinatorDocExpectSuccess(operationContext(), lsid(), txnNumber()); +} + +TEST_F(TransactionCoordinatorCollectionTest, + DeleteCoordinatorDocWhenDocumentExistsWithCommitDecisionSucceeds) { + persistParticipantListExpectSuccess(operationContext(), lsid(), txnNumber(), participants()); + persistDecisionExpectSuccess( + operationContext(), lsid(), txnNumber(), participants(), commitTimestamp() /* commit */); + deleteCoordinatorDocExpectSuccess(operationContext(), lsid(), txnNumber()); +} + +TEST_F(TransactionCoordinatorCollectionTest, + MultipleCommitDecisionsPersistedAndDeleteOneSuccessfullyRemovesCorrectDecision) { + const auto txnNumber1 = TxnNumber{4}; + const auto txnNumber2 = TxnNumber{5}; + + // Insert coordinator documents for two transactions. + txn::persistParticipantList(operationContext(), lsid(), txnNumber1, participants()); + txn::persistParticipantList(operationContext(), lsid(), txnNumber2, participants()); + + auto allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext()); + ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(2)); + + // Delete the document for the first transaction and check that only the second transaction's + // document still exists. + txn::persistDecision( + operationContext(), lsid(), txnNumber1, participants(), boost::none /* abort */); + txn::deleteCoordinatorDoc(operationContext(), lsid(), txnNumber1); + + allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext()); + ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1)); + assertDocumentMatches(allCoordinatorDocs[0], lsid(), txnNumber2, participants()); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/write_concern_options.cpp b/src/mongo/db/write_concern_options.cpp index e9bf4f4ba68..4f92515f26b 100644 --- a/src/mongo/db/write_concern_options.cpp +++ b/src/mongo/db/write_concern_options.cpp @@ -67,6 +67,9 @@ const int WriteConcernOptions::kNoWaiting(-1); const StringData WriteConcernOptions::kWriteConcernField = "writeConcern"_sd; const char WriteConcernOptions::kMajority[] = "majority"; + +// TODO (PM-1301): Remove once the stable Timestamp is allowed to advance past the oldest prepare +// Timestamp. const char WriteConcernOptions::kInternalMajorityNoSnapshot[] = "internalMajorityNoSnapshot"; const BSONObj WriteConcernOptions::Default = BSONObj(); -- cgit v1.2.1