diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-03-27 10:21:08 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-04-01 07:26:26 -0400 |
commit | 9dd07662bbe80415d90131fca2be6312166d6f39 (patch) | |
tree | 2c3225830737ac56796901d6e928830be62ede31 /src/mongo | |
parent | d7f0bffc93993bcc7e794344d5a1ae25ff95571b (diff) | |
download | mongo-9dd07662bbe80415d90131fca2be6312166d6f39.tar.gz |
SERVER-40297 Move the TransactionCoordinator data structures to a separate file
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/s/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator.cpp | 125 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator.h | 45 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_catalog.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_document.idl | 30 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_driver.cpp | 34 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_driver.h | 74 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_structures.cpp | 81 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_structures.h | 62 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_structures_test.cpp | 50 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_test.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_test_fixture.h | 2 |
12 files changed, 307 insertions, 212 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 53cb831d5b7..d8edd672d09 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -126,6 +126,7 @@ env.Library( 'transaction_coordinator_factory_mongod.cpp', 'transaction_coordinator_futures_util.cpp', 'transaction_coordinator_service.cpp', + 'transaction_coordinator_structures.cpp', 'transaction_coordinator.cpp', env.Idlc('transaction_coordinator_document.idl')[0], ], @@ -413,6 +414,7 @@ env.CppUnitTest( 'transaction_coordinator_catalog_test.cpp', 'transaction_coordinator_futures_util_test.cpp', 'transaction_coordinator_service_test.cpp', + 'transaction_coordinator_structures_test.cpp', 'transaction_coordinator_test_fixture.cpp', 'transaction_coordinator_test.cpp', ], diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp index 5ba843566a6..7689ddb2108 100644 --- a/src/mongo/db/s/transaction_coordinator.cpp +++ b/src/mongo/db/s/transaction_coordinator.cpp @@ -36,33 +36,35 @@ #include "mongo/db/logical_clock.h" #include "mongo/db/s/transaction_coordinator_document_gen.h" #include "mongo/db/s/transaction_coordinator_futures_util.h" -#include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" namespace mongo { namespace { -using CoordinatorCommitDecision = TransactionCoordinator::CoordinatorCommitDecision; +using CommitDecision = txn::CommitDecision; +using CoordinatorCommitDecision = txn::CoordinatorCommitDecision; +using TransactionCoordinatorDocument = txn::TransactionCoordinatorDocument; -CoordinatorCommitDecision makeDecisionFromPrepareVoteConsensus( - ServiceContext* service, - const txn::PrepareVoteConsensus& result, - const LogicalSessionId& lsid, - TxnNumber txnNumber) { +using PrepareVoteConsensus = TransactionCoordinatorDriver::PrepareVoteConsensus; + +CoordinatorCommitDecision makeDecisionFromPrepareVoteConsensus(ServiceContext* service, + const PrepareVoteConsensus& result, + const LogicalSessionId& lsid, + TxnNumber txnNumber) { invariant(result.decision); - CoordinatorCommitDecision decision{*result.decision, boost::none}; + CoordinatorCommitDecision decision(*result.decision); - if (result.decision == txn::CommitDecision::kCommit) { + if (result.decision == CommitDecision::kCommit) { invariant(result.maxPrepareTimestamp); - decision.commitTimestamp = Timestamp(result.maxPrepareTimestamp->getSecs(), - result.maxPrepareTimestamp->getInc() + 1); + decision.setCommitTimestamp(Timestamp(result.maxPrepareTimestamp->getSecs(), + result.maxPrepareTimestamp->getInc() + 1)); - LOG(3) << "Advancing cluster time to commit Timestamp " << decision.commitTimestamp.get() - << " of transaction " << txnNumber << " on session " << lsid.getId(); + LOG(3) << "Advancing cluster time to the commit timestamp " + << *decision.getCommitTimestamp() << " for " << lsid.getId() << ':' << txnNumber; uassertStatusOK(LogicalClock::get(service)->advanceClusterTime( - LogicalTime(result.maxPrepareTimestamp.get()))); + LogicalTime(*result.maxPrepareTimestamp))); } return decision; @@ -138,13 +140,11 @@ void TransactionCoordinator::continueCommit(const TransactionCoordinatorDocument // Helper lambda to get the decision either from the document passed in or from the participants // (by performing 'phase one' of two-phase commit). auto getDecision = [&]() -> Future<CoordinatorCommitDecision> { - auto decision = doc.getDecision(); + const auto& decision = doc.getDecision(); if (!decision) { return _runPhaseOne(participantShards); } else { - return (decision->decision == txn::CommitDecision::kCommit) - ? CoordinatorCommitDecision{txn::CommitDecision::kCommit, decision->commitTimestamp} - : CoordinatorCommitDecision{txn::CommitDecision::kAbort, boost::none}; + return *decision; } }; @@ -169,7 +169,7 @@ Future<void> TransactionCoordinator::onCompletion() { [](const Status& s) { uasserted(ErrorCodes::InterruptedDueToStepDown, s.reason()); }); } -SharedSemiFuture<txn::CommitDecision> TransactionCoordinator::getDecision() { +SharedSemiFuture<CommitDecision> TransactionCoordinator::getDecision() { stdx::lock_guard<stdx::mutex> lg(_mutex); return _decisionPromise.getFuture(); } @@ -180,7 +180,7 @@ void TransactionCoordinator::cancelIfCommitNotYetStarted() { stdx::unique_lock<stdx::mutex> lk(_mutex); if (_state == CoordinatorState::kInit) { invariant(!_decisionPromise.getFuture().isReady()); - _decisionPromise.emplaceValue(txn::CommitDecision::kCanceled); + _decisionPromise.emplaceValue(CommitDecision::kCanceled); _transitionToDone(std::move(lk)); } } @@ -195,14 +195,15 @@ void TransactionCoordinator::_cancelTimeoutWaitForCommitTask() { Future<CoordinatorCommitDecision> TransactionCoordinator::_runPhaseOne( const std::vector<ShardId>& participantShards) { return _driver.sendPrepare(participantShards, _lsid, _txnNumber) - .then([this, participantShards](txn::PrepareVoteConsensus result) { + .then([this, participantShards](PrepareVoteConsensus result) { invariant(_state == CoordinatorState::kPreparing); auto decision = makeDecisionFromPrepareVoteConsensus(_serviceContext, result, _lsid, _txnNumber); return _driver - .persistDecision(_lsid, _txnNumber, participantShards, decision.commitTimestamp) + .persistDecision( + _lsid, _txnNumber, participantShards, decision.getCommitTimestamp()) .then([decision] { return decision; }); }); } @@ -211,11 +212,8 @@ Future<void> TransactionCoordinator::_runPhaseTwo(const std::vector<ShardId>& pa const CoordinatorCommitDecision& decision) { return _sendDecisionToParticipants(participantShards, decision) .then([this] { - if (getGlobalFailPointRegistry() - ->getFailPoint("doNotForgetCoordinator") - ->shouldFail()) { + if (MONGO_FAIL_POINT(doNotForgetCoordinator)) return Future<void>::makeReady(); - } return _driver.deleteCoordinatorDoc(_lsid, _txnNumber); }) @@ -230,17 +228,17 @@ Future<void> TransactionCoordinator::_runPhaseTwo(const std::vector<ShardId>& pa Future<void> TransactionCoordinator::_sendDecisionToParticipants( const std::vector<ShardId>& participantShards, CoordinatorCommitDecision decision) { invariant(_state == CoordinatorState::kPreparing); - _decisionPromise.emplaceValue(decision.decision); + _decisionPromise.emplaceValue(decision.getDecision()); - switch (decision.decision) { - case txn::CommitDecision::kCommit: + switch (decision.getDecision()) { + case CommitDecision::kCommit: _state = CoordinatorState::kCommitting; return _driver.sendCommit( - participantShards, _lsid, _txnNumber, *decision.commitTimestamp); - case txn::CommitDecision::kAbort: + participantShards, _lsid, _txnNumber, *decision.getCommitTimestamp()); + case CommitDecision::kAbort: _state = CoordinatorState::kAborting; return _driver.sendAbort(participantShards, _lsid, _txnNumber); - case txn::CommitDecision::kCanceled: + case CommitDecision::kCanceled: MONGO_UNREACHABLE; }; MONGO_UNREACHABLE; @@ -292,57 +290,6 @@ void TransactionCoordinator::_transitionToDone(stdx::unique_lock<stdx::mutex> lk } } -StatusWith<CoordinatorCommitDecision> CoordinatorCommitDecision::fromBSON(const BSONObj& doc) { - CoordinatorCommitDecision decision; - - for (const auto& e : doc) { - const auto fieldName = e.fieldNameStringData(); - - if (fieldName == "decision") { - if (e.type() != String) { - return Status(ErrorCodes::TypeMismatch, "decision must be a string"); - } - - if (e.str() == "commit") { - decision.decision = txn::CommitDecision::kCommit; - } else if (e.str() == "abort") { - decision.decision = txn::CommitDecision::kAbort; - } else { - return Status(ErrorCodes::BadValue, "decision must be either 'abort' or 'commit'"); - } - } else if (fieldName == "commitTimestamp") { - if (e.type() != bsonTimestamp && e.type() != Date) { - return Status(ErrorCodes::TypeMismatch, "commit timestamp must be a timestamp"); - } - decision.commitTimestamp = {e.timestamp()}; - } - } - - if (decision.decision == txn::CommitDecision::kAbort && decision.commitTimestamp) { - return Status(ErrorCodes::BadValue, "abort decision cannot have a timestamp"); - } - if (decision.decision == txn::CommitDecision::kCommit && !decision.commitTimestamp) { - return Status(ErrorCodes::BadValue, "commit decision must have a timestamp"); - } - - return decision; -} - -BSONObj CoordinatorCommitDecision::toBSON() const { - BSONObjBuilder builder; - - if (decision == txn::CommitDecision::kCommit) { - builder.append("decision", "commit"); - } else { - builder.append("decision", "abort"); - } - if (commitTimestamp) { - builder.append("commitTimestamp", *commitTimestamp); - } - - return builder.obj(); -} - logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream, const TransactionCoordinator::CoordinatorState& state) { using State = TransactionCoordinator::CoordinatorState; @@ -358,16 +305,4 @@ logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream, return stream; } -logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream, - const txn::CommitDecision& decision) { - // clang-format off - switch (decision) { - case txn::CommitDecision::kCommit: stream.stream() << "kCommit"; break; - case txn::CommitDecision::kAbort: stream.stream() << "kAbort"; break; - case txn::CommitDecision::kCanceled: stream.stream() << "kCanceled"; break; - }; - // clang-format on - return stream; -} - } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator.h b/src/mongo/db/s/transaction_coordinator.h index a6bd7642d10..1e3ec5d820b 100644 --- a/src/mongo/db/s/transaction_coordinator.h +++ b/src/mongo/db/s/transaction_coordinator.h @@ -33,6 +33,7 @@ #include "mongo/db/s/transaction_coordinator_driver.h" #include "mongo/logger/logstream_builder.h" +#include "mongo/util/fail_point_service.h" namespace mongo { @@ -66,34 +67,6 @@ public: ~TransactionCoordinator(); /** - * Represents a decision made by the coordinator, including commit timestamp to be sent with - * commitTransaction in the case of a decision to commit. - */ - struct CoordinatorCommitDecision { - txn::CommitDecision decision; - boost::optional<Timestamp> commitTimestamp; - - /** - * Parses a CoordinatorCommitDecision from the object. - */ - static StatusWith<CoordinatorCommitDecision> fromBSON(const BSONObj& obj); - - /** - * Returns an instance of CoordinatorCommitDecision from an object. - * - * Throws if the object cannot be deserialized. - */ - static CoordinatorCommitDecision fromBSONThrowing(const BSONObj& obj) { - return uassertStatusOK(fromBSON(obj)); - }; - - /** - * Returns the BSON representation of this object. - */ - BSONObj toBSON() const; - }; - - /** * The state of the coordinator. */ enum class CoordinatorState { @@ -120,7 +93,7 @@ public: /** * To be used to continue coordinating a transaction on step up. */ - void continueCommit(const TransactionCoordinatorDocument& doc); + void continueCommit(const txn::TransactionCoordinatorDocument& doc); /** * Gets a Future that will contain the decision that the coordinator reaches. Note that this @@ -162,7 +135,8 @@ private: * 3. If the decision is to commit, calculates the commit Timestamp. * 4. Writes the decision and waits for the decision to become majority-committed. */ - Future<CoordinatorCommitDecision> _runPhaseOne(const std::vector<ShardId>& participantShards); + Future<txn::CoordinatorCommitDecision> _runPhaseOne( + const std::vector<ShardId>& participantShards); /** * Expects the decision to already be majority-committed. @@ -173,14 +147,14 @@ private: * majority-committed. */ Future<void> _runPhaseTwo(const std::vector<ShardId>& participantShards, - const CoordinatorCommitDecision& decision); + const txn::CoordinatorCommitDecision& decision); /** * Asynchronously sends the commit decision to all participants (commit or abort), resolving the * returned future when all participants have acknowledged the decision. */ Future<void> _sendDecisionToParticipants(const std::vector<ShardId>& participantShards, - CoordinatorCommitDecision coordinatorDecision); + txn::CoordinatorCommitDecision coordinatorDecision); /** * Helper for handling errors that occur during either phase of commit coordination. @@ -236,10 +210,11 @@ private: std::vector<Promise<void>> _completionPromises; }; -logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream, - const TransactionCoordinator::CoordinatorState& state); +// TODO (SERVER-37886): Remove this failpoint once failover can be tested on coordinators that have +// a local participant +MONGO_FAIL_POINT_DECLARE(doNotForgetCoordinator); logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream, - const txn::CommitDecision& decision); + const TransactionCoordinator::CoordinatorState& state); } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_catalog.cpp b/src/mongo/db/s/transaction_coordinator_catalog.cpp index 8d5dc379d0e..069fe02fb47 100644 --- a/src/mongo/db/s/transaction_coordinator_catalog.cpp +++ b/src/mongo/db/s/transaction_coordinator_catalog.cpp @@ -33,13 +33,10 @@ #include "mongo/db/s/transaction_coordinator_catalog.h" -#include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" namespace mongo { -// TODO (SERVER-37886): Remove this failpoint once failover can be tested on coordinators that have -// a local participant. MONGO_FAIL_POINT_DEFINE(doNotForgetCoordinator); TransactionCoordinatorCatalog::TransactionCoordinatorCatalog() = default; diff --git a/src/mongo/db/s/transaction_coordinator_document.idl b/src/mongo/db/s/transaction_coordinator_document.idl index c7885075073..78dadac283b 100644 --- a/src/mongo/db/s/transaction_coordinator_document.idl +++ b/src/mongo/db/s/transaction_coordinator_document.idl @@ -27,10 +27,9 @@ # global: - cpp_namespace: "mongo" + cpp_namespace: "mongo::txn" cpp_includes: - - "mongo/db/logical_session_id.h" - - "mongo/db/s/transaction_coordinator.h" + - "mongo/db/s/transaction_coordinator_structures.h" imports: - "mongo/idl/basic_types.idl" @@ -38,14 +37,29 @@ imports: - "mongo/s/sharding_types.idl" types: + commit_decision_enum: + bson_serialization_type: string + description: "Ensures that the commit decision serializes/deserializes to a fixed set of + string values" + cpp_type: "txn::CommitDecision" + serializer: "::mongo::txn::writeCommitDecisionEnumProperty" + deserializer: "::mongo::txn::readCommitDecisionEnumProperty" + +structs: CoordinatorCommitDecision: - bson_serialization_type: object description: "An object representing the coordinator's commit decision." - cpp_type: "TransactionCoordinator::CoordinatorCommitDecision" - serializer: "TransactionCoordinator::CoordinatorCommitDecision::toBSON" - deserializer: "TransactionCoordinator::CoordinatorCommitDecision::fromBSONThrowing" + strict: true + fields: + decision: + type: commit_decision_enum + description: "See the values in txn::CommitDecision for details. Can only be commit + or abort." + commitTimestamp: + optional: true + type: timestamp + description: "If the decision is 'commit', contains the chosen commit timestamp, + otherwise it will not be set" -structs: TransactionCoordinatorDocument: description: "A document used for majority confirming the coordinator's state changes" strict: true diff --git a/src/mongo/db/s/transaction_coordinator_driver.cpp b/src/mongo/db/s/transaction_coordinator_driver.cpp index 177a3c0a5a3..c54de6058c1 100644 --- a/src/mongo/db/s/transaction_coordinator_driver.cpp +++ b/src/mongo/db/s/transaction_coordinator_driver.cpp @@ -39,8 +39,6 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/repl/repl_client_info.h" -#include "mongo/db/s/sharding_state.h" -#include "mongo/db/s/transaction_coordinator.h" #include "mongo/db/s/transaction_coordinator_document_gen.h" #include "mongo/db/s/transaction_coordinator_futures_util.h" #include "mongo/db/write_concern.h" @@ -48,8 +46,6 @@ #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" -#include "mongo/bson/bsontypes.h" - namespace mongo { namespace { @@ -60,12 +56,15 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeWritingParticipantList); MONGO_FAIL_POINT_DEFINE(hangBeforeWritingDecision); MONGO_FAIL_POINT_DEFINE(hangBeforeDeletingCoordinatorDoc); +using CommitDecision = txn::CommitDecision; +using PrepareVote = txn::PrepareVote; +using TransactionCoordinatorDocument = txn::TransactionCoordinatorDocument; + using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs; using ResponseStatus = executor::TaskExecutor::ResponseStatus; -using PrepareVote = txn::PrepareVote; -using PrepareResponse = txn::PrepareResponse; -using CommitDecision = txn::CommitDecision; +using PrepareResponse = TransactionCoordinatorDriver::PrepareResponse; +using PrepareVoteConsensus = TransactionCoordinatorDriver::PrepareVoteConsensus; const WriteConcernOptions kInternalMajorityNoSnapshotWriteConcern( WriteConcernOptions::kInternalMajorityNoSnapshot, @@ -228,7 +227,7 @@ Future<void> TransactionCoordinatorDriver::persistParticipantList( }); } -Future<txn::PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare( +Future<PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare( const std::vector<ShardId>& participantShards, const LogicalSessionId& lsid, TxnNumber txnNumber) { @@ -255,13 +254,13 @@ Future<txn::PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare( return txn::collect( std::move(responses), // Initial value - txn::PrepareVoteConsensus{boost::none, boost::none}, + PrepareVoteConsensus{boost::none, boost::none}, // Aggregates an incoming response (next) with the existing aggregate value (result) - [prepareScheduler = std::move(prepareScheduler)](txn::PrepareVoteConsensus & result, + [prepareScheduler = std::move(prepareScheduler)](PrepareVoteConsensus & result, const PrepareResponse& next) { if (!next.vote) { LOG(3) << "Transaction coordinator did not receive a response from shard " - << next.participantShardId; + << next.shardId; return txn::ShouldStopIteration::kNo; } @@ -343,12 +342,12 @@ void persistDecisionBlocking(OperationContext* opCtx, TransactionCoordinatorDocument doc; doc.setId(sessionInfo); doc.setParticipants(std::move(participantList)); - TransactionCoordinator::CoordinatorCommitDecision decision; + txn::CoordinatorCommitDecision decision; if (commitTimestamp) { - decision.decision = txn::CommitDecision::kCommit; - decision.commitTimestamp = commitTimestamp; + decision.setDecision(CommitDecision::kCommit); + decision.setCommitTimestamp(commitTimestamp); } else { - decision.decision = txn::CommitDecision::kAbort; + decision.setDecision(CommitDecision::kAbort); } doc.setDecision(decision); entry.setU(doc.toBSON()); @@ -561,9 +560,8 @@ Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard( // *Always* retry until hearing a conclusive response or being told to stop via a // coordinator-specific code. return !swPrepareResponse.isOK() && - swPrepareResponse.getStatus() != ErrorCodes::TransactionCoordinatorSteppingDown && - swPrepareResponse.getStatus() != - ErrorCodes::TransactionCoordinatorReachedAbortDecision; + swPrepareResponse != ErrorCodes::TransactionCoordinatorSteppingDown && + swPrepareResponse != ErrorCodes::TransactionCoordinatorReachedAbortDecision; }, [&scheduler, shardId, isLocalShard, commandObj = commandObj.getOwned() ] { LOG(3) << "Coordinator going to send command " << commandObj << " to " diff --git a/src/mongo/db/s/transaction_coordinator_driver.h b/src/mongo/db/s/transaction_coordinator_driver.h index d4833562a2c..139b14b9ea8 100644 --- a/src/mongo/db/s/transaction_coordinator_driver.h +++ b/src/mongo/db/s/transaction_coordinator_driver.h @@ -32,6 +32,7 @@ #include <vector> #include "mongo/db/logical_session_id.h" +#include "mongo/db/s/transaction_coordinator_document_gen.h" #include "mongo/db/s/transaction_coordinator_futures_util.h" #include "mongo/executor/task_executor.h" #include "mongo/util/concurrency/thread_pool.h" @@ -39,46 +40,6 @@ namespace mongo { -class TransactionCoordinatorDocument; - -namespace txn { - -/** - * The decision made by the coordinator whether to commit or abort the transaction. - */ -enum class CommitDecision { - kCommit, - kAbort, - kCanceled, -}; - -/** - * An alias to indicate a vote from a participant. This just makes it clearer what's going on in - * different stages of the commit process. - */ -using PrepareVote = CommitDecision; - -/** - * Represents a response to prepareTransaction from a single participant. The timestamp will only be - * present if the participant votes to commit (indicated by the decision field). - */ -struct PrepareResponse { - ShardId participantShardId; - boost::optional<PrepareVote> vote; - boost::optional<Timestamp> prepareTimestamp; -}; - -/** - * Represents the aggregate of all prepare responses, including the decision that should be made and - * the max of all prepare timestamps received in the case of a decision to commit. - */ -struct PrepareVoteConsensus { - boost::optional<CommitDecision> decision; - boost::optional<Timestamp> maxPrepareTimestamp; -}; - -} // namespace txn - /** * Single instance of this class is owned by each TransactionCoordinator. It abstracts the * long-running blocking operations into a futurized interface and ensures that all outstanding @@ -123,9 +84,18 @@ public: * prepare timestamps attached to the participants' responses. Otherwise the result will simply * contain the decision to abort. */ - Future<txn::PrepareVoteConsensus> sendPrepare(const std::vector<ShardId>& participantShards, - const LogicalSessionId& lsid, - TxnNumber txnNumber); + struct PrepareVoteConsensus { + // Optional decision, if any was reached (decision could be empty if no response was + // received) + boost::optional<txn::CommitDecision> decision; + + // Should only be consulted if the decision is commit and contains the maximum prepare + // timestamp across all participants + boost::optional<Timestamp> maxPrepareTimestamp; + }; + Future<PrepareVoteConsensus> sendPrepare(const std::vector<ShardId>& participantShards, + const LogicalSessionId& lsid, + TxnNumber txnNumber); /** * If 'commitTimestamp' is boost::none, updates the document in config.transaction_coordinators @@ -194,7 +164,7 @@ public: /** * Reads and returns all documents in config.transaction_coordinators. */ - static std::vector<TransactionCoordinatorDocument> readAllCoordinatorDocs( + static std::vector<txn::TransactionCoordinatorDocument> readAllCoordinatorDocs( OperationContext* opCtx); // @@ -210,9 +180,19 @@ public: * - TransactionCoordinatorSteppingDown * - ShardNotFound */ - Future<txn::PrepareResponse> sendPrepareToShard(txn::AsyncWorkScheduler& scheduler, - const ShardId& shardId, - const BSONObj& prepareCommandObj); + struct PrepareResponse { + // Shard id from which the response was received + ShardId shardId; + + // If set to none, this means the shard did not produce a vote + boost::optional<txn::PrepareVote> vote; + + // Will only be set if the vote was kCommit + boost::optional<Timestamp> prepareTimestamp; + }; + Future<PrepareResponse> sendPrepareToShard(txn::AsyncWorkScheduler& scheduler, + const ShardId& shardId, + const BSONObj& prepareCommandObj); /** * Sends a command corresponding to a commit decision (i.e. commitTransaction or* diff --git a/src/mongo/db/s/transaction_coordinator_structures.cpp b/src/mongo/db/s/transaction_coordinator_structures.cpp new file mode 100644 index 00000000000..08be1a0b7aa --- /dev/null +++ b/src/mongo/db/s/transaction_coordinator_structures.cpp @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kTransaction + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/transaction_coordinator_structures.h" + +#include "mongo/util/assert_util.h" + +namespace mongo { +namespace txn { +namespace { + +constexpr auto kCommitDecision = "commit"_sd; +constexpr auto kAbortDecision = "abort"_sd; + +} // namespace + +CommitDecision readCommitDecisionEnumProperty(StringData decision) { + // clang-format off + if (decision == kCommitDecision) return CommitDecision::kCommit; + if (decision == kAbortDecision) return CommitDecision::kAbort; + // clang-format on + + uasserted(ErrorCodes::BadValue, + str::stream() << "'" << decision << "' is not a valid decision"); +} + +StringData writeCommitDecisionEnumProperty(CommitDecision decision) { + // clang-format off + switch (decision) { + case CommitDecision::kCommit: return kCommitDecision; + case CommitDecision::kAbort: return kAbortDecision; + case CommitDecision::kCanceled: break; + }; + // clang-format on + MONGO_UNREACHABLE; +} + +logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream, + const CommitDecision& decision) { + // clang-format off + switch (decision) { + case txn::CommitDecision::kCommit: return stream << "kCommit"; + case txn::CommitDecision::kAbort: return stream << "kAbort"; + case txn::CommitDecision::kCanceled: return stream << "kCanceled"; + }; + // clang-format on + MONGO_UNREACHABLE; +} + +} // namespace txn +} // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_structures.h b/src/mongo/db/s/transaction_coordinator_structures.h new file mode 100644 index 00000000000..bf2c52e2985 --- /dev/null +++ b/src/mongo/db/s/transaction_coordinator_structures.h @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/optional.hpp> +#include <vector> + +#include "mongo/bson/bsonobj.h" +#include "mongo/logger/logstream_builder.h" +#include "mongo/s/shard_id.h" + +namespace mongo { +namespace txn { + +using ParticipantsList = std::vector<ShardId>; + +enum class PrepareVote { + kCommit, + kAbort, + kCanceled, +}; + +using CommitDecision = PrepareVote; + +/** + * String serializer/deserializer for the commit decision property values. + */ +CommitDecision readCommitDecisionEnumProperty(StringData decision); +StringData writeCommitDecisionEnumProperty(CommitDecision decision); + +logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream, + const CommitDecision& decision); + +} // namespace txn +} // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_structures_test.cpp b/src/mongo/db/s/transaction_coordinator_structures_test.cpp new file mode 100644 index 00000000000..6ccc636a222 --- /dev/null +++ b/src/mongo/db/s/transaction_coordinator_structures_test.cpp @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/transaction_coordinator_document_gen.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace txn { +namespace { + +TEST(CoordinatorCommitDecisionTest, SerializeNoCommitTimestamp) { + CoordinatorCommitDecision decision(CommitDecision::kCommit); + auto obj = decision.toBSON(); + + ASSERT_BSONOBJ_EQ(BSON("decision" + << "commit"), + obj); +} + +} // namespace +} // namespace txn +} // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp index 2551eeb2620..f3b89afb50d 100644 --- a/src/mongo/db/s/transaction_coordinator_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_test.cpp @@ -41,6 +41,9 @@ namespace mongo { namespace { +using PrepareResponse = TransactionCoordinatorDriver::PrepareResponse; +using TransactionCoordinatorDocument = txn::TransactionCoordinatorDocument; + const StatusWith<BSONObj> kNoSuchTransaction = BSON("ok" << 0 << "code" << ErrorCodes::NoSuchTransaction); const StatusWith<BSONObj> kOk = BSON("ok" << 1); @@ -381,16 +384,16 @@ protected: auto decision = doc.getDecision(); if (expectedDecision) { - ASSERT(*expectedDecision == decision->decision); + ASSERT(*expectedDecision == decision->getDecision()); } else { ASSERT(!decision); } if (expectedCommitTimestamp) { - ASSERT(decision->commitTimestamp); - ASSERT_EQUALS(*expectedCommitTimestamp, *decision->commitTimestamp); + ASSERT(decision->getCommitTimestamp()); + ASSERT_EQUALS(*expectedCommitTimestamp, *decision->getCommitTimestamp()); } else if (decision) { - ASSERT(!decision->commitTimestamp); + ASSERT(!decision->getCommitTimestamp()); } } diff --git a/src/mongo/db/s/transaction_coordinator_test_fixture.h b/src/mongo/db/s/transaction_coordinator_test_fixture.h index e6137730b79..981f4bdb046 100644 --- a/src/mongo/db/s/transaction_coordinator_test_fixture.h +++ b/src/mongo/db/s/transaction_coordinator_test_fixture.h @@ -44,8 +44,6 @@ namespace mongo { */ class TransactionCoordinatorTestFixture : public ShardServerTestFixture { protected: - using PrepareResponse = txn::PrepareResponse; - void setUp() override; /** |