 src/mongo/db/SConscript                                                 |   1
 src/mongo/db/s/recover_transaction_decision_from_local_participant.cpp  |   9
 src/mongo/db/transaction_coordinator.cpp                                |  10
 src/mongo/db/transaction_coordinator.h                                  |   6
 src/mongo/db/transaction_coordinator_catalog_test.cpp                   |  11
 src/mongo/db/transaction_coordinator_driver.cpp                         | 343
 src/mongo/db/transaction_coordinator_driver.h                           |  21
 src/mongo/db/transaction_coordinator_futures_util.h                     |  45
 src/mongo/db/transaction_coordinator_service.cpp                        | 160
 src/mongo/db/transaction_coordinator_service.h                          |  20
 src/mongo/db/transaction_coordinator_service_test.cpp                   | 131
 src/mongo/db/transaction_coordinator_test.cpp                           | 142
 src/mongo/db/transaction_coordinator_test_fixture.cpp                   | 120
 src/mongo/db/transaction_coordinator_test_fixture.h                     |  80
 src/mongo/executor/network_interface_mock.cpp                           |  10
 src/mongo/executor/network_interface_mock.h                             |   5
 16 files changed, 438 insertions(+), 676 deletions(-)
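The recurring pattern in this patch is to route blocking work through txn::AsyncWorkScheduler::scheduleWork(), which supplies the callback with an OperationContext and surfaces the outcome as a Future, rather than hand-wiring a shared Promise over a raw ThreadPool as the deleted txn::async() helper did. A minimal standalone sketch of that shape, using plain std types instead of MongoDB's Future and OperationContext (every name below is illustrative, not the real API):

    #include <future>
    #include <iostream>
    #include <thread>

    // Toy stand-in for a work scheduler: run the callback on a worker thread
    // and hand back a future, so callers chain on the result instead of
    // threading a shared Promise through the callback by hand.
    class MiniScheduler {
    public:
        template <typename Fn>
        auto scheduleWork(Fn task) -> std::future<decltype(task())> {
            std::packaged_task<decltype(task())()> wrapped(std::move(task));
            auto result = wrapped.get_future();
            std::thread(std::move(wrapped)).detach();  // fire-and-forget worker
            return result;
        }
    };

    int main() {
        MiniScheduler scheduler;
        // In the real driver the lambda receives an OperationContext* and runs
        // e.g. persistParticipantListBlocking(); here it just computes a value.
        auto fut = scheduler.scheduleWork([] { return 42; });
        std::cout << fut.get() << '\n';  // blocks until the scheduled work is done
    }

The payoff of this shape, visible throughout the driver changes below, is that callers compose continuations on the returned future instead of plumbing shared Promise objects through nested callbacks.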
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 5e2b6201f06..b5f175b806a 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -727,6 +727,7 @@ env.CppUnitTest( 'transaction_coordinator_catalog_test.cpp', 'transaction_coordinator_futures_util_test.cpp', 'transaction_coordinator_service_test.cpp', + 'transaction_coordinator_test_fixture.cpp', 'transaction_coordinator_test.cpp', ], LIBDEPS=[ diff --git a/src/mongo/db/s/recover_transaction_decision_from_local_participant.cpp b/src/mongo/db/s/recover_transaction_decision_from_local_participant.cpp index 35f90c58c51..226cb6d8306 100644 --- a/src/mongo/db/s/recover_transaction_decision_from_local_participant.cpp +++ b/src/mongo/db/s/recover_transaction_decision_from_local_participant.cpp @@ -73,7 +73,6 @@ void recoverDecisionFromLocalParticipantOrAbortLocalParticipant(OperationContext // The local participant's txnNumber matched the request's txnNumber, and the txnNumber // corresponds to a transaction (not a retryable write). - if (txnParticipant->transactionIsCommitted()) { return; } @@ -82,8 +81,12 @@ void recoverDecisionFromLocalParticipantOrAbortLocalParticipant(OperationContext } catch (const DBException& e) { // Convert a PreparedTransactionInProgress error to an anonymous error code. uassert(51021, - "coordinateCommitTransaction command found local participant is prepared but no " - "active coordinator exists", + str::stream() << "coordinateCommitTransaction found local participant for " + << opCtx->getLogicalSessionId()->getId() + << ":" + << *opCtx->getTxnNumber() + << " is prepared but no " + "active coordinator exists", e.code() != ErrorCodes::PreparedTransactionInProgress); throw; } diff --git a/src/mongo/db/transaction_coordinator.cpp b/src/mongo/db/transaction_coordinator.cpp index 7eab0ada758..463cdb879a8 100644 --- a/src/mongo/db/transaction_coordinator.cpp +++ b/src/mongo/db/transaction_coordinator.cpp @@ -69,13 +69,11 @@ CoordinatorCommitDecision makeDecisionFromPrepareVoteConsensus( } // namespace -TransactionCoordinator::TransactionCoordinator(ServiceContext* service, - executor::TaskExecutor* networkExecutor, - ThreadPool* callbackPool, +TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext, const LogicalSessionId& lsid, const TxnNumber& txnNumber) - : _service(service), - _driver(networkExecutor, callbackPool), + : _serviceContext(serviceContext), + _driver(serviceContext), _lsid(lsid), _txnNumber(txnNumber), _state(CoordinatorState::kInit) {} @@ -120,7 +118,7 @@ Future<CoordinatorCommitDecision> TransactionCoordinator::_runPhaseOne( invariant(_state == CoordinatorState::kPreparing); auto decision = - makeDecisionFromPrepareVoteConsensus(_service, result, _lsid, _txnNumber); + makeDecisionFromPrepareVoteConsensus(_serviceContext, result, _lsid, _txnNumber); return _driver .persistDecision(_lsid, _txnNumber, participantShards, decision.commitTimestamp) diff --git a/src/mongo/db/transaction_coordinator.h b/src/mongo/db/transaction_coordinator.h index 6ab42e4d76b..063e32e209a 100644 --- a/src/mongo/db/transaction_coordinator.h +++ b/src/mongo/db/transaction_coordinator.h @@ -44,9 +44,7 @@ class TransactionCoordinator { MONGO_DISALLOW_COPYING(TransactionCoordinator); public: - TransactionCoordinator(ServiceContext* service, - executor::TaskExecutor* networkExecutor, - ThreadPool* callbackPool, + TransactionCoordinator(ServiceContext* serviceContext, const LogicalSessionId& lsid, const TxnNumber& txnNumber); ~TransactionCoordinator(); @@ -166,7 +164,7 @@ private: void 
_transitionToDone(stdx::unique_lock<stdx::mutex> lk) noexcept; // Shortcut to the service context under which this coordinator runs - ServiceContext* const _service; + ServiceContext* const _serviceContext; // Context object used to perform and track the state of asynchronous operations on behalf of // this coordinator. diff --git a/src/mongo/db/transaction_coordinator_catalog_test.cpp b/src/mongo/db/transaction_coordinator_catalog_test.cpp index 9590de998e6..47303b590f6 100644 --- a/src/mongo/db/transaction_coordinator_catalog_test.cpp +++ b/src/mongo/db/transaction_coordinator_catalog_test.cpp @@ -31,10 +31,6 @@ #include "mongo/platform/basic.h" #include "mongo/db/transaction_coordinator_catalog.h" - -#include <boost/optional/optional_io.hpp> - -#include "mongo/db/transaction_coordinator.h" #include "mongo/s/shard_server_test_fixture.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" @@ -73,11 +69,8 @@ public: void createCoordinatorInCatalog(OperationContext* opCtx, LogicalSessionId lsid, TxnNumber txnNumber) { - auto newCoordinator = std::make_shared<TransactionCoordinator>(getServiceContext(), - nullptr /* TaskExecutor */, - nullptr /* ThreadPool */, - lsid, - txnNumber); + auto newCoordinator = + std::make_shared<TransactionCoordinator>(getServiceContext(), lsid, txnNumber); coordinatorCatalog().insert(opCtx, lsid, txnNumber, newCoordinator); _coordinatorsForTest.push_back(newCoordinator); diff --git a/src/mongo/db/transaction_coordinator_driver.cpp b/src/mongo/db/transaction_coordinator_driver.cpp index 3a77c622a02..00bcc9015d8 100644 --- a/src/mongo/db/transaction_coordinator_driver.cpp +++ b/src/mongo/db/transaction_coordinator_driver.cpp @@ -35,7 +35,6 @@ #include "mongo/db/transaction_coordinator_driver.h" #include "mongo/client/remote_command_retry_scheduler.h" -#include "mongo/client/remote_command_targeter.h" #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" #include "mongo/db/dbdirectclient.h" @@ -44,7 +43,6 @@ #include "mongo/db/transaction_coordinator_futures_util.h" #include "mongo/db/write_concern.h" #include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/s/grid.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" @@ -70,103 +68,9 @@ const WriteConcernOptions kInternalMajorityNoSnapshotWriteConcern( WriteConcernOptions::SyncMode::UNSET, WriteConcernOptions::kNoTimeout); -/** - * Finds the host and port for a shard. - */ -HostAndPort targetHost(const ShardId& shardId, const ReadPreferenceSetting& readPref) { - const auto opCtxHolder = cc().makeOperationContext(); - const auto opCtx = opCtxHolder.get(); - auto shard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)); - // TODO SERVER-35678 return a SemiFuture<HostAndPort> rather than using a blocking call to - // get(). - return shard->getTargeter()->findHostWithMaxWait(readPref, Seconds(20)).get(opCtx); -} +const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max()); -/** - * Finds the host and port for a shard. - * - * Possible errors: [ShardNotFound, ShutdownInProgress] - * - * TODO (SERVER-37880): Implement backoff for retries. 
- */ -Future<HostAndPort> targetHostAsync(ThreadPool* pool, - const ShardId& shardId, - const ReadPreferenceSetting& readPref) { - return txn::doUntilSuccessOrOneOf({ErrorCodes::ShardNotFound, ErrorCodes::ShutdownInProgress}, - [pool, shardId, readPref]() { - return txn::async(pool, [shardId, readPref]() { - return targetHost(shardId, readPref); - }); - }); -} - -/** - * Sends a command asynchronously to a shard using the given executor. The thread pool is used for - * targeting the shard. - * - * If the command is sent successfully and a response is received, the resulting Future will contain - * the ResponseStatus from the response. - * - * An error status is set on the resulting Future if any of the following occurs: - * - If the targeting the shard fails [Possible errors: ShardNotFound, ShutdownInProgress] - * - If the command cannot be scheduled to run on the executor [Possible errors: - * ShutdownInProgress, maybe others] - * - If a response is received that indicates that the destination shard was did not receive the - * command or could not process the command (e.g. because the command did not reach the shard due - * to a network issue) [Possible errors: Whatever the returned status is in the response] - */ -Future<ResponseStatus> sendAsyncCommandToShard(executor::TaskExecutor* executor, - ThreadPool* pool, - const ShardId& shardId, - const BSONObj& commandObj) { - - auto promiseAndFuture = makePromiseFuture<ResponseStatus>(); - auto sharedPromise = - std::make_shared<Promise<ResponseStatus>>(std::move(promiseAndFuture.promise)); - - auto readPref = ReadPreferenceSetting(ReadPreference::PrimaryOnly); - targetHostAsync(pool, shardId, readPref) - .then([ executor, shardId, sharedPromise, commandObj = commandObj.getOwned(), readPref ]( - HostAndPort shardHostAndPort) mutable { - - LOG(3) << "Coordinator going to send command " << commandObj << " to shard " << shardId; - - executor::RemoteCommandRequest request( - shardHostAndPort, "admin", commandObj, readPref.toContainingBSON(), nullptr); - - auto swCallbackHandle = executor->scheduleRemoteCommand( - request, [ commandObj = commandObj.getOwned(), - shardId, - sharedPromise ](const RemoteCommandCallbackArgs& args) mutable { - LOG(3) << "Coordinator shard got response " << args.response.data << " for " - << commandObj << " to " << shardId; - auto status = args.response.status; - // Only consider actual failures to send the command as errors. - if (status.isOK()) { - sharedPromise->emplaceValue(args.response); - } else { - sharedPromise->setError(status); - } - }); - - if (!swCallbackHandle.isOK()) { - LOG(3) << "Coordinator shard failed to schedule the task to send " << commandObj - << " to shard " << shardId << causedBy(swCallbackHandle.getStatus()); - sharedPromise->setError(swCallbackHandle.getStatus()); - } - }) - .onError([ shardId, commandObj = commandObj.getOwned(), sharedPromise ](Status s) { - LOG(3) << "Coordinator shard failed to target command " << commandObj << " to shard " - << shardId << causedBy(s); - - sharedPromise->setError(s); - }) - .getAsync([](Status) {}); - - // Do not wait for the callback to run. The callback will reschedule the remote request on - // the same executor if necessary. - return std::move(promiseAndFuture.future); -} +const ReadPreferenceSetting kPrimaryReadPreference{ReadPreference::PrimaryOnly}; bool isRetryableError(ErrorCodes::Error code) { // TODO (SERVER-37880): Consider using RemoteCommandRetryScheduler. 
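Where the deleted doUntilSuccessOrOneOf() helper re-issued failed work immediately (its own TODO noted the missing backoff), the replacement path below drives retries through txn::doWhile with the kExponentialBackoff policy declared above, so each retryable failure waits before the next attempt. A self-contained sketch of that loop shape; the doubling-with-cap policy and all names in it are illustrative assumptions, not mongo::Backoff's actual internals:

    #include <algorithm>
    #include <chrono>
    #include <iostream>
    #include <thread>

    // Retry while the predicate deems the outcome retryable, sleeping a
    // capped, doubling delay between attempts.
    template <typename Body, typename Retryable>
    auto retryWithBackoff(Body body,
                          Retryable isRetryable,
                          std::chrono::milliseconds delay = std::chrono::milliseconds(100),
                          std::chrono::milliseconds cap = std::chrono::seconds(1)) {
        while (true) {
            auto result = body();
            if (!isRetryable(result))
                return result;
            std::this_thread::sleep_for(delay);  // back off before retrying
            delay = std::min(delay * 2, cap);    // exponential growth, capped
        }
    }

    int main() {
        int attempts = 0;
        bool ok = retryWithBackoff([&] { return ++attempts == 3; },  // succeeds on attempt 3
                                   [](bool succeeded) { return !succeeded; });
        std::cout << "attempts: " << attempts << ", ok: " << ok << '\n';
    }

The sleep between attempts is what the new test fixture has to account for: nothing retries until the mock clock moves past the backoff delay.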
@@ -192,61 +96,6 @@ BSONArray buildParticipantListMatchesConditions(const std::vector<ShardId>& part return BSON_ARRAY(participantListContains << participantListHasSize); } -/** - * Processes a response from a participant to the prepareTransaction command. An OK response is - * interpreted as a vote to commit, and the prepareTimestamp is extracted from the response and - * returned. A response that indicates a vote to abort is interpreted as such, and a retryable error - * will re-throw so that we can retry the prepare command. - */ -PrepareResponse getCommitVoteFromPrepareResponse(const ShardId& shardId, - const ResponseStatus& response) { - uassertStatusOK(getWriteConcernStatusFromCommandResult(response.data)); - auto status = getStatusFromCommandResult(response.data); - auto wcStatus = getWriteConcernStatusFromCommandResult(response.data); - - // There must be no write concern errors in order for us to be able to interpret the command - // response. As an example, it's possible for us to get NoSuchTransaction from a non-primary - // node of a shard, in which case we might also receive a write concern error but it can't be - // interpreted as a vote to abort. - if (!wcStatus.isOK()) { - status = wcStatus; - } - - if (status.isOK()) { - auto prepareTimestampField = response.data["prepareTimestamp"]; - if (prepareTimestampField.eoo() || prepareTimestampField.timestamp().isNull()) { - LOG(0) << "Coordinator shard received an OK response to prepareTransaction " - "without a prepareTimestamp from shard " - << shardId << ", which is not expected behavior. Interpreting the response from " - << shardId << " as a vote to abort"; - return PrepareResponse{shardId, PrepareVote::kAbort, boost::none}; - } - - LOG(3) << "Coordinator shard received a vote to commit from shard " << shardId - << " with prepareTimestamp: " << prepareTimestampField.timestamp(); - - return PrepareResponse{shardId, PrepareVote::kCommit, prepareTimestampField.timestamp()}; - } else if (ErrorCodes::isVoteAbortError(status.code())) { - LOG(3) << "Coordinator shard received a vote to abort from shard " << shardId; - return PrepareResponse{shardId, PrepareVote::kAbort, boost::none}; - } else if (isRetryableError(status.code())) { - LOG(3) << "Coordinator shard received a retryable error in response to prepareTransaction " - "from shard " - << shardId << causedBy(status); - - // Rethrow error so that we retry. - uassertStatusOK(status); - } else { - // Non-retryable errors lead to an abort decision. 
- LOG(3) - << "Coordinator shard received a non-retryable error in response to prepareTransaction " - "from shard " - << shardId << causedBy(status); - return PrepareResponse{shardId, PrepareVote::kAbort, boost::none}; - } - MONGO_UNREACHABLE; -} - std::string buildParticipantListString(const std::vector<ShardId>& participantList) { StringBuilder ss; ss << "["; @@ -259,9 +108,8 @@ std::string buildParticipantListString(const std::vector<ShardId>& participantLi } // namespace -TransactionCoordinatorDriver::TransactionCoordinatorDriver(executor::TaskExecutor* executor, - ThreadPool* pool) - : _executor(executor), _pool(pool) {} +TransactionCoordinatorDriver::TransactionCoordinatorDriver(ServiceContext* service) + : _scheduler(service) {} TransactionCoordinatorDriver::~TransactionCoordinatorDriver() = default; @@ -358,9 +206,8 @@ void persistParticipantListBlocking(OperationContext* opCtx, Future<void> TransactionCoordinatorDriver::persistParticipantList( const LogicalSessionId& lsid, TxnNumber txnNumber, std::vector<ShardId> participantList) { - return txn::async(_pool, [lsid, txnNumber, participantList] { - auto opCtx = Client::getCurrent()->makeOperationContext(); - persistParticipantListBlocking(opCtx.get(), lsid, txnNumber, participantList); + return _scheduler.scheduleWork([lsid, txnNumber, participantList](OperationContext* opCtx) { + persistParticipantListBlocking(opCtx, lsid, txnNumber, participantList); }); } @@ -400,18 +247,19 @@ Future<txn::PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare( switch (*next.vote) { case PrepareVote::kAbort: - LOG(3) << "Transaction coordinator received a vote to abort from shard " - << next.participantShardId; + if (result.decision == CommitDecision::kAbort) { + LOG(3) << "Ignoring vote to abort from shard " << next.participantShardId + << " because a vote to abort has already been received"; + break; + } result.decision = CommitDecision::kAbort; result.maxPrepareTimestamp = boost::none; cancel(); break; case PrepareVote::kCommit: - LOG(3) << "Transaction coordinator received a vote to commit from shard " - << next.participantShardId; if (result.decision == CommitDecision::kAbort) { - LOG(3) << "Ignoring commmit decision from shard " << next.participantShardId - << " because abort decision was received previously"; + LOG(3) << "Ignoring vote to commit from shard " << next.participantShardId + << " because a vote to abort has already been received"; break; } @@ -544,10 +392,10 @@ Future<void> TransactionCoordinatorDriver::persistDecision( TxnNumber txnNumber, std::vector<ShardId> participantList, const boost::optional<Timestamp>& commitTimestamp) { - return txn::async(_pool, [lsid, txnNumber, participantList, commitTimestamp] { - auto opCtx = Client::getCurrent()->makeOperationContext(); - persistDecisionBlocking(opCtx.get(), lsid, txnNumber, participantList, commitTimestamp); - }); + return _scheduler.scheduleWork( + [lsid, txnNumber, participantList, commitTimestamp](OperationContext* opCtx) { + persistDecisionBlocking(opCtx, lsid, txnNumber, participantList, commitTimestamp); + }); } Future<void> TransactionCoordinatorDriver::sendCommit(const std::vector<ShardId>& participantShards, @@ -654,9 +502,8 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx, Future<void> TransactionCoordinatorDriver::deleteCoordinatorDoc(const LogicalSessionId& lsid, TxnNumber txnNumber) { - return txn::async(_pool, [lsid, txnNumber] { - auto opCtx = Client::getCurrent()->makeOperationContext(); - deleteCoordinatorDocBlocking(opCtx.get(), lsid, 
txnNumber); + return _scheduler.scheduleWork([lsid, txnNumber](OperationContext* opCtx) { + deleteCoordinatorDocBlocking(opCtx, lsid, txnNumber); }); } @@ -681,75 +528,109 @@ std::vector<TransactionCoordinatorDocument> TransactionCoordinatorDriver::readAl } Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard( - const ShardId& shardId, const BSONObj& prepareCommandObj) { - return sendAsyncCommandToShard(_executor, _pool, shardId, prepareCommandObj) - .then([shardId](ResponseStatus response) { - return getCommitVoteFromPrepareResponse(shardId, response); - }) - .onError<ErrorCodes::ShardNotFound>([shardId](const Status& s) { - // ShardNotFound indicates a participant shard was removed, so that is interpreted as an - // abort decision - return Future<PrepareResponse>::makeReady( - {shardId, CommitDecision::kAbort, boost::none}); - }) - .onError( - [ this, shardId, prepareCommandObj = prepareCommandObj.getOwned() ](Status status) { - if (!isRetryableError(status.code())) - uassertStatusOK(status); + const ShardId& shardId, const BSONObj& commandObj) { + return txn::doWhile( + _scheduler, + kExponentialBackoff, + [](StatusWith<PrepareResponse> s) { return isRetryableError(s.getStatus().code()); }, + [ this, shardId, commandObj = commandObj.getOwned() ] { + return _scheduler.scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj) + .then([ shardId, commandObj = commandObj.getOwned() ](ResponseStatus response) { + auto status = getStatusFromCommandResult(response.data); + auto wcStatus = getWriteConcernStatusFromCommandResult(response.data); + + // There must be no writeConcern error in order for us to interpret the command + // response. + if (!wcStatus.isOK()) { + status = wcStatus; + } + + if (status.isOK()) { + auto prepareTimestampField = response.data["prepareTimestamp"]; + if (prepareTimestampField.eoo() || + prepareTimestampField.timestamp().isNull()) { + LOG(0) << "Coordinator shard received an OK response to " + "prepareTransaction " + "without a prepareTimestamp from shard " + << shardId << ", which is not expected behavior. " + "Interpreting the response from " + << shardId << " as a vote to abort"; + return PrepareResponse{shardId, PrepareVote::kAbort, boost::none}; + } + + LOG(3) << "Coordinator shard received a vote to commit from shard " + << shardId + << " with prepareTimestamp: " << prepareTimestampField.timestamp(); + return PrepareResponse{ + shardId, PrepareVote::kCommit, prepareTimestampField.timestamp()}; + } - if (_cancelled.loadRelaxed()) { - LOG(3) << "Prepare stopped retrying due to retrying being cancelled"; - return Future<PrepareResponse>::makeReady({shardId, boost::none, boost::none}); - } + LOG(3) << "Coordinator shard received " << status << " from shard " << shardId + << " for " << commandObj; + if (ErrorCodes::isVoteAbortError(status.code())) { + return PrepareResponse{shardId, PrepareVote::kAbort, boost::none}; + } - return sendPrepareToShard(shardId, prepareCommandObj); - }); + uassertStatusOK(status); + MONGO_UNREACHABLE; + }) + .onError<ErrorCodes::ShardNotFound>([shardId](const Status& s) { + // ShardNotFound may indicate that the participant shard has been removed (it + // could also mean the participant shard was recently added and this node + // refreshed its ShardRegistry from a stale config secondary). + // + // Since this node can't know which is the case, it is safe to pessimistically + // treat ShardNotFound as a vote to abort, which is always safe since the node + // must then send abort. 
+ return Future<PrepareResponse>::makeReady( + {shardId, CommitDecision::kAbort, boost::none}); + }) + .onError([this, shardId](const Status& status) { + if (_cancelled.loadRelaxed()) { + LOG(3) << "Prepare stopped retrying due to retrying being cancelled"; + return PrepareResponse{shardId, boost::none, boost::none}; + } + uassertStatusOK(status); + MONGO_UNREACHABLE; + }); + }); } -// TODO (SERVER-37880): Implement backoff for retries and only retry commands on retryable -// errors. Future<void> TransactionCoordinatorDriver::sendDecisionToParticipantShard( const ShardId& shardId, const BSONObj& commandObj) { - return sendAsyncCommandToShard(_executor, _pool, shardId, commandObj) - .then([ shardId, commandObj = commandObj.getOwned() ](ResponseStatus response) { - auto status = getStatusFromCommandResult(response.data); - auto wcStatus = getWriteConcernStatusFromCommandResult(response.data); - - // There must be no write concern errors in order for us to be able to interpret the - // command response - if (!wcStatus.isOK()) { - status = wcStatus; - } + return txn::doWhile( + _scheduler, + kExponentialBackoff, + [](const Status& s) { return isRetryableError(s.code()); }, + [ this, shardId, commandObj = commandObj.getOwned() ] { + return _scheduler.scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj) + .then([ shardId, commandObj = commandObj.getOwned() ](ResponseStatus response) { + auto status = getStatusFromCommandResult(response.data); + auto wcStatus = getWriteConcernStatusFromCommandResult(response.data); + + // There must be no writeConcern error in order for us to interpret the command + // response. + if (!wcStatus.isOK()) { + status = wcStatus; + } - if (status.isOK()) { - LOG(3) << "Coordinator shard received OK in response to " << commandObj - << "from shard " << shardId; - return Status::OK(); - } else if (ErrorCodes::isVoteAbortError(status.code())) { - LOG(3) << "Coordinator shard received a vote abort error in response to " - << commandObj << "from shard " << shardId << causedBy(status) - << ", interpreting as a successful ack"; + LOG(3) << "Coordinator shard received " << status << " in response to " + << commandObj << " from shard " << shardId; - return Status::OK(); - } else { - return status; - } - }) - .onError( - [ this, shardId, commandObj = commandObj.getOwned() ](Status status)->Future<void> { - if (isRetryableError(status.code())) { - LOG(3) << "Coordinator shard received a retryable error in response to " - << commandObj << "from shard " << shardId << causedBy(status) - << ", resending command."; - - return sendDecisionToParticipantShard(shardId, commandObj); - } else { - LOG(3) << "Coordinator shard received a non-retryable error in response to " - << commandObj << "from shard " << shardId << causedBy(status); + if (ErrorCodes::isVoteAbortError(status.code())) { + // Interpret voteAbort errors as an ack. + status = Status::OK(); + } return status; - } - }); + }) + .onError<ErrorCodes::ShardNotFound>([](const Status& s) { + // TODO (SERVER-38918): Unlike for prepare, there is no pessimistic way to + // handle ShardNotFound. It's not safe to treat ShardNotFound as an ack, because + // this node may have refreshed its ShardRegistry from a stale config secondary. 
+ MONGO_UNREACHABLE; + }); + }); } void TransactionCoordinatorDriver::cancel() { diff --git a/src/mongo/db/transaction_coordinator_driver.h b/src/mongo/db/transaction_coordinator_driver.h index df08928efcd..a92d97f8174 100644 --- a/src/mongo/db/transaction_coordinator_driver.h +++ b/src/mongo/db/transaction_coordinator_driver.h @@ -35,11 +35,7 @@ #include "mongo/db/logical_session_id.h" #include "mongo/db/operation_context.h" #include "mongo/db/transaction_coordinator_document_gen.h" -#include "mongo/executor/task_executor.h" -#include "mongo/s/shard_id.h" -#include "mongo/util/concurrency/mutex.h" -#include "mongo/util/concurrency/thread_pool.h" -#include "mongo/util/future.h" +#include "mongo/db/transaction_coordinator_futures_util.h" namespace mongo { namespace txn { @@ -91,7 +87,7 @@ class TransactionCoordinatorDriver { MONGO_DISALLOW_COPYING(TransactionCoordinatorDriver); public: - TransactionCoordinatorDriver(executor::TaskExecutor* executor, ThreadPool* pool); + TransactionCoordinatorDriver(ServiceContext* service); ~TransactionCoordinatorDriver(); /** @@ -230,18 +226,7 @@ public: Future<void> sendDecisionToParticipantShard(const ShardId& shardId, const BSONObj& commandObj); private: - /** - * A task executor used to execute all network requests used to send messages to participants. - * The only current networking that may occur outside of this is when targeting a shard to find - * its host and port. - */ - executor::TaskExecutor* const _executor; - - /** - * A thread pool used to execute any code that should be non-blocking, e.g. persisting the - * participant list or the commit decision to disk. - */ - ThreadPool* const _pool; + txn::AsyncWorkScheduler _scheduler; // TODO (SERVER-38522): Remove once AsyncWorkScheduler is used for cancellation AtomicWord<bool> _cancelled{false}; diff --git a/src/mongo/db/transaction_coordinator_futures_util.h b/src/mongo/db/transaction_coordinator_futures_util.h index c64071d53a4..a15f715f948 100644 --- a/src/mongo/db/transaction_coordinator_futures_util.h +++ b/src/mongo/db/transaction_coordinator_futures_util.h @@ -36,7 +36,7 @@ #include "mongo/executor/task_executor.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/s/grid.h" -#include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/concurrency/mutex.h" #include "mongo/util/future.h" #include "mongo/util/time_support.h" @@ -241,27 +241,6 @@ Future<GlobalResult> collect(std::vector<Future<IndividualResult>>&& futures, } /** - * A thin wrapper around ThreadPool::schedule that returns a future that will be resolved on the - * completion of the task or rejected if the task errors. - */ -template <class Callable> -Future<FutureContinuationResult<Callable>> async(ThreadPool* pool, Callable&& task) { - using ReturnType = decltype(task()); - auto pf = makePromiseFuture<ReturnType>(); - auto taskCompletionPromise = std::make_shared<Promise<ReturnType>>(std::move(pf.promise)); - auto scheduleStatus = pool->schedule( - [ task = std::forward<Callable>(task), taskCompletionPromise ]() mutable noexcept { - taskCompletionPromise->setWith(task); - }); - - if (!scheduleStatus.isOK()) { - taskCompletionPromise->setError(scheduleStatus); - } - - return std::move(pf.future); -} - -/** * Returns a future that will be resolved when all of the input futures have resolved, or rejected * when any of the futures is rejected. 
*/ @@ -300,27 +279,5 @@ Future<FutureContinuationResult<LoopBodyFn>> doWhile(AsyncWorkScheduler& schedul }); } -/** - * Executes a function returning a Future until the function does not return an error status or - * until one of the provided error codes is returned. - * - * TODO (SERVER-37880): Implement backoff for retries. - */ -template <class Callable> -Future<FutureContinuationResult<Callable>> doUntilSuccessOrOneOf( - std::set<ErrorCodes::Error>&& errorsToHaltOn, Callable&& f) { - auto future = f(); - return std::move(future).onError( - [ errorsToHaltOn = std::move(errorsToHaltOn), - f = std::forward<Callable>(f) ](Status s) mutable { - // If this error is one of the errors we should halt on, rethrow the error and don't - // retry. - if (errorsToHaltOn.find(s.code()) != errorsToHaltOn.end()) { - uassertStatusOK(s); - } - return doUntilSuccessOrOneOf(std::move(errorsToHaltOn), std::forward<Callable>(f)); - }); -} - } // namespace txn } // namespace mongo diff --git a/src/mongo/db/transaction_coordinator_service.cpp b/src/mongo/db/transaction_coordinator_service.cpp index 2cf14972c0b..3731a1853f5 100644 --- a/src/mongo/db/transaction_coordinator_service.cpp +++ b/src/mongo/db/transaction_coordinator_service.cpp @@ -51,42 +51,10 @@ namespace { const auto transactionCoordinatorServiceDecoration = ServiceContext::declareDecoration<TransactionCoordinatorService>(); -/** - * Constructs the default options for the thread pool used to run commit. - */ -ThreadPool::Options makeDefaultThreadPoolOptions() { - ThreadPool::Options options; - options.poolName = "TransactionCoordinatorService"; - options.minThreads = 0; - options.maxThreads = ThreadPool::Options::kUnlimited; - - // Ensure all threads have a client - options.onCreateThread = [](const std::string& threadName) { - Client::initThread(threadName.c_str()); - }; - return options; -} - } // namespace TransactionCoordinatorService::TransactionCoordinatorService() - : _coordinatorCatalog(std::make_shared<TransactionCoordinatorCatalog>()), - _threadPool(std::make_unique<ThreadPool>(makeDefaultThreadPoolOptions())) {} - -void TransactionCoordinatorService::setThreadPoolForTest(std::unique_ptr<ThreadPool> pool) { - shutdown(); - _threadPool = std::move(pool); - startup(); -} - -void TransactionCoordinatorService::startup() { - _threadPool->startup(); -} - -void TransactionCoordinatorService::shutdown() { - _threadPool->shutdown(); - _threadPool->join(); -} + : _coordinatorCatalog(std::make_shared<TransactionCoordinatorCatalog>()) {} TransactionCoordinatorService* TransactionCoordinatorService::get(OperationContext* opCtx) { return get(opCtx->getServiceContext()); @@ -108,16 +76,16 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx, latestCoordinator->cancelIfCommitNotYetStarted(); } - auto networkExecutor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); - auto coordinator = std::make_shared<TransactionCoordinator>( - opCtx->getServiceContext(), networkExecutor, _threadPool.get(), lsid, txnNumber); + auto coordinator = + std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(), lsid, txnNumber); _coordinatorCatalog->insert(opCtx, lsid, txnNumber, coordinator); // Schedule a task in the future to cancel the commit coordination on the coordinator, so that // the coordinator does not remain in memory forever (in case the particpant list is never // received). 
- auto cbHandle = uassertStatusOK(networkExecutor->scheduleWorkAt( + const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); + auto cbHandle = uassertStatusOK(executor->scheduleWorkAt( commitDeadline, [coordinatorWeakPtr = std::weak_ptr<TransactionCoordinator>(coordinator)]( const mongo::executor::TaskExecutor::CallbackArgs& cbArgs) mutable { @@ -176,66 +144,70 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx) { // stepup task. _coordinatorCatalog->enterStepUp(opCtx); - auto scheduleStatus = _threadPool->schedule([this]() { - try { - // The opCtx destructor handles unsetting itself from the Client - auto opCtxPtr = Client::getCurrent()->makeOperationContext(); - auto opCtx = opCtxPtr.get(); - - repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx); - const auto lastOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - LOG(3) << "Going to wait for client's last OpTime " << lastOpTime - << " to become majority committed"; - WriteConcernResult unusedWCResult; - uassertStatusOK(waitForWriteConcern( - opCtx, - lastOpTime, - WriteConcernOptions{WriteConcernOptions::kInternalMajorityNoSnapshot, - WriteConcernOptions::SyncMode::UNSET, - WriteConcernOptions::kNoTimeout}, - &unusedWCResult)); - - auto coordinatorDocs = TransactionCoordinatorDriver::readAllCoordinatorDocs(opCtx); - LOG(0) << "Need to resume coordinating commit for " << coordinatorDocs.size() - << " transactions"; - - for (const auto& doc : coordinatorDocs) { - LOG(3) << "Going to resume coordinating commit for " << doc.toBSON(); - const auto lsid = *doc.getId().getSessionId(); - const auto txnNumber = *doc.getId().getTxnNumber(); - - auto networkExecutor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); - auto coordinator = - std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(), - networkExecutor, - _threadPool.get(), - lsid, - txnNumber); - _coordinatorCatalog->insert( - opCtx, lsid, txnNumber, coordinator, true /* forStepUp */); - coordinator->continueCommit(doc); - } - - _coordinatorCatalog->exitStepUp(); - - LOG(3) << "Incoming coordinateCommit requests now accepted"; - } catch (const DBException& e) { - LOG(3) << "Failed while executing thread to resume coordinating commit for pending " - "transactions " - << causedBy(e.toStatus()); - _coordinatorCatalog->exitStepUp(); - } - }); - - if (scheduleStatus.code() == ErrorCodes::ShutdownInProgress) { + const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); + auto status = + executor + ->scheduleWork([ this, service = opCtx->getServiceContext() ]( + const executor::TaskExecutor::CallbackArgs&) { + try { + // The opCtx destructor handles unsetting itself from the Client + ThreadClient threadClient("TransactionCoordinator-StepUp", service); + auto opCtxPtr = Client::getCurrent()->makeOperationContext(); + auto opCtx = opCtxPtr.get(); + + auto& replClientInfo = repl::ReplClientInfo::forClient(opCtx->getClient()); + replClientInfo.setLastOpToSystemLastOpTime(opCtx); + + const auto lastOpTime = replClientInfo.getLastOp(); + LOG(3) << "Going to wait for client's last OpTime " << lastOpTime + << " to become majority committed"; + WriteConcernResult unusedWCResult; + uassertStatusOK(waitForWriteConcern( + opCtx, + lastOpTime, + WriteConcernOptions{WriteConcernOptions::kInternalMajorityNoSnapshot, + WriteConcernOptions::SyncMode::UNSET, + WriteConcernOptions::kNoTimeout}, + &unusedWCResult)); + + auto coordinatorDocs = + 
TransactionCoordinatorDriver::readAllCoordinatorDocs(opCtx); + + LOG(0) << "Need to resume coordinating commit for " << coordinatorDocs.size() + << " transactions"; + + for (const auto& doc : coordinatorDocs) { + LOG(3) << "Going to resume coordinating commit for " << doc.toBSON(); + const auto lsid = *doc.getId().getSessionId(); + const auto txnNumber = *doc.getId().getTxnNumber(); + + auto coordinator = std::make_shared<TransactionCoordinator>( + opCtx->getServiceContext(), lsid, txnNumber); + _coordinatorCatalog->insert( + opCtx, lsid, txnNumber, coordinator, true /* forStepUp */); + coordinator->continueCommit(doc); + } + + _coordinatorCatalog->exitStepUp(); + + LOG(3) << "Incoming coordinateCommit requests now accepted"; + } catch (const DBException& e) { + LOG(3) << "Failed while executing thread to resume coordinating commit for " + "pending " + "transactions " + << causedBy(e.toStatus()); + _coordinatorCatalog->exitStepUp(); + } + }) + .getStatus(); + + // TODO (SERVER-38320): Reschedule the stepup task if the interruption was not due to stepdown. + if (status == ErrorCodes::ShutdownInProgress || ErrorCodes::isInterruption(status.code())) { return; } - fassert(51031, scheduleStatus.isOK()); + invariant(status); } -ServiceContext::ConstructorActionRegisterer transactionCoordinatorServiceRegisterer{ - "TransactionCoordinatorService", - [](ServiceContext* service) { TransactionCoordinatorService::get(service)->startup(); }, - [](ServiceContext* service) { TransactionCoordinatorService::get(service)->shutdown(); }}; +void TransactionCoordinatorService::onStepDown(OperationContext* opCtx) {} } // namespace mongo diff --git a/src/mongo/db/transaction_coordinator_service.h b/src/mongo/db/transaction_coordinator_service.h index a3649a46396..89971179010 100644 --- a/src/mongo/db/transaction_coordinator_service.h +++ b/src/mongo/db/transaction_coordinator_service.h @@ -35,7 +35,6 @@ #include "mongo/base/disallow_copying.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/transaction_coordinator_catalog.h" -#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/future.h" namespace mongo { @@ -52,16 +51,6 @@ public: ~TransactionCoordinatorService() = default; /** - * Starts up the thread pool used for executing commits. - */ - void startup(); - - /** - * Shuts down and joins the thread pool used for executing commits. - */ - void shutdown(); - - /** * Retrieves the TransactionCoordinatorService associated with the service or operation context. */ static TransactionCoordinatorService* get(OperationContext* opCtx); @@ -111,17 +100,10 @@ public: * async task to continue coordinating its commit. */ void onStepUp(OperationContext* opCtx); - - /* - * Shuts down and joins the original thread pool, then sets the thread pool to 'pool' and starts - * 'pool'. 
- */ - void setThreadPoolForTest(std::unique_ptr<ThreadPool> pool); + void onStepDown(OperationContext* opCtx); private: std::shared_ptr<TransactionCoordinatorCatalog> _coordinatorCatalog; - - std::unique_ptr<ThreadPool> _threadPool; }; } // namespace mongo diff --git a/src/mongo/db/transaction_coordinator_service_test.cpp b/src/mongo/db/transaction_coordinator_service_test.cpp index eb0a0986fde..05aaa06b759 100644 --- a/src/mongo/db/transaction_coordinator_service_test.cpp +++ b/src/mongo/db/transaction_coordinator_service_test.cpp @@ -31,36 +31,23 @@ #include "mongo/platform/basic.h" -#include "mongo/db/transaction_coordinator_service.h" - #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/server_parameters.h" +#include "mongo/db/transaction_coordinator_service.h" +#include "mongo/db/transaction_coordinator_test_fixture.h" #include "mongo/db/write_concern_options.h" -#include "mongo/s/catalog/sharding_catalog_client_mock.h" -#include "mongo/s/catalog/type_shard.h" -#include "mongo/s/shard_id.h" -#include "mongo/s/shard_server_test_fixture.h" -#include "mongo/unittest/death_test.h" -#include "mongo/unittest/unittest.h" #include "mongo/util/log.h" namespace mongo { namespace { -const std::vector<ShardId> kTwoShardIdList{{"s1"}, {"s2"}}; -const std::set<ShardId> kTwoShardIdSet{{"s1"}, {"s2"}}; -const std::vector<ShardId> kThreeShardIdList{{"s1"}, {"s2"}, {"s3"}}; -const std::set<ShardId> kThreeShardIdSet{{"s1"}, {"s2"}, {"s3"}}; const Timestamp kDummyTimestamp = Timestamp::min(); const Date_t kCommitDeadline = Date_t::max(); const BSONObj kDummyWriteConcernError = BSON("code" << ErrorCodes::WriteConcernFailed << "errmsg" << "dummy"); -const StatusWith<BSONObj> kRetryableError = {ErrorCodes::HostUnreachable, ""}; const StatusWith<BSONObj> kNoSuchTransactionAndWriteConcernError = BSON("ok" << 0 << "code" << ErrorCodes::NoSuchTransaction << "writeConcernError" << kDummyWriteConcernError); @@ -76,81 +63,8 @@ const StatusWith<BSONObj> kPrepareOkButWriteConcernError = BSON("ok" << 1 << "prepareTimestamp" << Timestamp(1, 1) << "writeConcernError" << kDummyWriteConcernError); -HostAndPort makeHostAndPort(const ShardId& shardId) { - return HostAndPort(str::stream() << shardId << ":123"); -} - -/** - * Constructs the default options for the thread pool used to run commit. - */ -ThreadPool::Options makeSingleThreadedThreadPoolForTest() { - ThreadPool::Options options; - options.poolName = "TransactionCoordinatorService"; - options.minThreads = 0; - options.maxThreads = 1; - - // Ensure all threads have a client - options.onCreateThread = [](const std::string& threadName) { - Client::initThread(threadName.c_str()); - }; - return options; -} - -class TransactionCoordinatorServiceTest : public ShardServerTestFixture { +class TransactionCoordinatorServiceTest : public TransactionCoordinatorTestFixture { public: - void setUp() override { - ShardServerTestFixture::setUp(); - - // Use a thread pool with a maxThreads=1 so that the unit tests can expect tasks to be run - // in a deterministic order. 
- TransactionCoordinatorService::get(operationContext()) - ->setThreadPoolForTest( - std::make_unique<ThreadPool>(makeSingleThreadedThreadPoolForTest())); - - ASSERT_OK(ServerParameterSet::getGlobal() - ->getMap() - .find("logComponentVerbosity") - ->second->setFromString("{verbosity: 3}")); - - for (const auto& shardId : kThreeShardIdList) { - auto shardTargeter = RemoteCommandTargeterMock::get( - uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId)) - ->getTargeter()); - shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId)); - } - } - - void tearDown() override { - // Join the original thread pool and replace it with a new one, because tasks in the pool - // may call getGlobalServiceContext, which is set to nullptr before the ServiceContext's - // destructor (which ordinarily joins the thread pool) is invoked. - TransactionCoordinatorService::get(operationContext()) - ->setThreadPoolForTest( - std::make_unique<ThreadPool>(makeSingleThreadedThreadPoolForTest())); - ShardServerTestFixture::tearDown(); - } - - void assertCommandSentAndRespondWith(const StringData& commandName, - const StatusWith<BSONObj>& response, - const BSONObj& expectedWriteConcern) { - log() << "Mock storage layer waiting for command " << commandName; - onCommand([&](const executor::RemoteCommandRequest& request) { - if (response.isOK()) { - log() << "Got command " << request.cmdObj.firstElement().fieldNameStringData() - << " and responding with " << response.getValue(); - } else { - log() << "Got command " << request.cmdObj.firstElement().fieldNameStringData() - << " and responding with " << response.getStatus(); - } - ASSERT_EQ(request.cmdObj.firstElement().fieldNameStringData(), commandName); - - ASSERT_BSONOBJ_EQ( - expectedWriteConcern, - request.cmdObj.getObjectField(WriteConcernOptions::kWriteConcernField)); - return response; - }); - } - // Prepare responses void assertPrepareSentAndRespondWithSuccess() { @@ -163,6 +77,7 @@ public: assertCommandSentAndRespondWith(PrepareTransaction::kCommandName, kPrepareOkButWriteConcernError, WriteConcernOptions::InternalMajorityNoSnapshot); + advanceClockAndExecuteScheduledTasks(); } void assertPrepareSentAndRespondWithNoSuchTransaction() { @@ -175,6 +90,7 @@ public: assertCommandSentAndRespondWith(PrepareTransaction::kCommandName, kNoSuchTransactionAndWriteConcernError, WriteConcernOptions::InternalMajorityNoSnapshot); + advanceClockAndExecuteScheduledTasks(); } // Abort responses @@ -186,6 +102,7 @@ public: void assertAbortSentAndRespondWithSuccessAndWriteConcernError() { assertCommandSentAndRespondWith( "abortTransaction", kOkButWriteConcernError, WriteConcernOptions::Majority); + advanceClockAndExecuteScheduledTasks(); } void assertAbortSentAndRespondWithNoSuchTransaction() { @@ -197,6 +114,7 @@ public: assertCommandSentAndRespondWith("abortTransaction", kNoSuchTransactionAndWriteConcernError, WriteConcernOptions::Majority); + advanceClockAndExecuteScheduledTasks(); } // Commit responses @@ -210,11 +128,13 @@ public: assertCommandSentAndRespondWith(CommitTransaction::kCommandName, kOkButWriteConcernError, WriteConcernOptions::Majority); + advanceClockAndExecuteScheduledTasks(); } void assertCommitSentAndRespondWithRetryableError() { assertCommandSentAndRespondWith( CommitTransaction::kCommandName, kRetryableError, WriteConcernOptions::Majority); + advanceClockAndExecuteScheduledTasks(); } // Other @@ -274,35 +194,6 @@ public: commitDecisionFuture.get(); } - // Override the CatalogClient to make CatalogClient::getAllShards automatically return 
the - // expected shards. We cannot mock the network responses for the ShardRegistry reload, since the - // ShardRegistry reload is done over DBClient, not the NetworkInterface, and there is no - // DBClientMock analogous to the NetworkInterfaceMock. - std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient( - std::unique_ptr<DistLockManager> distLockManager) override { - - class StaticCatalogClient final : public ShardingCatalogClientMock { - public: - StaticCatalogClient() : ShardingCatalogClientMock(nullptr) {} - - StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards( - OperationContext* opCtx, repl::ReadConcernLevel readConcern) override { - std::vector<ShardType> shardTypes; - for (const auto& shardId : kThreeShardIdList) { - const ConnectionString cs = ConnectionString::forReplicaSet( - shardId.toString(), {makeHostAndPort(shardId)}); - ShardType sType; - sType.setName(cs.getSetName()); - sType.setHost(cs.toString()); - shardTypes.push_back(std::move(sType)); - }; - return repl::OpTimeWith<std::vector<ShardType>>(shardTypes); - } - }; - - return stdx::make_unique<StaticCatalogClient>(); - } - LogicalSessionId _lsid{makeLogicalSessionIdForTest()}; TxnNumber _txnNumber{1}; }; @@ -613,7 +504,7 @@ TEST_F(TransactionCoordinatorServiceTest, // Reach the deadline. network()->enterNetwork(); - network()->runUntil(deadline); + network()->advanceTime(deadline); network()->exitNetwork(); // The coordinator should no longer exist. @@ -635,7 +526,7 @@ TEST_F(TransactionCoordinatorServiceTest, // Reach the deadline. network()->enterNetwork(); - network()->runUntil(deadline); + network()->advanceTime(deadline); network()->exitNetwork(); // The coordinator should still exist. diff --git a/src/mongo/db/transaction_coordinator_test.cpp b/src/mongo/db/transaction_coordinator_test.cpp index 01d3ec53832..97bbf6624b0 100644 --- a/src/mongo/db/transaction_coordinator_test.cpp +++ b/src/mongo/db/transaction_coordinator_test.cpp @@ -35,31 +35,14 @@ #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/server_parameters.h" -#include "mongo/db/transaction_coordinator.h" -#include "mongo/executor/task_executor.h" -#include "mongo/executor/task_executor_pool.h" -#include "mongo/s/catalog/sharding_catalog_client_mock.h" -#include "mongo/s/catalog/type_shard.h" -#include "mongo/s/shard_id.h" -#include "mongo/s/shard_server_test_fixture.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/concurrency/thread_pool.h" +#include "mongo/db/transaction_coordinator_test_fixture.h" #include "mongo/util/log.h" namespace mongo { namespace { -using PrepareResponse = txn::PrepareResponse; - -const std::vector<ShardId> kTwoShardIdList{{"s1"}, {"s2"}}; -const std::set<ShardId> kTwoShardIdSet{{"s1"}, {"s2"}}; -const std::vector<ShardId> kThreeShardIdList{{"s1"}, {"s2"}, {"s3"}}; -const std::set<ShardId> kThreeShardIdSet{{"s1"}, {"s2"}, {"s3"}}; const Timestamp kDummyTimestamp = Timestamp::min(); const Date_t kCommitDeadline = Date_t::max(); -const Status kRetryableError(ErrorCodes::HostUnreachable, "Retryable error for test"); const StatusWith<BSONObj> kNoSuchTransaction = BSON("ok" << 0 << "code" << ErrorCodes::NoSuchTransaction); const StatusWith<BSONObj> kOk = BSON("ok" << 1); @@ -74,69 +57,8 @@ StatusWith<BSONObj> makePrepareOkResponse(const Timestamp& timestamp) { const StatusWith<BSONObj> kPrepareOk = 
makePrepareOkResponse(kDummyPrepareTimestamp); -HostAndPort makeHostAndPort(const ShardId& shardId) { - return HostAndPort(str::stream() << shardId << ":123"); -} - -/** - * Constructs the default options for the thread pool used to run commit. - */ -ThreadPool::Options makeDefaultThreadPoolOptions() { - ThreadPool::Options options; - options.poolName = "TransactionCoordinatorService"; - options.minThreads = 0; - options.maxThreads = 1; - - // Ensure all threads have a client - options.onCreateThread = [](const std::string& threadName) { - Client::initThread(threadName.c_str()); - }; - return options; -} - -class TransactionCoordinatorTestBase : public ShardServerTestFixture { +class TransactionCoordinatorTestBase : public TransactionCoordinatorTestFixture { protected: - void setUp() override { - ShardServerTestFixture::setUp(); - - ASSERT_OK(ServerParameterSet::getGlobal() - ->getMap() - .find("logComponentVerbosity") - ->second->setFromString("{verbosity: 3}")); - - for (const auto& shardId : kThreeShardIdList) { - auto shardTargeter = RemoteCommandTargeterMock::get( - uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId)) - ->getTargeter()); - shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId)); - } - - _pool = std::make_unique<ThreadPool>(makeDefaultThreadPoolOptions()); - _pool->startup(); - _executor = Grid::get(getServiceContext())->getExecutorPool()->getFixedExecutor(); - } - - void tearDown() override { - _pool->shutdown(); - _pool.reset(); - - ShardServerTestFixture::tearDown(); - } - - void assertCommandSentAndRespondWith(const StringData& commandName, - const StatusWith<BSONObj>& response, - boost::optional<BSONObj> expectedWriteConcern) { - onCommand([&](const executor::RemoteCommandRequest& request) { - ASSERT_EQ(request.cmdObj.firstElement().fieldNameStringData(), commandName); - if (expectedWriteConcern) { - ASSERT_BSONOBJ_EQ( - *expectedWriteConcern, - request.cmdObj.getObjectField(WriteConcernOptions::kWriteConcernField)); - } - return response; - }); - } - void assertPrepareSentAndRespondWithSuccess() { assertCommandSentAndRespondWith(PrepareTransaction::kCommandName, kPrepareOk, @@ -159,6 +81,7 @@ protected: assertCommandSentAndRespondWith(PrepareTransaction::kCommandName, kRetryableError, WriteConcernOptions::InternalMajorityNoSnapshot); + advanceClockAndExecuteScheduledTasks(); } void assertAbortSentAndRespondWithSuccess() { @@ -214,48 +137,15 @@ protected: // commitDecisionFuture.get(); } - // Override the CatalogClient to make CatalogClient::getAllShards automatically return the - // expected shards. We cannot mock the network responses for the ShardRegistry reload, since the - // ShardRegistry reload is done over DBClient, not the NetworkInterface, and there is no - // DBClientMock analogous to the NetworkInterfaceMock. 
- std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient( - std::unique_ptr<DistLockManager> distLockManager) override { - - class StaticCatalogClient final : public ShardingCatalogClientMock { - public: - StaticCatalogClient() : ShardingCatalogClientMock(nullptr) {} - - StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards( - OperationContext* opCtx, repl::ReadConcernLevel readConcern) override { - std::vector<ShardType> shardTypes; - for (const auto& shardId : kThreeShardIdList) { - const ConnectionString cs = ConnectionString::forReplicaSet( - shardId.toString(), {makeHostAndPort(shardId)}); - ShardType sType; - sType.setName(cs.getSetName()); - sType.setHost(cs.toString()); - shardTypes.push_back(std::move(sType)); - }; - return repl::OpTimeWith<std::vector<ShardType>>(shardTypes); - } - }; - - return stdx::make_unique<StaticCatalogClient>(); - } - - std::unique_ptr<ThreadPool> _pool; - executor::TaskExecutor* _executor; - LogicalSessionId _lsid{makeLogicalSessionIdForTest()}; TxnNumber _txnNumber{1}; }; - class TransactionCoordinatorDriverTest : public TransactionCoordinatorTestBase { protected: void setUp() override { TransactionCoordinatorTestBase::setUp(); - _driver.emplace(_executor, _pool.get()); + _driver.emplace(getServiceContext()); } void tearDown() override { @@ -282,20 +172,21 @@ TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsOn Future<void> future = _driver->sendDecisionToParticipantShard( kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); - std::move(future).getAsync([](Status s) { ASSERT(s.isOK()); }); - assertPrepareSentAndRespondWithSuccess(); + future.get(operationContext()); } TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsSuccessAfterOneFailureAndThenSuccess) { Future<void> future = _driver->sendDecisionToParticipantShard( kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); - - std::move(future).getAsync([](Status s) { ASSERT(s.isOK()); }); + ASSERT(!future.isReady()); assertPrepareSentAndRespondWithRetryableError(); + ASSERT(!future.isReady()); + assertPrepareSentAndRespondWithSuccess(); + future.get(operationContext()); } TEST_F(TransactionCoordinatorDriverTest, @@ -772,8 +663,7 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, using TransactionCoordinatorTest = TransactionCoordinatorTestBase; TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnAbort) { - TransactionCoordinator coordinator( - getServiceContext(), _executor, _pool.get(), _lsid, _txnNumber); + TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber); auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList); // Simulate a participant voting to abort. 
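The rewritten tests above no longer attach getAsync() callbacks and rely on scheduling order; they assert the future is not yet ready, feed mock responses, and explicitly advance the virtual clock so the scheduler's delayed retries fire deterministically. A standalone sketch of that virtual-clock mechanic (a toy model, not the real NetworkInterfaceMock):

    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <map>

    // Virtual clock with a queue of (due time, task): nothing runs until the
    // test advances the clock past a task's due time.
    struct MockClock {
        int64_t now = 0;
        std::multimap<int64_t, std::function<void()>> scheduled;

        void schedule(int64_t when, std::function<void()> task) {
            scheduled.emplace(when, std::move(task));
        }
        void advanceTo(int64_t newTime) {  // analogous to advanceTime() below
            now = newTime;
            auto due = scheduled.upper_bound(now);
            for (auto it = scheduled.begin(); it != due; ++it)
                it->second();  // run everything that is now due
            scheduled.erase(scheduled.begin(), due);
        }
    };

    int main() {
        MockClock clock;
        bool retried = false;
        clock.schedule(1000, [&] { retried = true; });  // retry due at +1s
        std::cout << retried << '\n';                   // 0: nothing ran yet
        clock.advanceTo(1000);                          // test drives time forward
        std::cout << retried << '\n';                   // 1: backoff elapsed, retry ran
    }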
@@ -790,8 +680,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnAbort) } TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnCommit) { - TransactionCoordinator coordinator( - getServiceContext(), _executor, _pool.get(), _lsid, _txnNumber); + TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber); auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList); assertPrepareSentAndRespondWithSuccess(); @@ -808,8 +697,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnCommit TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnAbortAfterNetworkRetriesOneParticipantAborts) { - TransactionCoordinator coordinator( - getServiceContext(), _executor, _pool.get(), _lsid, _txnNumber); + TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber); auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList); // One participant votes abort after retry. @@ -830,8 +718,7 @@ TEST_F(TransactionCoordinatorTest, TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnAbortAfterNetworkRetriesBothParticipantsAbort) { - TransactionCoordinator coordinator( - getServiceContext(), _executor, _pool.get(), _lsid, _txnNumber); + TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber); auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList); // One participant votes abort after retry. @@ -852,8 +739,7 @@ TEST_F(TransactionCoordinatorTest, TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnCommitAfterNetworkRetries) { - TransactionCoordinator coordinator( - getServiceContext(), _executor, _pool.get(), _lsid, _txnNumber); + TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber); auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList); // One participant votes commit after retry. diff --git a/src/mongo/db/transaction_coordinator_test_fixture.cpp b/src/mongo/db/transaction_coordinator_test_fixture.cpp new file mode 100644 index 00000000000..b150a8a36a0 --- /dev/null +++ b/src/mongo/db/transaction_coordinator_test_fixture.cpp @@ -0,0 +1,120 @@ + +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. 
If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/transaction_coordinator_test_fixture.h" + +#include "mongo/client/remote_command_targeter_mock.h" +#include "mongo/db/commands/txn_cmds_gen.h" +#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/server_parameters.h" +#include "mongo/s/catalog/sharding_catalog_client_mock.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace { + +HostAndPort makeHostAndPort(const ShardId& shardId) { + return HostAndPort(str::stream() << shardId << ":123"); +} + +} // namespace + +void TransactionCoordinatorTestFixture::setUp() { + ShardServerTestFixture::setUp(); + + ASSERT_OK(ServerParameterSet::getGlobal() + ->getMap() + .find("logComponentVerbosity") + ->second->setFromString("{verbosity: 3}")); + + for (const auto& shardId : kThreeShardIdList) { + auto shardTargeter = RemoteCommandTargeterMock::get( + uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId))->getTargeter()); + shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId)); + } +} + +std::unique_ptr<ShardingCatalogClient> TransactionCoordinatorTestFixture::makeShardingCatalogClient( + std::unique_ptr<DistLockManager> distLockManager) { + + class StaticCatalogClient final : public ShardingCatalogClientMock { + public: + StaticCatalogClient(std::vector<ShardId> shardIds) + : ShardingCatalogClientMock(nullptr), _shardIds(std::move(shardIds)) {} + + StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards( + OperationContext* opCtx, repl::ReadConcernLevel readConcern) override { + std::vector<ShardType> shardTypes; + for (const auto& shardId : _shardIds) { + const ConnectionString cs = + ConnectionString::forReplicaSet(shardId.toString(), {makeHostAndPort(shardId)}); + ShardType sType; + sType.setName(cs.getSetName()); + sType.setHost(cs.toString()); + shardTypes.push_back(std::move(sType)); + }; + return repl::OpTimeWith<std::vector<ShardType>>(shardTypes); + } + + private: + const std::vector<ShardId> _shardIds; + }; + + return stdx::make_unique<StaticCatalogClient>(kThreeShardIdList); +} + +void TransactionCoordinatorTestFixture::assertCommandSentAndRespondWith( + const StringData& commandName, + const StatusWith<BSONObj>& response, + boost::optional<BSONObj> expectedWriteConcern) { + onCommand([&](const executor::RemoteCommandRequest& request) { + ASSERT_EQ(request.cmdObj.firstElement().fieldNameStringData(), commandName); + if (expectedWriteConcern) { + ASSERT_BSONOBJ_EQ( + *expectedWriteConcern, + request.cmdObj.getObjectField(WriteConcernOptions::kWriteConcernField)); + } + return response; + }); +} + +void TransactionCoordinatorTestFixture::advanceClockAndExecuteScheduledTasks() { + executor::NetworkInterfaceMock::InNetworkGuard guard(network()); + network()->advanceTime(network()->now() + Seconds{1}); +} + +} // namespace mongo diff --git a/src/mongo/db/transaction_coordinator_test_fixture.h b/src/mongo/db/transaction_coordinator_test_fixture.h new file mode 100644 index 00000000000..7466ad5acc5 --- /dev/null +++ b/src/mongo/db/transaction_coordinator_test_fixture.h @@ -0,0 +1,80 @@ + +/** + * Copyright (C) 2018-present MongoDB, Inc. 
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <set>
+#include <vector>
+
+#include "mongo/base/status.h"
+#include "mongo/db/transaction_coordinator.h"
+#include "mongo/s/shard_id.h"
+#include "mongo/s/shard_server_test_fixture.h"
+
+namespace mongo {
+
+/**
+ * Implements common functionality shared across the various transaction coordinator unit-tests.
+ */
+class TransactionCoordinatorTestFixture : public ShardServerTestFixture {
+protected:
+    using PrepareResponse = txn::PrepareResponse;
+
+    void setUp() override;
+
+    /**
+     * Overrides the CatalogClient so that CatalogClient::getAllShards automatically returns the
+     * expected shards. The network responses for the ShardRegistry reload cannot be mocked,
+     * because the reload goes over DBClient rather than the NetworkInterface, and there is no
+     * DBClientMock analogous to the NetworkInterfaceMock.
+     */
+    std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
+        std::unique_ptr<DistLockManager> distLockManager) override;
+
+    void assertCommandSentAndRespondWith(const StringData& commandName,
+                                         const StatusWith<BSONObj>& response,
+                                         boost::optional<BSONObj> expectedWriteConcern);
+    /**
+     * These tests use the mock network task executor, which does not automatically run tasks
+     * that are scheduled with a delay. This helper advances the clock by 1 second (the maximum
+     * back-off used by the transaction coordinator) and causes any pending retries to run.
+ */ + void advanceClockAndExecuteScheduledTasks(); + + const std::vector<ShardId> kTwoShardIdList{{"s1"}, {"s2"}}; + const std::set<ShardId> kTwoShardIdSet{{"s1"}, {"s2"}}; + const std::vector<ShardId> kThreeShardIdList{{"s1"}, {"s2"}, {"s3"}}; + const std::set<ShardId> kThreeShardIdSet{{"s1"}, {"s2"}, {"s3"}}; + + const Status kRetryableError{ErrorCodes::HostUnreachable, + "Retryable error for coordinator test"}; +}; + +} // namespace mongo diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index ed9d67ea536..f700f6c211e 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -420,6 +420,16 @@ Date_t NetworkInterfaceMock::runUntil(Date_t until) { return _now_inlock(); } +void NetworkInterfaceMock::advanceTime(Date_t newTime) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + invariant(_currentlyRunning == kNetworkThread); + invariant(newTime > _now_inlock()); + _now = newTime; + + _waitingToRunMask |= kExecutorThread; + _runReadyNetworkOperations_inlock(&lk); +} + void NetworkInterfaceMock::runReadyNetworkOperations() { stdx::unique_lock<stdx::mutex> lk(_mutex); invariant(_currentlyRunning == kNetworkThread); diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index 621a44a02c5..937bed41525 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -253,6 +253,11 @@ public: Date_t runUntil(Date_t until); /** + * Runs the simulator forward until now() == until. + */ + void advanceTime(Date_t newTime); + + /** * Processes all ready, scheduled network operations. * * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork. |