summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-03-27 10:21:08 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-04-01 07:26:26 -0400
commit9dd07662bbe80415d90131fca2be6312166d6f39 (patch)
tree2c3225830737ac56796901d6e928830be62ede31 /src/mongo
parentd7f0bffc93993bcc7e794344d5a1ae25ff95571b (diff)
downloadmongo-9dd07662bbe80415d90131fca2be6312166d6f39.tar.gz
SERVER-40297 Move the TransactionCoordinator data structures to a separate file
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/s/SConscript2
-rw-r--r--src/mongo/db/s/transaction_coordinator.cpp125
-rw-r--r--src/mongo/db/s/transaction_coordinator.h45
-rw-r--r--src/mongo/db/s/transaction_coordinator_catalog.cpp3
-rw-r--r--src/mongo/db/s/transaction_coordinator_document.idl30
-rw-r--r--src/mongo/db/s/transaction_coordinator_driver.cpp34
-rw-r--r--src/mongo/db/s/transaction_coordinator_driver.h74
-rw-r--r--src/mongo/db/s/transaction_coordinator_structures.cpp81
-rw-r--r--src/mongo/db/s/transaction_coordinator_structures.h62
-rw-r--r--src/mongo/db/s/transaction_coordinator_structures_test.cpp50
-rw-r--r--src/mongo/db/s/transaction_coordinator_test.cpp11
-rw-r--r--src/mongo/db/s/transaction_coordinator_test_fixture.h2
12 files changed, 307 insertions, 212 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 53cb831d5b7..d8edd672d09 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -126,6 +126,7 @@ env.Library(
'transaction_coordinator_factory_mongod.cpp',
'transaction_coordinator_futures_util.cpp',
'transaction_coordinator_service.cpp',
+ 'transaction_coordinator_structures.cpp',
'transaction_coordinator.cpp',
env.Idlc('transaction_coordinator_document.idl')[0],
],
@@ -413,6 +414,7 @@ env.CppUnitTest(
'transaction_coordinator_catalog_test.cpp',
'transaction_coordinator_futures_util_test.cpp',
'transaction_coordinator_service_test.cpp',
+ 'transaction_coordinator_structures_test.cpp',
'transaction_coordinator_test_fixture.cpp',
'transaction_coordinator_test.cpp',
],
diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp
index 5ba843566a6..7689ddb2108 100644
--- a/src/mongo/db/s/transaction_coordinator.cpp
+++ b/src/mongo/db/s/transaction_coordinator.cpp
@@ -36,33 +36,35 @@
#include "mongo/db/logical_clock.h"
#include "mongo/db/s/transaction_coordinator_document_gen.h"
#include "mongo/db/s/transaction_coordinator_futures_util.h"
-#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
namespace mongo {
namespace {
-using CoordinatorCommitDecision = TransactionCoordinator::CoordinatorCommitDecision;
+using CommitDecision = txn::CommitDecision;
+using CoordinatorCommitDecision = txn::CoordinatorCommitDecision;
+using TransactionCoordinatorDocument = txn::TransactionCoordinatorDocument;
-CoordinatorCommitDecision makeDecisionFromPrepareVoteConsensus(
- ServiceContext* service,
- const txn::PrepareVoteConsensus& result,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber) {
+using PrepareVoteConsensus = TransactionCoordinatorDriver::PrepareVoteConsensus;
+
+CoordinatorCommitDecision makeDecisionFromPrepareVoteConsensus(ServiceContext* service,
+ const PrepareVoteConsensus& result,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber) {
invariant(result.decision);
- CoordinatorCommitDecision decision{*result.decision, boost::none};
+ CoordinatorCommitDecision decision(*result.decision);
- if (result.decision == txn::CommitDecision::kCommit) {
+ if (result.decision == CommitDecision::kCommit) {
invariant(result.maxPrepareTimestamp);
- decision.commitTimestamp = Timestamp(result.maxPrepareTimestamp->getSecs(),
- result.maxPrepareTimestamp->getInc() + 1);
+ decision.setCommitTimestamp(Timestamp(result.maxPrepareTimestamp->getSecs(),
+ result.maxPrepareTimestamp->getInc() + 1));
- LOG(3) << "Advancing cluster time to commit Timestamp " << decision.commitTimestamp.get()
- << " of transaction " << txnNumber << " on session " << lsid.getId();
+ LOG(3) << "Advancing cluster time to the commit timestamp "
+ << *decision.getCommitTimestamp() << " for " << lsid.getId() << ':' << txnNumber;
uassertStatusOK(LogicalClock::get(service)->advanceClusterTime(
- LogicalTime(result.maxPrepareTimestamp.get())));
+ LogicalTime(*result.maxPrepareTimestamp)));
}
return decision;
@@ -138,13 +140,11 @@ void TransactionCoordinator::continueCommit(const TransactionCoordinatorDocument
// Helper lambda to get the decision either from the document passed in or from the participants
// (by performing 'phase one' of two-phase commit).
auto getDecision = [&]() -> Future<CoordinatorCommitDecision> {
- auto decision = doc.getDecision();
+ const auto& decision = doc.getDecision();
if (!decision) {
return _runPhaseOne(participantShards);
} else {
- return (decision->decision == txn::CommitDecision::kCommit)
- ? CoordinatorCommitDecision{txn::CommitDecision::kCommit, decision->commitTimestamp}
- : CoordinatorCommitDecision{txn::CommitDecision::kAbort, boost::none};
+ return *decision;
}
};
@@ -169,7 +169,7 @@ Future<void> TransactionCoordinator::onCompletion() {
[](const Status& s) { uasserted(ErrorCodes::InterruptedDueToStepDown, s.reason()); });
}
-SharedSemiFuture<txn::CommitDecision> TransactionCoordinator::getDecision() {
+SharedSemiFuture<CommitDecision> TransactionCoordinator::getDecision() {
stdx::lock_guard<stdx::mutex> lg(_mutex);
return _decisionPromise.getFuture();
}
@@ -180,7 +180,7 @@ void TransactionCoordinator::cancelIfCommitNotYetStarted() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (_state == CoordinatorState::kInit) {
invariant(!_decisionPromise.getFuture().isReady());
- _decisionPromise.emplaceValue(txn::CommitDecision::kCanceled);
+ _decisionPromise.emplaceValue(CommitDecision::kCanceled);
_transitionToDone(std::move(lk));
}
}
@@ -195,14 +195,15 @@ void TransactionCoordinator::_cancelTimeoutWaitForCommitTask() {
Future<CoordinatorCommitDecision> TransactionCoordinator::_runPhaseOne(
const std::vector<ShardId>& participantShards) {
return _driver.sendPrepare(participantShards, _lsid, _txnNumber)
- .then([this, participantShards](txn::PrepareVoteConsensus result) {
+ .then([this, participantShards](PrepareVoteConsensus result) {
invariant(_state == CoordinatorState::kPreparing);
auto decision =
makeDecisionFromPrepareVoteConsensus(_serviceContext, result, _lsid, _txnNumber);
return _driver
- .persistDecision(_lsid, _txnNumber, participantShards, decision.commitTimestamp)
+ .persistDecision(
+ _lsid, _txnNumber, participantShards, decision.getCommitTimestamp())
.then([decision] { return decision; });
});
}
@@ -211,11 +212,8 @@ Future<void> TransactionCoordinator::_runPhaseTwo(const std::vector<ShardId>& pa
const CoordinatorCommitDecision& decision) {
return _sendDecisionToParticipants(participantShards, decision)
.then([this] {
- if (getGlobalFailPointRegistry()
- ->getFailPoint("doNotForgetCoordinator")
- ->shouldFail()) {
+ if (MONGO_FAIL_POINT(doNotForgetCoordinator))
return Future<void>::makeReady();
- }
return _driver.deleteCoordinatorDoc(_lsid, _txnNumber);
})
@@ -230,17 +228,17 @@ Future<void> TransactionCoordinator::_runPhaseTwo(const std::vector<ShardId>& pa
Future<void> TransactionCoordinator::_sendDecisionToParticipants(
const std::vector<ShardId>& participantShards, CoordinatorCommitDecision decision) {
invariant(_state == CoordinatorState::kPreparing);
- _decisionPromise.emplaceValue(decision.decision);
+ _decisionPromise.emplaceValue(decision.getDecision());
- switch (decision.decision) {
- case txn::CommitDecision::kCommit:
+ switch (decision.getDecision()) {
+ case CommitDecision::kCommit:
_state = CoordinatorState::kCommitting;
return _driver.sendCommit(
- participantShards, _lsid, _txnNumber, *decision.commitTimestamp);
- case txn::CommitDecision::kAbort:
+ participantShards, _lsid, _txnNumber, *decision.getCommitTimestamp());
+ case CommitDecision::kAbort:
_state = CoordinatorState::kAborting;
return _driver.sendAbort(participantShards, _lsid, _txnNumber);
- case txn::CommitDecision::kCanceled:
+ case CommitDecision::kCanceled:
MONGO_UNREACHABLE;
};
MONGO_UNREACHABLE;
@@ -292,57 +290,6 @@ void TransactionCoordinator::_transitionToDone(stdx::unique_lock<stdx::mutex> lk
}
}
-StatusWith<CoordinatorCommitDecision> CoordinatorCommitDecision::fromBSON(const BSONObj& doc) {
- CoordinatorCommitDecision decision;
-
- for (const auto& e : doc) {
- const auto fieldName = e.fieldNameStringData();
-
- if (fieldName == "decision") {
- if (e.type() != String) {
- return Status(ErrorCodes::TypeMismatch, "decision must be a string");
- }
-
- if (e.str() == "commit") {
- decision.decision = txn::CommitDecision::kCommit;
- } else if (e.str() == "abort") {
- decision.decision = txn::CommitDecision::kAbort;
- } else {
- return Status(ErrorCodes::BadValue, "decision must be either 'abort' or 'commit'");
- }
- } else if (fieldName == "commitTimestamp") {
- if (e.type() != bsonTimestamp && e.type() != Date) {
- return Status(ErrorCodes::TypeMismatch, "commit timestamp must be a timestamp");
- }
- decision.commitTimestamp = {e.timestamp()};
- }
- }
-
- if (decision.decision == txn::CommitDecision::kAbort && decision.commitTimestamp) {
- return Status(ErrorCodes::BadValue, "abort decision cannot have a timestamp");
- }
- if (decision.decision == txn::CommitDecision::kCommit && !decision.commitTimestamp) {
- return Status(ErrorCodes::BadValue, "commit decision must have a timestamp");
- }
-
- return decision;
-}
-
-BSONObj CoordinatorCommitDecision::toBSON() const {
- BSONObjBuilder builder;
-
- if (decision == txn::CommitDecision::kCommit) {
- builder.append("decision", "commit");
- } else {
- builder.append("decision", "abort");
- }
- if (commitTimestamp) {
- builder.append("commitTimestamp", *commitTimestamp);
- }
-
- return builder.obj();
-}
-
logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream,
const TransactionCoordinator::CoordinatorState& state) {
using State = TransactionCoordinator::CoordinatorState;
@@ -358,16 +305,4 @@ logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream,
return stream;
}
-logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream,
- const txn::CommitDecision& decision) {
- // clang-format off
- switch (decision) {
- case txn::CommitDecision::kCommit: stream.stream() << "kCommit"; break;
- case txn::CommitDecision::kAbort: stream.stream() << "kAbort"; break;
- case txn::CommitDecision::kCanceled: stream.stream() << "kCanceled"; break;
- };
- // clang-format on
- return stream;
-}
-
} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator.h b/src/mongo/db/s/transaction_coordinator.h
index a6bd7642d10..1e3ec5d820b 100644
--- a/src/mongo/db/s/transaction_coordinator.h
+++ b/src/mongo/db/s/transaction_coordinator.h
@@ -33,6 +33,7 @@
#include "mongo/db/s/transaction_coordinator_driver.h"
#include "mongo/logger/logstream_builder.h"
+#include "mongo/util/fail_point_service.h"
namespace mongo {
@@ -66,34 +67,6 @@ public:
~TransactionCoordinator();
/**
- * Represents a decision made by the coordinator, including commit timestamp to be sent with
- * commitTransaction in the case of a decision to commit.
- */
- struct CoordinatorCommitDecision {
- txn::CommitDecision decision;
- boost::optional<Timestamp> commitTimestamp;
-
- /**
- * Parses a CoordinatorCommitDecision from the object.
- */
- static StatusWith<CoordinatorCommitDecision> fromBSON(const BSONObj& obj);
-
- /**
- * Returns an instance of CoordinatorCommitDecision from an object.
- *
- * Throws if the object cannot be deserialized.
- */
- static CoordinatorCommitDecision fromBSONThrowing(const BSONObj& obj) {
- return uassertStatusOK(fromBSON(obj));
- };
-
- /**
- * Returns the BSON representation of this object.
- */
- BSONObj toBSON() const;
- };
-
- /**
* The state of the coordinator.
*/
enum class CoordinatorState {
@@ -120,7 +93,7 @@ public:
/**
* To be used to continue coordinating a transaction on step up.
*/
- void continueCommit(const TransactionCoordinatorDocument& doc);
+ void continueCommit(const txn::TransactionCoordinatorDocument& doc);
/**
* Gets a Future that will contain the decision that the coordinator reaches. Note that this
@@ -162,7 +135,8 @@ private:
* 3. If the decision is to commit, calculates the commit Timestamp.
* 4. Writes the decision and waits for the decision to become majority-committed.
*/
- Future<CoordinatorCommitDecision> _runPhaseOne(const std::vector<ShardId>& participantShards);
+ Future<txn::CoordinatorCommitDecision> _runPhaseOne(
+ const std::vector<ShardId>& participantShards);
/**
* Expects the decision to already be majority-committed.
@@ -173,14 +147,14 @@ private:
* majority-committed.
*/
Future<void> _runPhaseTwo(const std::vector<ShardId>& participantShards,
- const CoordinatorCommitDecision& decision);
+ const txn::CoordinatorCommitDecision& decision);
/**
* Asynchronously sends the commit decision to all participants (commit or abort), resolving the
* returned future when all participants have acknowledged the decision.
*/
Future<void> _sendDecisionToParticipants(const std::vector<ShardId>& participantShards,
- CoordinatorCommitDecision coordinatorDecision);
+ txn::CoordinatorCommitDecision coordinatorDecision);
/**
* Helper for handling errors that occur during either phase of commit coordination.
@@ -236,10 +210,11 @@ private:
std::vector<Promise<void>> _completionPromises;
};
-logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream,
- const TransactionCoordinator::CoordinatorState& state);
+// TODO (SERVER-37886): Remove this failpoint once failover can be tested on coordinators that have
+// a local participant
+MONGO_FAIL_POINT_DECLARE(doNotForgetCoordinator);
logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream,
- const txn::CommitDecision& decision);
+ const TransactionCoordinator::CoordinatorState& state);
} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_catalog.cpp b/src/mongo/db/s/transaction_coordinator_catalog.cpp
index 8d5dc379d0e..069fe02fb47 100644
--- a/src/mongo/db/s/transaction_coordinator_catalog.cpp
+++ b/src/mongo/db/s/transaction_coordinator_catalog.cpp
@@ -33,13 +33,10 @@
#include "mongo/db/s/transaction_coordinator_catalog.h"
-#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
namespace mongo {
-// TODO (SERVER-37886): Remove this failpoint once failover can be tested on coordinators that have
-// a local participant.
MONGO_FAIL_POINT_DEFINE(doNotForgetCoordinator);
TransactionCoordinatorCatalog::TransactionCoordinatorCatalog() = default;
diff --git a/src/mongo/db/s/transaction_coordinator_document.idl b/src/mongo/db/s/transaction_coordinator_document.idl
index c7885075073..78dadac283b 100644
--- a/src/mongo/db/s/transaction_coordinator_document.idl
+++ b/src/mongo/db/s/transaction_coordinator_document.idl
@@ -27,10 +27,9 @@
#
global:
- cpp_namespace: "mongo"
+ cpp_namespace: "mongo::txn"
cpp_includes:
- - "mongo/db/logical_session_id.h"
- - "mongo/db/s/transaction_coordinator.h"
+ - "mongo/db/s/transaction_coordinator_structures.h"
imports:
- "mongo/idl/basic_types.idl"
@@ -38,14 +37,29 @@ imports:
- "mongo/s/sharding_types.idl"
types:
+ commit_decision_enum:
+ bson_serialization_type: string
+ description: "Ensures that the commit decision serializes/deserializes to a fixed set of
+ string values"
+ cpp_type: "txn::CommitDecision"
+ serializer: "::mongo::txn::writeCommitDecisionEnumProperty"
+ deserializer: "::mongo::txn::readCommitDecisionEnumProperty"
+
+structs:
CoordinatorCommitDecision:
- bson_serialization_type: object
description: "An object representing the coordinator's commit decision."
- cpp_type: "TransactionCoordinator::CoordinatorCommitDecision"
- serializer: "TransactionCoordinator::CoordinatorCommitDecision::toBSON"
- deserializer: "TransactionCoordinator::CoordinatorCommitDecision::fromBSONThrowing"
+ strict: true
+ fields:
+ decision:
+ type: commit_decision_enum
+ description: "See the values in txn::CommitDecision for details. Can only be commit
+ or abort."
+ commitTimestamp:
+ optional: true
+ type: timestamp
+ description: "If the decision is 'commit', contains the chosen commit timestamp,
+ otherwise it will not be set"
-structs:
TransactionCoordinatorDocument:
description: "A document used for majority confirming the coordinator's state changes"
strict: true
diff --git a/src/mongo/db/s/transaction_coordinator_driver.cpp b/src/mongo/db/s/transaction_coordinator_driver.cpp
index 177a3c0a5a3..c54de6058c1 100644
--- a/src/mongo/db/s/transaction_coordinator_driver.cpp
+++ b/src/mongo/db/s/transaction_coordinator_driver.cpp
@@ -39,8 +39,6 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/repl/repl_client_info.h"
-#include "mongo/db/s/sharding_state.h"
-#include "mongo/db/s/transaction_coordinator.h"
#include "mongo/db/s/transaction_coordinator_document_gen.h"
#include "mongo/db/s/transaction_coordinator_futures_util.h"
#include "mongo/db/write_concern.h"
@@ -48,8 +46,6 @@
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
-#include "mongo/bson/bsontypes.h"
-
namespace mongo {
namespace {
@@ -60,12 +56,15 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeWritingParticipantList);
MONGO_FAIL_POINT_DEFINE(hangBeforeWritingDecision);
MONGO_FAIL_POINT_DEFINE(hangBeforeDeletingCoordinatorDoc);
+using CommitDecision = txn::CommitDecision;
+using PrepareVote = txn::PrepareVote;
+using TransactionCoordinatorDocument = txn::TransactionCoordinatorDocument;
+
using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;
using ResponseStatus = executor::TaskExecutor::ResponseStatus;
-using PrepareVote = txn::PrepareVote;
-using PrepareResponse = txn::PrepareResponse;
-using CommitDecision = txn::CommitDecision;
+using PrepareResponse = TransactionCoordinatorDriver::PrepareResponse;
+using PrepareVoteConsensus = TransactionCoordinatorDriver::PrepareVoteConsensus;
const WriteConcernOptions kInternalMajorityNoSnapshotWriteConcern(
WriteConcernOptions::kInternalMajorityNoSnapshot,
@@ -228,7 +227,7 @@ Future<void> TransactionCoordinatorDriver::persistParticipantList(
});
}
-Future<txn::PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare(
+Future<PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare(
const std::vector<ShardId>& participantShards,
const LogicalSessionId& lsid,
TxnNumber txnNumber) {
@@ -255,13 +254,13 @@ Future<txn::PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare(
return txn::collect(
std::move(responses),
// Initial value
- txn::PrepareVoteConsensus{boost::none, boost::none},
+ PrepareVoteConsensus{boost::none, boost::none},
// Aggregates an incoming response (next) with the existing aggregate value (result)
- [prepareScheduler = std::move(prepareScheduler)](txn::PrepareVoteConsensus & result,
+ [prepareScheduler = std::move(prepareScheduler)](PrepareVoteConsensus & result,
const PrepareResponse& next) {
if (!next.vote) {
LOG(3) << "Transaction coordinator did not receive a response from shard "
- << next.participantShardId;
+ << next.shardId;
return txn::ShouldStopIteration::kNo;
}
@@ -343,12 +342,12 @@ void persistDecisionBlocking(OperationContext* opCtx,
TransactionCoordinatorDocument doc;
doc.setId(sessionInfo);
doc.setParticipants(std::move(participantList));
- TransactionCoordinator::CoordinatorCommitDecision decision;
+ txn::CoordinatorCommitDecision decision;
if (commitTimestamp) {
- decision.decision = txn::CommitDecision::kCommit;
- decision.commitTimestamp = commitTimestamp;
+ decision.setDecision(CommitDecision::kCommit);
+ decision.setCommitTimestamp(commitTimestamp);
} else {
- decision.decision = txn::CommitDecision::kAbort;
+ decision.setDecision(CommitDecision::kAbort);
}
doc.setDecision(decision);
entry.setU(doc.toBSON());
@@ -561,9 +560,8 @@ Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard(
// *Always* retry until hearing a conclusive response or being told to stop via a
// coordinator-specific code.
return !swPrepareResponse.isOK() &&
- swPrepareResponse.getStatus() != ErrorCodes::TransactionCoordinatorSteppingDown &&
- swPrepareResponse.getStatus() !=
- ErrorCodes::TransactionCoordinatorReachedAbortDecision;
+ swPrepareResponse != ErrorCodes::TransactionCoordinatorSteppingDown &&
+ swPrepareResponse != ErrorCodes::TransactionCoordinatorReachedAbortDecision;
},
[&scheduler, shardId, isLocalShard, commandObj = commandObj.getOwned() ] {
LOG(3) << "Coordinator going to send command " << commandObj << " to "
diff --git a/src/mongo/db/s/transaction_coordinator_driver.h b/src/mongo/db/s/transaction_coordinator_driver.h
index d4833562a2c..139b14b9ea8 100644
--- a/src/mongo/db/s/transaction_coordinator_driver.h
+++ b/src/mongo/db/s/transaction_coordinator_driver.h
@@ -32,6 +32,7 @@
#include <vector>
#include "mongo/db/logical_session_id.h"
+#include "mongo/db/s/transaction_coordinator_document_gen.h"
#include "mongo/db/s/transaction_coordinator_futures_util.h"
#include "mongo/executor/task_executor.h"
#include "mongo/util/concurrency/thread_pool.h"
@@ -39,46 +40,6 @@
namespace mongo {
-class TransactionCoordinatorDocument;
-
-namespace txn {
-
-/**
- * The decision made by the coordinator whether to commit or abort the transaction.
- */
-enum class CommitDecision {
- kCommit,
- kAbort,
- kCanceled,
-};
-
-/**
- * An alias to indicate a vote from a participant. This just makes it clearer what's going on in
- * different stages of the commit process.
- */
-using PrepareVote = CommitDecision;
-
-/**
- * Represents a response to prepareTransaction from a single participant. The timestamp will only be
- * present if the participant votes to commit (indicated by the decision field).
- */
-struct PrepareResponse {
- ShardId participantShardId;
- boost::optional<PrepareVote> vote;
- boost::optional<Timestamp> prepareTimestamp;
-};
-
-/**
- * Represents the aggregate of all prepare responses, including the decision that should be made and
- * the max of all prepare timestamps received in the case of a decision to commit.
- */
-struct PrepareVoteConsensus {
- boost::optional<CommitDecision> decision;
- boost::optional<Timestamp> maxPrepareTimestamp;
-};
-
-} // namespace txn
-
/**
* Single instance of this class is owned by each TransactionCoordinator. It abstracts the
* long-running blocking operations into a futurized interface and ensures that all outstanding
@@ -123,9 +84,18 @@ public:
* prepare timestamps attached to the participants' responses. Otherwise the result will simply
* contain the decision to abort.
*/
- Future<txn::PrepareVoteConsensus> sendPrepare(const std::vector<ShardId>& participantShards,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber);
+ struct PrepareVoteConsensus {
+ // Optional decision, if any was reached (decision could be empty if no response was
+ // received)
+ boost::optional<txn::CommitDecision> decision;
+
+ // Should only be consulted if the decision is commit and contains the maximum prepare
+ // timestamp across all participants
+ boost::optional<Timestamp> maxPrepareTimestamp;
+ };
+ Future<PrepareVoteConsensus> sendPrepare(const std::vector<ShardId>& participantShards,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber);
/**
* If 'commitTimestamp' is boost::none, updates the document in config.transaction_coordinators
@@ -194,7 +164,7 @@ public:
/**
* Reads and returns all documents in config.transaction_coordinators.
*/
- static std::vector<TransactionCoordinatorDocument> readAllCoordinatorDocs(
+ static std::vector<txn::TransactionCoordinatorDocument> readAllCoordinatorDocs(
OperationContext* opCtx);
//
@@ -210,9 +180,19 @@ public:
* - TransactionCoordinatorSteppingDown
* - ShardNotFound
*/
- Future<txn::PrepareResponse> sendPrepareToShard(txn::AsyncWorkScheduler& scheduler,
- const ShardId& shardId,
- const BSONObj& prepareCommandObj);
+ struct PrepareResponse {
+ // Shard id from which the response was received
+ ShardId shardId;
+
+ // If set to none, this means the shard did not produce a vote
+ boost::optional<txn::PrepareVote> vote;
+
+ // Will only be set if the vote was kCommit
+ boost::optional<Timestamp> prepareTimestamp;
+ };
+ Future<PrepareResponse> sendPrepareToShard(txn::AsyncWorkScheduler& scheduler,
+ const ShardId& shardId,
+ const BSONObj& prepareCommandObj);
/**
* Sends a command corresponding to a commit decision (i.e. commitTransaction or*
diff --git a/src/mongo/db/s/transaction_coordinator_structures.cpp b/src/mongo/db/s/transaction_coordinator_structures.cpp
new file mode 100644
index 00000000000..08be1a0b7aa
--- /dev/null
+++ b/src/mongo/db/s/transaction_coordinator_structures.cpp
@@ -0,0 +1,81 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kTransaction
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/transaction_coordinator_structures.h"
+
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+namespace txn {
+namespace {
+
+constexpr auto kCommitDecision = "commit"_sd;
+constexpr auto kAbortDecision = "abort"_sd;
+
+} // namespace
+
+CommitDecision readCommitDecisionEnumProperty(StringData decision) {
+ // clang-format off
+ if (decision == kCommitDecision) return CommitDecision::kCommit;
+ if (decision == kAbortDecision) return CommitDecision::kAbort;
+ // clang-format on
+
+ uasserted(ErrorCodes::BadValue,
+ str::stream() << "'" << decision << "' is not a valid decision");
+}
+
+StringData writeCommitDecisionEnumProperty(CommitDecision decision) {
+ // clang-format off
+ switch (decision) {
+ case CommitDecision::kCommit: return kCommitDecision;
+ case CommitDecision::kAbort: return kAbortDecision;
+ case CommitDecision::kCanceled: break;
+ };
+ // clang-format on
+ MONGO_UNREACHABLE;
+}
+
+logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream,
+ const CommitDecision& decision) {
+ // clang-format off
+ switch (decision) {
+ case txn::CommitDecision::kCommit: return stream << "kCommit";
+ case txn::CommitDecision::kAbort: return stream << "kAbort";
+ case txn::CommitDecision::kCanceled: return stream << "kCanceled";
+ };
+ // clang-format on
+ MONGO_UNREACHABLE;
+}
+
+} // namespace txn
+} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_structures.h b/src/mongo/db/s/transaction_coordinator_structures.h
new file mode 100644
index 00000000000..bf2c52e2985
--- /dev/null
+++ b/src/mongo/db/s/transaction_coordinator_structures.h
@@ -0,0 +1,62 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+#include <vector>
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/logger/logstream_builder.h"
+#include "mongo/s/shard_id.h"
+
+namespace mongo {
+namespace txn {
+
+using ParticipantsList = std::vector<ShardId>;
+
+enum class PrepareVote {
+ kCommit,
+ kAbort,
+ kCanceled,
+};
+
+using CommitDecision = PrepareVote;
+
+/**
+ * String serializer/deserializer for the commit decision property values.
+ */
+CommitDecision readCommitDecisionEnumProperty(StringData decision);
+StringData writeCommitDecisionEnumProperty(CommitDecision decision);
+
+logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream,
+ const CommitDecision& decision);
+
+} // namespace txn
+} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_structures_test.cpp b/src/mongo/db/s/transaction_coordinator_structures_test.cpp
new file mode 100644
index 00000000000..6ccc636a222
--- /dev/null
+++ b/src/mongo/db/s/transaction_coordinator_structures_test.cpp
@@ -0,0 +1,50 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/transaction_coordinator_document_gen.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace txn {
+namespace {
+
+TEST(CoordinatorCommitDecisionTest, SerializeNoCommitTimestamp) {
+ CoordinatorCommitDecision decision(CommitDecision::kCommit);
+ auto obj = decision.toBSON();
+
+ ASSERT_BSONOBJ_EQ(BSON("decision"
+ << "commit"),
+ obj);
+}
+
+} // namespace
+} // namespace txn
+} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp
index 2551eeb2620..f3b89afb50d 100644
--- a/src/mongo/db/s/transaction_coordinator_test.cpp
+++ b/src/mongo/db/s/transaction_coordinator_test.cpp
@@ -41,6 +41,9 @@
namespace mongo {
namespace {
+using PrepareResponse = TransactionCoordinatorDriver::PrepareResponse;
+using TransactionCoordinatorDocument = txn::TransactionCoordinatorDocument;
+
const StatusWith<BSONObj> kNoSuchTransaction =
BSON("ok" << 0 << "code" << ErrorCodes::NoSuchTransaction);
const StatusWith<BSONObj> kOk = BSON("ok" << 1);
@@ -381,16 +384,16 @@ protected:
auto decision = doc.getDecision();
if (expectedDecision) {
- ASSERT(*expectedDecision == decision->decision);
+ ASSERT(*expectedDecision == decision->getDecision());
} else {
ASSERT(!decision);
}
if (expectedCommitTimestamp) {
- ASSERT(decision->commitTimestamp);
- ASSERT_EQUALS(*expectedCommitTimestamp, *decision->commitTimestamp);
+ ASSERT(decision->getCommitTimestamp());
+ ASSERT_EQUALS(*expectedCommitTimestamp, *decision->getCommitTimestamp());
} else if (decision) {
- ASSERT(!decision->commitTimestamp);
+ ASSERT(!decision->getCommitTimestamp());
}
}
diff --git a/src/mongo/db/s/transaction_coordinator_test_fixture.h b/src/mongo/db/s/transaction_coordinator_test_fixture.h
index e6137730b79..981f4bdb046 100644
--- a/src/mongo/db/s/transaction_coordinator_test_fixture.h
+++ b/src/mongo/db/s/transaction_coordinator_test_fixture.h
@@ -44,8 +44,6 @@ namespace mongo {
*/
class TransactionCoordinatorTestFixture : public ShardServerTestFixture {
protected:
- using PrepareResponse = txn::PrepareResponse;
-
void setUp() override;
/**