author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2019-01-09 21:22:46 -0500
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2019-01-16 09:10:44 -0500
commit     cc2fba8d8dfae009369ac9084375c0fc513793d4 (patch)
tree       3b02dd8d82f7a19165d5879e0c9caaa35145e9a7
parent     75f8c26ace392f77aa35b87faeb2933f8ce6e6ad (diff)
download   mongo-cc2fba8d8dfae009369ac9084375c0fc513793d4.tar.gz
SERVER-37880 Introduce backoff for retrying commit and abort messages
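
The core of the change: when a retryable error is encountered while sending prepare, commit, or abort, the coordinator now waits an exponentially growing interval before resending instead of retrying immediately (see kExponentialBackoff and the txn::doWhile loops below). A minimal standalone sketch of that retry shape, with illustrative names and a blocking loop standing in for the server's future-based scheduler:

    // Minimal sketch of retry-with-backoff (illustrative only; the real code
    // chains Futures through txn::doWhile and an AsyncWorkScheduler, and
    // never blocks a thread to sleep).
    #include <algorithm>
    #include <chrono>
    #include <functional>
    #include <thread>

    enum class SendResult { kOk, kRetryableError, kFatalError };

    SendResult sendWithBackoff(const std::function<SendResult()>& sendOnce,
                               std::chrono::milliseconds delay,
                               std::chrono::milliseconds maxDelay) {
        while (true) {
            SendResult r = sendOnce();
            if (r != SendResult::kRetryableError)
                return r;  // success or a non-retryable error ends the loop
            std::this_thread::sleep_for(delay);     // back off before retrying
            delay = std::min(delay * 2, maxDelay);  // grow the wait, capped
        }
    }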
-rw-r--r--  src/mongo/db/SConscript  1
-rw-r--r--  src/mongo/db/s/recover_transaction_decision_from_local_participant.cpp  9
-rw-r--r--  src/mongo/db/transaction_coordinator.cpp  10
-rw-r--r--  src/mongo/db/transaction_coordinator.h  6
-rw-r--r--  src/mongo/db/transaction_coordinator_catalog_test.cpp  11
-rw-r--r--  src/mongo/db/transaction_coordinator_driver.cpp  343
-rw-r--r--  src/mongo/db/transaction_coordinator_driver.h  21
-rw-r--r--  src/mongo/db/transaction_coordinator_futures_util.h  45
-rw-r--r--  src/mongo/db/transaction_coordinator_service.cpp  160
-rw-r--r--  src/mongo/db/transaction_coordinator_service.h  20
-rw-r--r--  src/mongo/db/transaction_coordinator_service_test.cpp  131
-rw-r--r--  src/mongo/db/transaction_coordinator_test.cpp  142
-rw-r--r--  src/mongo/db/transaction_coordinator_test_fixture.cpp  120
-rw-r--r--  src/mongo/db/transaction_coordinator_test_fixture.h  80
-rw-r--r--  src/mongo/executor/network_interface_mock.cpp  10
-rw-r--r--  src/mongo/executor/network_interface_mock.h  5
16 files changed, 438 insertions, 676 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 5e2b6201f06..b5f175b806a 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -727,6 +727,7 @@ env.CppUnitTest(
'transaction_coordinator_catalog_test.cpp',
'transaction_coordinator_futures_util_test.cpp',
'transaction_coordinator_service_test.cpp',
+ 'transaction_coordinator_test_fixture.cpp',
'transaction_coordinator_test.cpp',
],
LIBDEPS=[
diff --git a/src/mongo/db/s/recover_transaction_decision_from_local_participant.cpp b/src/mongo/db/s/recover_transaction_decision_from_local_participant.cpp
index 35f90c58c51..226cb6d8306 100644
--- a/src/mongo/db/s/recover_transaction_decision_from_local_participant.cpp
+++ b/src/mongo/db/s/recover_transaction_decision_from_local_participant.cpp
@@ -73,7 +73,6 @@ void recoverDecisionFromLocalParticipantOrAbortLocalParticipant(OperationContext
// The local participant's txnNumber matched the request's txnNumber, and the txnNumber
// corresponds to a transaction (not a retryable write).
-
if (txnParticipant->transactionIsCommitted()) {
return;
}
@@ -82,8 +81,12 @@ void recoverDecisionFromLocalParticipantOrAbortLocalParticipant(OperationContext
} catch (const DBException& e) {
// Convert a PreparedTransactionInProgress error to an anonymous error code.
uassert(51021,
- "coordinateCommitTransaction command found local participant is prepared but no "
- "active coordinator exists",
+ str::stream() << "coordinateCommitTransaction found local participant for "
+ << opCtx->getLogicalSessionId()->getId()
+ << ":"
+ << *opCtx->getTxnNumber()
+ << " is prepared but no "
+ "active coordinator exists",
e.code() != ErrorCodes::PreparedTransactionInProgress);
throw;
}
diff --git a/src/mongo/db/transaction_coordinator.cpp b/src/mongo/db/transaction_coordinator.cpp
index 7eab0ada758..463cdb879a8 100644
--- a/src/mongo/db/transaction_coordinator.cpp
+++ b/src/mongo/db/transaction_coordinator.cpp
@@ -69,13 +69,11 @@ CoordinatorCommitDecision makeDecisionFromPrepareVoteConsensus(
} // namespace
-TransactionCoordinator::TransactionCoordinator(ServiceContext* service,
- executor::TaskExecutor* networkExecutor,
- ThreadPool* callbackPool,
+TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
const LogicalSessionId& lsid,
const TxnNumber& txnNumber)
- : _service(service),
- _driver(networkExecutor, callbackPool),
+ : _serviceContext(serviceContext),
+ _driver(serviceContext),
_lsid(lsid),
_txnNumber(txnNumber),
_state(CoordinatorState::kInit) {}
@@ -120,7 +118,7 @@ Future<CoordinatorCommitDecision> TransactionCoordinator::_runPhaseOne(
invariant(_state == CoordinatorState::kPreparing);
auto decision =
- makeDecisionFromPrepareVoteConsensus(_service, result, _lsid, _txnNumber);
+ makeDecisionFromPrepareVoteConsensus(_serviceContext, result, _lsid, _txnNumber);
return _driver
.persistDecision(_lsid, _txnNumber, participantShards, decision.commitTimestamp)
diff --git a/src/mongo/db/transaction_coordinator.h b/src/mongo/db/transaction_coordinator.h
index 6ab42e4d76b..063e32e209a 100644
--- a/src/mongo/db/transaction_coordinator.h
+++ b/src/mongo/db/transaction_coordinator.h
@@ -44,9 +44,7 @@ class TransactionCoordinator {
MONGO_DISALLOW_COPYING(TransactionCoordinator);
public:
- TransactionCoordinator(ServiceContext* service,
- executor::TaskExecutor* networkExecutor,
- ThreadPool* callbackPool,
+ TransactionCoordinator(ServiceContext* serviceContext,
const LogicalSessionId& lsid,
const TxnNumber& txnNumber);
~TransactionCoordinator();
@@ -166,7 +164,7 @@ private:
void _transitionToDone(stdx::unique_lock<stdx::mutex> lk) noexcept;
// Shortcut to the service context under which this coordinator runs
- ServiceContext* const _service;
+ ServiceContext* const _serviceContext;
// Context object used to perform and track the state of asynchronous operations on behalf of
// this coordinator.
diff --git a/src/mongo/db/transaction_coordinator_catalog_test.cpp b/src/mongo/db/transaction_coordinator_catalog_test.cpp
index 9590de998e6..47303b590f6 100644
--- a/src/mongo/db/transaction_coordinator_catalog_test.cpp
+++ b/src/mongo/db/transaction_coordinator_catalog_test.cpp
@@ -31,10 +31,6 @@
#include "mongo/platform/basic.h"
#include "mongo/db/transaction_coordinator_catalog.h"
-
-#include <boost/optional/optional_io.hpp>
-
-#include "mongo/db/transaction_coordinator.h"
#include "mongo/s/shard_server_test_fixture.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
@@ -73,11 +69,8 @@ public:
void createCoordinatorInCatalog(OperationContext* opCtx,
LogicalSessionId lsid,
TxnNumber txnNumber) {
- auto newCoordinator = std::make_shared<TransactionCoordinator>(getServiceContext(),
- nullptr /* TaskExecutor */,
- nullptr /* ThreadPool */,
- lsid,
- txnNumber);
+ auto newCoordinator =
+ std::make_shared<TransactionCoordinator>(getServiceContext(), lsid, txnNumber);
coordinatorCatalog().insert(opCtx, lsid, txnNumber, newCoordinator);
_coordinatorsForTest.push_back(newCoordinator);
diff --git a/src/mongo/db/transaction_coordinator_driver.cpp b/src/mongo/db/transaction_coordinator_driver.cpp
index 3a77c622a02..00bcc9015d8 100644
--- a/src/mongo/db/transaction_coordinator_driver.cpp
+++ b/src/mongo/db/transaction_coordinator_driver.cpp
@@ -35,7 +35,6 @@
#include "mongo/db/transaction_coordinator_driver.h"
#include "mongo/client/remote_command_retry_scheduler.h"
-#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
#include "mongo/db/dbdirectclient.h"
@@ -44,7 +43,6 @@
#include "mongo/db/transaction_coordinator_futures_util.h"
#include "mongo/db/write_concern.h"
#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/s/grid.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
@@ -70,103 +68,9 @@ const WriteConcernOptions kInternalMajorityNoSnapshotWriteConcern(
WriteConcernOptions::SyncMode::UNSET,
WriteConcernOptions::kNoTimeout);
-/**
- * Finds the host and port for a shard.
- */
-HostAndPort targetHost(const ShardId& shardId, const ReadPreferenceSetting& readPref) {
- const auto opCtxHolder = cc().makeOperationContext();
- const auto opCtx = opCtxHolder.get();
- auto shard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
- // TODO SERVER-35678 return a SemiFuture<HostAndPort> rather than using a blocking call to
- // get().
- return shard->getTargeter()->findHostWithMaxWait(readPref, Seconds(20)).get(opCtx);
-}
+const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
-/**
- * Finds the host and port for a shard.
- *
- * Possible errors: [ShardNotFound, ShutdownInProgress]
- *
- * TODO (SERVER-37880): Implement backoff for retries.
- */
-Future<HostAndPort> targetHostAsync(ThreadPool* pool,
- const ShardId& shardId,
- const ReadPreferenceSetting& readPref) {
- return txn::doUntilSuccessOrOneOf({ErrorCodes::ShardNotFound, ErrorCodes::ShutdownInProgress},
- [pool, shardId, readPref]() {
- return txn::async(pool, [shardId, readPref]() {
- return targetHost(shardId, readPref);
- });
- });
-}
-
-/**
- * Sends a command asynchronously to a shard using the given executor. The thread pool is used for
- * targeting the shard.
- *
- * If the command is sent successfully and a response is received, the resulting Future will contain
- * the ResponseStatus from the response.
- *
- * An error status is set on the resulting Future if any of the following occurs:
- * - If the targeting the shard fails [Possible errors: ShardNotFound, ShutdownInProgress]
- * - If the command cannot be scheduled to run on the executor [Possible errors:
- * ShutdownInProgress, maybe others]
- * - If a response is received that indicates that the destination shard was did not receive the
- * command or could not process the command (e.g. because the command did not reach the shard due
- * to a network issue) [Possible errors: Whatever the returned status is in the response]
- */
-Future<ResponseStatus> sendAsyncCommandToShard(executor::TaskExecutor* executor,
- ThreadPool* pool,
- const ShardId& shardId,
- const BSONObj& commandObj) {
-
- auto promiseAndFuture = makePromiseFuture<ResponseStatus>();
- auto sharedPromise =
- std::make_shared<Promise<ResponseStatus>>(std::move(promiseAndFuture.promise));
-
- auto readPref = ReadPreferenceSetting(ReadPreference::PrimaryOnly);
- targetHostAsync(pool, shardId, readPref)
- .then([ executor, shardId, sharedPromise, commandObj = commandObj.getOwned(), readPref ](
- HostAndPort shardHostAndPort) mutable {
-
- LOG(3) << "Coordinator going to send command " << commandObj << " to shard " << shardId;
-
- executor::RemoteCommandRequest request(
- shardHostAndPort, "admin", commandObj, readPref.toContainingBSON(), nullptr);
-
- auto swCallbackHandle = executor->scheduleRemoteCommand(
- request, [ commandObj = commandObj.getOwned(),
- shardId,
- sharedPromise ](const RemoteCommandCallbackArgs& args) mutable {
- LOG(3) << "Coordinator shard got response " << args.response.data << " for "
- << commandObj << " to " << shardId;
- auto status = args.response.status;
- // Only consider actual failures to send the command as errors.
- if (status.isOK()) {
- sharedPromise->emplaceValue(args.response);
- } else {
- sharedPromise->setError(status);
- }
- });
-
- if (!swCallbackHandle.isOK()) {
- LOG(3) << "Coordinator shard failed to schedule the task to send " << commandObj
- << " to shard " << shardId << causedBy(swCallbackHandle.getStatus());
- sharedPromise->setError(swCallbackHandle.getStatus());
- }
- })
- .onError([ shardId, commandObj = commandObj.getOwned(), sharedPromise ](Status s) {
- LOG(3) << "Coordinator shard failed to target command " << commandObj << " to shard "
- << shardId << causedBy(s);
-
- sharedPromise->setError(s);
- })
- .getAsync([](Status) {});
-
- // Do not wait for the callback to run. The callback will reschedule the remote request on
- // the same executor if necessary.
- return std::move(promiseAndFuture.future);
-}
+const ReadPreferenceSetting kPrimaryReadPreference{ReadPreference::PrimaryOnly};
bool isRetryableError(ErrorCodes::Error code) {
// TODO (SERVER-37880): Consider using RemoteCommandRetryScheduler.
@@ -192,61 +96,6 @@ BSONArray buildParticipantListMatchesConditions(const std::vector<ShardId>& part
return BSON_ARRAY(participantListContains << participantListHasSize);
}
-/**
- * Processes a response from a participant to the prepareTransaction command. An OK response is
- * interpreted as a vote to commit, and the prepareTimestamp is extracted from the response and
- * returned. A response that indicates a vote to abort is interpreted as such, and a retryable error
- * will re-throw so that we can retry the prepare command.
- */
-PrepareResponse getCommitVoteFromPrepareResponse(const ShardId& shardId,
- const ResponseStatus& response) {
- uassertStatusOK(getWriteConcernStatusFromCommandResult(response.data));
- auto status = getStatusFromCommandResult(response.data);
- auto wcStatus = getWriteConcernStatusFromCommandResult(response.data);
-
- // There must be no write concern errors in order for us to be able to interpret the command
- // response. As an example, it's possible for us to get NoSuchTransaction from a non-primary
- // node of a shard, in which case we might also receive a write concern error but it can't be
- // interpreted as a vote to abort.
- if (!wcStatus.isOK()) {
- status = wcStatus;
- }
-
- if (status.isOK()) {
- auto prepareTimestampField = response.data["prepareTimestamp"];
- if (prepareTimestampField.eoo() || prepareTimestampField.timestamp().isNull()) {
- LOG(0) << "Coordinator shard received an OK response to prepareTransaction "
- "without a prepareTimestamp from shard "
- << shardId << ", which is not expected behavior. Interpreting the response from "
- << shardId << " as a vote to abort";
- return PrepareResponse{shardId, PrepareVote::kAbort, boost::none};
- }
-
- LOG(3) << "Coordinator shard received a vote to commit from shard " << shardId
- << " with prepareTimestamp: " << prepareTimestampField.timestamp();
-
- return PrepareResponse{shardId, PrepareVote::kCommit, prepareTimestampField.timestamp()};
- } else if (ErrorCodes::isVoteAbortError(status.code())) {
- LOG(3) << "Coordinator shard received a vote to abort from shard " << shardId;
- return PrepareResponse{shardId, PrepareVote::kAbort, boost::none};
- } else if (isRetryableError(status.code())) {
- LOG(3) << "Coordinator shard received a retryable error in response to prepareTransaction "
- "from shard "
- << shardId << causedBy(status);
-
- // Rethrow error so that we retry.
- uassertStatusOK(status);
- } else {
- // Non-retryable errors lead to an abort decision.
- LOG(3)
- << "Coordinator shard received a non-retryable error in response to prepareTransaction "
- "from shard "
- << shardId << causedBy(status);
- return PrepareResponse{shardId, PrepareVote::kAbort, boost::none};
- }
- MONGO_UNREACHABLE;
-}
-
std::string buildParticipantListString(const std::vector<ShardId>& participantList) {
StringBuilder ss;
ss << "[";
@@ -259,9 +108,8 @@ std::string buildParticipantListString(const std::vector<ShardId>& participantLi
} // namespace
-TransactionCoordinatorDriver::TransactionCoordinatorDriver(executor::TaskExecutor* executor,
- ThreadPool* pool)
- : _executor(executor), _pool(pool) {}
+TransactionCoordinatorDriver::TransactionCoordinatorDriver(ServiceContext* service)
+ : _scheduler(service) {}
TransactionCoordinatorDriver::~TransactionCoordinatorDriver() = default;
@@ -358,9 +206,8 @@ void persistParticipantListBlocking(OperationContext* opCtx,
Future<void> TransactionCoordinatorDriver::persistParticipantList(
const LogicalSessionId& lsid, TxnNumber txnNumber, std::vector<ShardId> participantList) {
- return txn::async(_pool, [lsid, txnNumber, participantList] {
- auto opCtx = Client::getCurrent()->makeOperationContext();
- persistParticipantListBlocking(opCtx.get(), lsid, txnNumber, participantList);
+ return _scheduler.scheduleWork([lsid, txnNumber, participantList](OperationContext* opCtx) {
+ persistParticipantListBlocking(opCtx, lsid, txnNumber, participantList);
});
}
@@ -400,18 +247,19 @@ Future<txn::PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare(
switch (*next.vote) {
case PrepareVote::kAbort:
- LOG(3) << "Transaction coordinator received a vote to abort from shard "
- << next.participantShardId;
+ if (result.decision == CommitDecision::kAbort) {
+ LOG(3) << "Ignoring vote to abort from shard " << next.participantShardId
+ << " because a vote to abort has already been received";
+ break;
+ }
result.decision = CommitDecision::kAbort;
result.maxPrepareTimestamp = boost::none;
cancel();
break;
case PrepareVote::kCommit:
- LOG(3) << "Transaction coordinator received a vote to commit from shard "
- << next.participantShardId;
if (result.decision == CommitDecision::kAbort) {
- LOG(3) << "Ignoring commmit decision from shard " << next.participantShardId
- << " because abort decision was received previously";
+ LOG(3) << "Ignoring vote to commit from shard " << next.participantShardId
+ << " because a vote to abort has already been received";
break;
}
@@ -544,10 +392,10 @@ Future<void> TransactionCoordinatorDriver::persistDecision(
TxnNumber txnNumber,
std::vector<ShardId> participantList,
const boost::optional<Timestamp>& commitTimestamp) {
- return txn::async(_pool, [lsid, txnNumber, participantList, commitTimestamp] {
- auto opCtx = Client::getCurrent()->makeOperationContext();
- persistDecisionBlocking(opCtx.get(), lsid, txnNumber, participantList, commitTimestamp);
- });
+ return _scheduler.scheduleWork(
+ [lsid, txnNumber, participantList, commitTimestamp](OperationContext* opCtx) {
+ persistDecisionBlocking(opCtx, lsid, txnNumber, participantList, commitTimestamp);
+ });
}
Future<void> TransactionCoordinatorDriver::sendCommit(const std::vector<ShardId>& participantShards,
@@ -654,9 +502,8 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx,
Future<void> TransactionCoordinatorDriver::deleteCoordinatorDoc(const LogicalSessionId& lsid,
TxnNumber txnNumber) {
- return txn::async(_pool, [lsid, txnNumber] {
- auto opCtx = Client::getCurrent()->makeOperationContext();
- deleteCoordinatorDocBlocking(opCtx.get(), lsid, txnNumber);
+ return _scheduler.scheduleWork([lsid, txnNumber](OperationContext* opCtx) {
+ deleteCoordinatorDocBlocking(opCtx, lsid, txnNumber);
});
}
@@ -681,75 +528,109 @@ std::vector<TransactionCoordinatorDocument> TransactionCoordinatorDriver::readAl
}
Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard(
- const ShardId& shardId, const BSONObj& prepareCommandObj) {
- return sendAsyncCommandToShard(_executor, _pool, shardId, prepareCommandObj)
- .then([shardId](ResponseStatus response) {
- return getCommitVoteFromPrepareResponse(shardId, response);
- })
- .onError<ErrorCodes::ShardNotFound>([shardId](const Status& s) {
- // ShardNotFound indicates a participant shard was removed, so that is interpreted as an
- // abort decision
- return Future<PrepareResponse>::makeReady(
- {shardId, CommitDecision::kAbort, boost::none});
- })
- .onError(
- [ this, shardId, prepareCommandObj = prepareCommandObj.getOwned() ](Status status) {
- if (!isRetryableError(status.code()))
- uassertStatusOK(status);
+ const ShardId& shardId, const BSONObj& commandObj) {
+ return txn::doWhile(
+ _scheduler,
+ kExponentialBackoff,
+ [](StatusWith<PrepareResponse> s) { return isRetryableError(s.getStatus().code()); },
+ [ this, shardId, commandObj = commandObj.getOwned() ] {
+ return _scheduler.scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj)
+ .then([ shardId, commandObj = commandObj.getOwned() ](ResponseStatus response) {
+ auto status = getStatusFromCommandResult(response.data);
+ auto wcStatus = getWriteConcernStatusFromCommandResult(response.data);
+
+ // There must be no writeConcern error in order for us to interpret the command
+ // response.
+ if (!wcStatus.isOK()) {
+ status = wcStatus;
+ }
+
+ if (status.isOK()) {
+ auto prepareTimestampField = response.data["prepareTimestamp"];
+ if (prepareTimestampField.eoo() ||
+ prepareTimestampField.timestamp().isNull()) {
+ LOG(0) << "Coordinator shard received an OK response to "
+ "prepareTransaction "
+ "without a prepareTimestamp from shard "
+ << shardId << ", which is not expected behavior. "
+ "Interpreting the response from "
+ << shardId << " as a vote to abort";
+ return PrepareResponse{shardId, PrepareVote::kAbort, boost::none};
+ }
+
+ LOG(3) << "Coordinator shard received a vote to commit from shard "
+ << shardId
+ << " with prepareTimestamp: " << prepareTimestampField.timestamp();
+ return PrepareResponse{
+ shardId, PrepareVote::kCommit, prepareTimestampField.timestamp()};
+ }
- if (_cancelled.loadRelaxed()) {
- LOG(3) << "Prepare stopped retrying due to retrying being cancelled";
- return Future<PrepareResponse>::makeReady({shardId, boost::none, boost::none});
- }
+ LOG(3) << "Coordinator shard received " << status << " from shard " << shardId
+ << " for " << commandObj;
+ if (ErrorCodes::isVoteAbortError(status.code())) {
+ return PrepareResponse{shardId, PrepareVote::kAbort, boost::none};
+ }
- return sendPrepareToShard(shardId, prepareCommandObj);
- });
+ uassertStatusOK(status);
+ MONGO_UNREACHABLE;
+ })
+ .onError<ErrorCodes::ShardNotFound>([shardId](const Status& s) {
+ // ShardNotFound may indicate that the participant shard has been removed (it
+ // could also mean the participant shard was recently added and this node
+ // refreshed its ShardRegistry from a stale config secondary).
+ //
+ // Since this node can't know which is the case, it pessimistically treats
+ // ShardNotFound as a vote to abort. That is always safe, because the
+ // coordinator must then send abort to all participants.
+ return Future<PrepareResponse>::makeReady(
+ {shardId, CommitDecision::kAbort, boost::none});
+ })
+ .onError([this, shardId](const Status& status) {
+ if (_cancelled.loadRelaxed()) {
+ LOG(3) << "Prepare stopped retrying due to retrying being cancelled";
+ return PrepareResponse{shardId, boost::none, boost::none};
+ }
+ uassertStatusOK(status);
+ MONGO_UNREACHABLE;
+ });
+ });
}
-// TODO (SERVER-37880): Implement backoff for retries and only retry commands on retryable
-// errors.
Future<void> TransactionCoordinatorDriver::sendDecisionToParticipantShard(
const ShardId& shardId, const BSONObj& commandObj) {
- return sendAsyncCommandToShard(_executor, _pool, shardId, commandObj)
- .then([ shardId, commandObj = commandObj.getOwned() ](ResponseStatus response) {
- auto status = getStatusFromCommandResult(response.data);
- auto wcStatus = getWriteConcernStatusFromCommandResult(response.data);
-
- // There must be no write concern errors in order for us to be able to interpret the
- // command response
- if (!wcStatus.isOK()) {
- status = wcStatus;
- }
+ return txn::doWhile(
+ _scheduler,
+ kExponentialBackoff,
+ [](const Status& s) { return isRetryableError(s.code()); },
+ [ this, shardId, commandObj = commandObj.getOwned() ] {
+ return _scheduler.scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj)
+ .then([ shardId, commandObj = commandObj.getOwned() ](ResponseStatus response) {
+ auto status = getStatusFromCommandResult(response.data);
+ auto wcStatus = getWriteConcernStatusFromCommandResult(response.data);
+
+ // There must be no writeConcern error in order for us to interpret the command
+ // response.
+ if (!wcStatus.isOK()) {
+ status = wcStatus;
+ }
- if (status.isOK()) {
- LOG(3) << "Coordinator shard received OK in response to " << commandObj
- << "from shard " << shardId;
- return Status::OK();
- } else if (ErrorCodes::isVoteAbortError(status.code())) {
- LOG(3) << "Coordinator shard received a vote abort error in response to "
- << commandObj << "from shard " << shardId << causedBy(status)
- << ", interpreting as a successful ack";
+ LOG(3) << "Coordinator shard received " << status << " in response to "
+ << commandObj << " from shard " << shardId;
- return Status::OK();
- } else {
- return status;
- }
- })
- .onError(
- [ this, shardId, commandObj = commandObj.getOwned() ](Status status)->Future<void> {
- if (isRetryableError(status.code())) {
- LOG(3) << "Coordinator shard received a retryable error in response to "
- << commandObj << "from shard " << shardId << causedBy(status)
- << ", resending command.";
-
- return sendDecisionToParticipantShard(shardId, commandObj);
- } else {
- LOG(3) << "Coordinator shard received a non-retryable error in response to "
- << commandObj << "from shard " << shardId << causedBy(status);
+ if (ErrorCodes::isVoteAbortError(status.code())) {
+ // Interpret voteAbort errors as an ack.
+ status = Status::OK();
+ }
return status;
- }
- });
+ })
+ .onError<ErrorCodes::ShardNotFound>([](const Status& s) {
+ // TODO (SERVER-38918): Unlike for prepare, there is no pessimistic way to
+ // handle ShardNotFound. It's not safe to treat ShardNotFound as an ack, because
+ // this node may have refreshed its ShardRegistry from a stale config secondary.
+ MONGO_UNREACHABLE;
+ });
+ });
}
void TransactionCoordinatorDriver::cancel() {
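
With the sendPrepareToShard above, each prepare response collapses to one of three outcomes: a vote to commit (OK with a prepareTimestamp), a vote to abort (OK without a timestamp, a vote-abort error such as NoSuchTransaction, or ShardNotFound), or a retry driven by doWhile. A self-contained sketch of that mapping (PrepareReply and its fields are stand-ins for what the real code extracts from the BSON command result and its writeConcernError; cancellation handling is omitted):

    #include <stdexcept>

    enum class Vote { kCommit, kAbort, kRetry };

    // Stand-in for the response fields the coordinator inspects.
    struct PrepareReply {
        bool ok;                   // command and write concern status were OK
        bool hasPrepareTimestamp;  // response carried a valid prepareTimestamp
        bool isVoteAbortError;     // e.g. NoSuchTransaction
        bool isShardNotFound;      // participant missing from the ShardRegistry
        bool isRetryableError;     // e.g. HostUnreachable
    };

    Vote interpretPrepareReply(const PrepareReply& r) {
        if (r.ok)
            return r.hasPrepareTimestamp ? Vote::kCommit
                                         : Vote::kAbort;  // OK without a timestamp is an abort
        if (r.isVoteAbortError || r.isShardNotFound)
            return Vote::kAbort;  // pessimistic abort is always safe
        if (r.isRetryableError)
            return Vote::kRetry;  // the surrounding doWhile resends after backoff
        throw std::runtime_error("non-retryable prepare error");  // propagates to the caller
    }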
diff --git a/src/mongo/db/transaction_coordinator_driver.h b/src/mongo/db/transaction_coordinator_driver.h
index df08928efcd..a92d97f8174 100644
--- a/src/mongo/db/transaction_coordinator_driver.h
+++ b/src/mongo/db/transaction_coordinator_driver.h
@@ -35,11 +35,7 @@
#include "mongo/db/logical_session_id.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/transaction_coordinator_document_gen.h"
-#include "mongo/executor/task_executor.h"
-#include "mongo/s/shard_id.h"
-#include "mongo/util/concurrency/mutex.h"
-#include "mongo/util/concurrency/thread_pool.h"
-#include "mongo/util/future.h"
+#include "mongo/db/transaction_coordinator_futures_util.h"
namespace mongo {
namespace txn {
@@ -91,7 +87,7 @@ class TransactionCoordinatorDriver {
MONGO_DISALLOW_COPYING(TransactionCoordinatorDriver);
public:
- TransactionCoordinatorDriver(executor::TaskExecutor* executor, ThreadPool* pool);
+ TransactionCoordinatorDriver(ServiceContext* service);
~TransactionCoordinatorDriver();
/**
@@ -230,18 +226,7 @@ public:
Future<void> sendDecisionToParticipantShard(const ShardId& shardId, const BSONObj& commandObj);
private:
- /**
- * A task executor used to execute all network requests used to send messages to participants.
- * The only current networking that may occur outside of this is when targeting a shard to find
- * its host and port.
- */
- executor::TaskExecutor* const _executor;
-
- /**
- * A thread pool used to execute any code that should be non-blocking, e.g. persisting the
- * participant list or the commit decision to disk.
- */
- ThreadPool* const _pool;
+ txn::AsyncWorkScheduler _scheduler;
// TODO (SERVER-38522): Remove once AsyncWorkScheduler is used for cancellation
AtomicWord<bool> _cancelled{false};
diff --git a/src/mongo/db/transaction_coordinator_futures_util.h b/src/mongo/db/transaction_coordinator_futures_util.h
index c64071d53a4..a15f715f948 100644
--- a/src/mongo/db/transaction_coordinator_futures_util.h
+++ b/src/mongo/db/transaction_coordinator_futures_util.h
@@ -36,7 +36,7 @@
#include "mongo/executor/task_executor.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/s/grid.h"
-#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/concurrency/mutex.h"
#include "mongo/util/future.h"
#include "mongo/util/time_support.h"
@@ -241,27 +241,6 @@ Future<GlobalResult> collect(std::vector<Future<IndividualResult>>&& futures,
}
/**
- * A thin wrapper around ThreadPool::schedule that returns a future that will be resolved on the
- * completion of the task or rejected if the task errors.
- */
-template <class Callable>
-Future<FutureContinuationResult<Callable>> async(ThreadPool* pool, Callable&& task) {
- using ReturnType = decltype(task());
- auto pf = makePromiseFuture<ReturnType>();
- auto taskCompletionPromise = std::make_shared<Promise<ReturnType>>(std::move(pf.promise));
- auto scheduleStatus = pool->schedule(
- [ task = std::forward<Callable>(task), taskCompletionPromise ]() mutable noexcept {
- taskCompletionPromise->setWith(task);
- });
-
- if (!scheduleStatus.isOK()) {
- taskCompletionPromise->setError(scheduleStatus);
- }
-
- return std::move(pf.future);
-}
-
-/**
* Returns a future that will be resolved when all of the input futures have resolved, or rejected
* when any of the futures is rejected.
*/
@@ -300,27 +279,5 @@ Future<FutureContinuationResult<LoopBodyFn>> doWhile(AsyncWorkScheduler& schedul
});
}
-/**
- * Executes a function returning a Future until the function does not return an error status or
- * until one of the provided error codes is returned.
- *
- * TODO (SERVER-37880): Implement backoff for retries.
- */
-template <class Callable>
-Future<FutureContinuationResult<Callable>> doUntilSuccessOrOneOf(
- std::set<ErrorCodes::Error>&& errorsToHaltOn, Callable&& f) {
- auto future = f();
- return std::move(future).onError(
- [ errorsToHaltOn = std::move(errorsToHaltOn),
- f = std::forward<Callable>(f) ](Status s) mutable {
- // If this error is one of the errors we should halt on, rethrow the error and don't
- // retry.
- if (errorsToHaltOn.find(s.code()) != errorsToHaltOn.end()) {
- uassertStatusOK(s);
- }
- return doUntilSuccessOrOneOf(std::move(errorsToHaltOn), std::forward<Callable>(f));
- });
-}
-
} // namespace txn
} // namespace mongo
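
For reference, the "resolved when all of the input futures have resolved, or rejected when any of the futures is rejected" contract kept in this header can be modeled synchronously with std::future (a blocking sketch; the real helper aggregates mongo Futures without blocking):

    #include <future>
    #include <vector>

    // Blocking model of the when-all contract: collect every result, and
    // rethrow the first stored error encountered (the real, non-blocking
    // helper rejects as soon as any input future is rejected).
    template <typename T>
    std::vector<T> whenAllModel(std::vector<std::future<T>> futures) {
        std::vector<T> results;
        results.reserve(futures.size());
        for (auto& f : futures)
            results.push_back(f.get());  // get() rethrows a stored exception
        return results;
    }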
diff --git a/src/mongo/db/transaction_coordinator_service.cpp b/src/mongo/db/transaction_coordinator_service.cpp
index 2cf14972c0b..3731a1853f5 100644
--- a/src/mongo/db/transaction_coordinator_service.cpp
+++ b/src/mongo/db/transaction_coordinator_service.cpp
@@ -51,42 +51,10 @@ namespace {
const auto transactionCoordinatorServiceDecoration =
ServiceContext::declareDecoration<TransactionCoordinatorService>();
-/**
- * Constructs the default options for the thread pool used to run commit.
- */
-ThreadPool::Options makeDefaultThreadPoolOptions() {
- ThreadPool::Options options;
- options.poolName = "TransactionCoordinatorService";
- options.minThreads = 0;
- options.maxThreads = ThreadPool::Options::kUnlimited;
-
- // Ensure all threads have a client
- options.onCreateThread = [](const std::string& threadName) {
- Client::initThread(threadName.c_str());
- };
- return options;
-}
-
} // namespace
TransactionCoordinatorService::TransactionCoordinatorService()
- : _coordinatorCatalog(std::make_shared<TransactionCoordinatorCatalog>()),
- _threadPool(std::make_unique<ThreadPool>(makeDefaultThreadPoolOptions())) {}
-
-void TransactionCoordinatorService::setThreadPoolForTest(std::unique_ptr<ThreadPool> pool) {
- shutdown();
- _threadPool = std::move(pool);
- startup();
-}
-
-void TransactionCoordinatorService::startup() {
- _threadPool->startup();
-}
-
-void TransactionCoordinatorService::shutdown() {
- _threadPool->shutdown();
- _threadPool->join();
-}
+ : _coordinatorCatalog(std::make_shared<TransactionCoordinatorCatalog>()) {}
TransactionCoordinatorService* TransactionCoordinatorService::get(OperationContext* opCtx) {
return get(opCtx->getServiceContext());
@@ -108,16 +76,16 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx,
latestCoordinator->cancelIfCommitNotYetStarted();
}
- auto networkExecutor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
- auto coordinator = std::make_shared<TransactionCoordinator>(
- opCtx->getServiceContext(), networkExecutor, _threadPool.get(), lsid, txnNumber);
+ auto coordinator =
+ std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(), lsid, txnNumber);
_coordinatorCatalog->insert(opCtx, lsid, txnNumber, coordinator);
// Schedule a task in the future to cancel the commit coordination on the coordinator, so that
// the coordinator does not remain in memory forever (in case the participant list is never
// received).
- auto cbHandle = uassertStatusOK(networkExecutor->scheduleWorkAt(
+ const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
+ auto cbHandle = uassertStatusOK(executor->scheduleWorkAt(
commitDeadline,
[coordinatorWeakPtr = std::weak_ptr<TransactionCoordinator>(coordinator)](
const mongo::executor::TaskExecutor::CallbackArgs& cbArgs) mutable {
@@ -176,66 +144,70 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx) {
// stepup task.
_coordinatorCatalog->enterStepUp(opCtx);
- auto scheduleStatus = _threadPool->schedule([this]() {
- try {
- // The opCtx destructor handles unsetting itself from the Client
- auto opCtxPtr = Client::getCurrent()->makeOperationContext();
- auto opCtx = opCtxPtr.get();
-
- repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx);
- const auto lastOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
- LOG(3) << "Going to wait for client's last OpTime " << lastOpTime
- << " to become majority committed";
- WriteConcernResult unusedWCResult;
- uassertStatusOK(waitForWriteConcern(
- opCtx,
- lastOpTime,
- WriteConcernOptions{WriteConcernOptions::kInternalMajorityNoSnapshot,
- WriteConcernOptions::SyncMode::UNSET,
- WriteConcernOptions::kNoTimeout},
- &unusedWCResult));
-
- auto coordinatorDocs = TransactionCoordinatorDriver::readAllCoordinatorDocs(opCtx);
- LOG(0) << "Need to resume coordinating commit for " << coordinatorDocs.size()
- << " transactions";
-
- for (const auto& doc : coordinatorDocs) {
- LOG(3) << "Going to resume coordinating commit for " << doc.toBSON();
- const auto lsid = *doc.getId().getSessionId();
- const auto txnNumber = *doc.getId().getTxnNumber();
-
- auto networkExecutor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
- auto coordinator =
- std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(),
- networkExecutor,
- _threadPool.get(),
- lsid,
- txnNumber);
- _coordinatorCatalog->insert(
- opCtx, lsid, txnNumber, coordinator, true /* forStepUp */);
- coordinator->continueCommit(doc);
- }
-
- _coordinatorCatalog->exitStepUp();
-
- LOG(3) << "Incoming coordinateCommit requests now accepted";
- } catch (const DBException& e) {
- LOG(3) << "Failed while executing thread to resume coordinating commit for pending "
- "transactions "
- << causedBy(e.toStatus());
- _coordinatorCatalog->exitStepUp();
- }
- });
-
- if (scheduleStatus.code() == ErrorCodes::ShutdownInProgress) {
+ const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
+ auto status =
+ executor
+ ->scheduleWork([ this, service = opCtx->getServiceContext() ](
+ const executor::TaskExecutor::CallbackArgs&) {
+ try {
+ // The opCtx destructor handles unsetting itself from the Client
+ ThreadClient threadClient("TransactionCoordinator-StepUp", service);
+ auto opCtxPtr = Client::getCurrent()->makeOperationContext();
+ auto opCtx = opCtxPtr.get();
+
+ auto& replClientInfo = repl::ReplClientInfo::forClient(opCtx->getClient());
+ replClientInfo.setLastOpToSystemLastOpTime(opCtx);
+
+ const auto lastOpTime = replClientInfo.getLastOp();
+ LOG(3) << "Going to wait for client's last OpTime " << lastOpTime
+ << " to become majority committed";
+ WriteConcernResult unusedWCResult;
+ uassertStatusOK(waitForWriteConcern(
+ opCtx,
+ lastOpTime,
+ WriteConcernOptions{WriteConcernOptions::kInternalMajorityNoSnapshot,
+ WriteConcernOptions::SyncMode::UNSET,
+ WriteConcernOptions::kNoTimeout},
+ &unusedWCResult));
+
+ auto coordinatorDocs =
+ TransactionCoordinatorDriver::readAllCoordinatorDocs(opCtx);
+
+ LOG(0) << "Need to resume coordinating commit for " << coordinatorDocs.size()
+ << " transactions";
+
+ for (const auto& doc : coordinatorDocs) {
+ LOG(3) << "Going to resume coordinating commit for " << doc.toBSON();
+ const auto lsid = *doc.getId().getSessionId();
+ const auto txnNumber = *doc.getId().getTxnNumber();
+
+ auto coordinator = std::make_shared<TransactionCoordinator>(
+ opCtx->getServiceContext(), lsid, txnNumber);
+ _coordinatorCatalog->insert(
+ opCtx, lsid, txnNumber, coordinator, true /* forStepUp */);
+ coordinator->continueCommit(doc);
+ }
+
+ _coordinatorCatalog->exitStepUp();
+
+ LOG(3) << "Incoming coordinateCommit requests now accepted";
+ } catch (const DBException& e) {
+ LOG(3) << "Failed while executing thread to resume coordinating commit for "
+ "pending "
+ "transactions "
+ << causedBy(e.toStatus());
+ _coordinatorCatalog->exitStepUp();
+ }
+ })
+ .getStatus();
+
+ // TODO (SERVER-38320): Reschedule the stepup task if the interruption was not due to stepdown.
+ if (status == ErrorCodes::ShutdownInProgress || ErrorCodes::isInterruption(status.code())) {
return;
}
- fassert(51031, scheduleStatus.isOK());
+ invariant(status);
}
-ServiceContext::ConstructorActionRegisterer transactionCoordinatorServiceRegisterer{
- "TransactionCoordinatorService",
- [](ServiceContext* service) { TransactionCoordinatorService::get(service)->startup(); },
- [](ServiceContext* service) { TransactionCoordinatorService::get(service)->shutdown(); }};
+void TransactionCoordinatorService::onStepDown(OperationContext* opCtx) {}
} // namespace mongo
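
The step-up task above follows a fixed sequence: wait for the node's last opTime to become majority committed (so recovery only acts on durable coordinator documents), re-create and resume a coordinator for each persisted document, then re-open the catalog to incoming coordinateCommit requests. A compressed, self-contained model of that sequence (all types and members here are hypothetical stand-ins for the catalog and storage calls shown in the diff):

    #include <string>
    #include <vector>

    struct CoordinatorDoc {
        std::string lsid;
        long long txnNumber;
    };

    // Hypothetical stand-ins for the storage/catalog operations used on step-up.
    struct RecoveryDeps {
        void waitForLastOpTimeMajorityCommitted() {}                   // majority write concern wait
        std::vector<CoordinatorDoc> readAllCoordinatorDocs() { return {}; }
        void insertAndResumeCoordinator(const CoordinatorDoc&) {}      // continueCommit(doc)
        void exitStepUp() {}                                           // accept requests again
    };

    void onStepUpModel(RecoveryDeps& deps) {
        deps.waitForLastOpTimeMajorityCommitted();
        for (const auto& doc : deps.readAllCoordinatorDocs())
            deps.insertAndResumeCoordinator(doc);
        deps.exitStepUp();
    }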
diff --git a/src/mongo/db/transaction_coordinator_service.h b/src/mongo/db/transaction_coordinator_service.h
index a3649a46396..89971179010 100644
--- a/src/mongo/db/transaction_coordinator_service.h
+++ b/src/mongo/db/transaction_coordinator_service.h
@@ -35,7 +35,6 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/transaction_coordinator_catalog.h"
-#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/future.h"
namespace mongo {
@@ -52,16 +51,6 @@ public:
~TransactionCoordinatorService() = default;
/**
- * Starts up the thread pool used for executing commits.
- */
- void startup();
-
- /**
- * Shuts down and joins the thread pool used for executing commits.
- */
- void shutdown();
-
- /**
* Retrieves the TransactionCoordinatorService associated with the service or operation context.
*/
static TransactionCoordinatorService* get(OperationContext* opCtx);
@@ -111,17 +100,10 @@ public:
* async task to continue coordinating its commit.
*/
void onStepUp(OperationContext* opCtx);
-
- /*
- * Shuts down and joins the original thread pool, then sets the thread pool to 'pool' and starts
- * 'pool'.
- */
- void setThreadPoolForTest(std::unique_ptr<ThreadPool> pool);
+ void onStepDown(OperationContext* opCtx);
private:
std::shared_ptr<TransactionCoordinatorCatalog> _coordinatorCatalog;
-
- std::unique_ptr<ThreadPool> _threadPool;
};
} // namespace mongo
diff --git a/src/mongo/db/transaction_coordinator_service_test.cpp b/src/mongo/db/transaction_coordinator_service_test.cpp
index eb0a0986fde..05aaa06b759 100644
--- a/src/mongo/db/transaction_coordinator_service_test.cpp
+++ b/src/mongo/db/transaction_coordinator_service_test.cpp
@@ -31,36 +31,23 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/transaction_coordinator_service.h"
-
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/server_parameters.h"
+#include "mongo/db/transaction_coordinator_service.h"
+#include "mongo/db/transaction_coordinator_test_fixture.h"
#include "mongo/db/write_concern_options.h"
-#include "mongo/s/catalog/sharding_catalog_client_mock.h"
-#include "mongo/s/catalog/type_shard.h"
-#include "mongo/s/shard_id.h"
-#include "mongo/s/shard_server_test_fixture.h"
-#include "mongo/unittest/death_test.h"
-#include "mongo/unittest/unittest.h"
#include "mongo/util/log.h"
namespace mongo {
namespace {
-const std::vector<ShardId> kTwoShardIdList{{"s1"}, {"s2"}};
-const std::set<ShardId> kTwoShardIdSet{{"s1"}, {"s2"}};
-const std::vector<ShardId> kThreeShardIdList{{"s1"}, {"s2"}, {"s3"}};
-const std::set<ShardId> kThreeShardIdSet{{"s1"}, {"s2"}, {"s3"}};
const Timestamp kDummyTimestamp = Timestamp::min();
const Date_t kCommitDeadline = Date_t::max();
const BSONObj kDummyWriteConcernError = BSON("code" << ErrorCodes::WriteConcernFailed << "errmsg"
<< "dummy");
-const StatusWith<BSONObj> kRetryableError = {ErrorCodes::HostUnreachable, ""};
const StatusWith<BSONObj> kNoSuchTransactionAndWriteConcernError =
BSON("ok" << 0 << "code" << ErrorCodes::NoSuchTransaction << "writeConcernError"
<< kDummyWriteConcernError);
@@ -76,81 +63,8 @@ const StatusWith<BSONObj> kPrepareOkButWriteConcernError =
BSON("ok" << 1 << "prepareTimestamp" << Timestamp(1, 1) << "writeConcernError"
<< kDummyWriteConcernError);
-HostAndPort makeHostAndPort(const ShardId& shardId) {
- return HostAndPort(str::stream() << shardId << ":123");
-}
-
-/**
- * Constructs the default options for the thread pool used to run commit.
- */
-ThreadPool::Options makeSingleThreadedThreadPoolForTest() {
- ThreadPool::Options options;
- options.poolName = "TransactionCoordinatorService";
- options.minThreads = 0;
- options.maxThreads = 1;
-
- // Ensure all threads have a client
- options.onCreateThread = [](const std::string& threadName) {
- Client::initThread(threadName.c_str());
- };
- return options;
-}
-
-class TransactionCoordinatorServiceTest : public ShardServerTestFixture {
+class TransactionCoordinatorServiceTest : public TransactionCoordinatorTestFixture {
public:
- void setUp() override {
- ShardServerTestFixture::setUp();
-
- // Use a thread pool with a maxThreads=1 so that the unit tests can expect tasks to be run
- // in a deterministic order.
- TransactionCoordinatorService::get(operationContext())
- ->setThreadPoolForTest(
- std::make_unique<ThreadPool>(makeSingleThreadedThreadPoolForTest()));
-
- ASSERT_OK(ServerParameterSet::getGlobal()
- ->getMap()
- .find("logComponentVerbosity")
- ->second->setFromString("{verbosity: 3}"));
-
- for (const auto& shardId : kThreeShardIdList) {
- auto shardTargeter = RemoteCommandTargeterMock::get(
- uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId))
- ->getTargeter());
- shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId));
- }
- }
-
- void tearDown() override {
- // Join the original thread pool and replace it with a new one, because tasks in the pool
- // may call getGlobalServiceContext, which is set to nullptr before the ServiceContext's
- // destructor (which ordinarily joins the thread pool) is invoked.
- TransactionCoordinatorService::get(operationContext())
- ->setThreadPoolForTest(
- std::make_unique<ThreadPool>(makeSingleThreadedThreadPoolForTest()));
- ShardServerTestFixture::tearDown();
- }
-
- void assertCommandSentAndRespondWith(const StringData& commandName,
- const StatusWith<BSONObj>& response,
- const BSONObj& expectedWriteConcern) {
- log() << "Mock storage layer waiting for command " << commandName;
- onCommand([&](const executor::RemoteCommandRequest& request) {
- if (response.isOK()) {
- log() << "Got command " << request.cmdObj.firstElement().fieldNameStringData()
- << " and responding with " << response.getValue();
- } else {
- log() << "Got command " << request.cmdObj.firstElement().fieldNameStringData()
- << " and responding with " << response.getStatus();
- }
- ASSERT_EQ(request.cmdObj.firstElement().fieldNameStringData(), commandName);
-
- ASSERT_BSONOBJ_EQ(
- expectedWriteConcern,
- request.cmdObj.getObjectField(WriteConcernOptions::kWriteConcernField));
- return response;
- });
- }
-
// Prepare responses
void assertPrepareSentAndRespondWithSuccess() {
@@ -163,6 +77,7 @@ public:
assertCommandSentAndRespondWith(PrepareTransaction::kCommandName,
kPrepareOkButWriteConcernError,
WriteConcernOptions::InternalMajorityNoSnapshot);
+ advanceClockAndExecuteScheduledTasks();
}
void assertPrepareSentAndRespondWithNoSuchTransaction() {
@@ -175,6 +90,7 @@ public:
assertCommandSentAndRespondWith(PrepareTransaction::kCommandName,
kNoSuchTransactionAndWriteConcernError,
WriteConcernOptions::InternalMajorityNoSnapshot);
+ advanceClockAndExecuteScheduledTasks();
}
// Abort responses
@@ -186,6 +102,7 @@ public:
void assertAbortSentAndRespondWithSuccessAndWriteConcernError() {
assertCommandSentAndRespondWith(
"abortTransaction", kOkButWriteConcernError, WriteConcernOptions::Majority);
+ advanceClockAndExecuteScheduledTasks();
}
void assertAbortSentAndRespondWithNoSuchTransaction() {
@@ -197,6 +114,7 @@ public:
assertCommandSentAndRespondWith("abortTransaction",
kNoSuchTransactionAndWriteConcernError,
WriteConcernOptions::Majority);
+ advanceClockAndExecuteScheduledTasks();
}
// Commit responses
@@ -210,11 +128,13 @@ public:
assertCommandSentAndRespondWith(CommitTransaction::kCommandName,
kOkButWriteConcernError,
WriteConcernOptions::Majority);
+ advanceClockAndExecuteScheduledTasks();
}
void assertCommitSentAndRespondWithRetryableError() {
assertCommandSentAndRespondWith(
CommitTransaction::kCommandName, kRetryableError, WriteConcernOptions::Majority);
+ advanceClockAndExecuteScheduledTasks();
}
// Other
@@ -274,35 +194,6 @@ public:
commitDecisionFuture.get();
}
- // Override the CatalogClient to make CatalogClient::getAllShards automatically return the
- // expected shards. We cannot mock the network responses for the ShardRegistry reload, since the
- // ShardRegistry reload is done over DBClient, not the NetworkInterface, and there is no
- // DBClientMock analogous to the NetworkInterfaceMock.
- std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
- std::unique_ptr<DistLockManager> distLockManager) override {
-
- class StaticCatalogClient final : public ShardingCatalogClientMock {
- public:
- StaticCatalogClient() : ShardingCatalogClientMock(nullptr) {}
-
- StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards(
- OperationContext* opCtx, repl::ReadConcernLevel readConcern) override {
- std::vector<ShardType> shardTypes;
- for (const auto& shardId : kThreeShardIdList) {
- const ConnectionString cs = ConnectionString::forReplicaSet(
- shardId.toString(), {makeHostAndPort(shardId)});
- ShardType sType;
- sType.setName(cs.getSetName());
- sType.setHost(cs.toString());
- shardTypes.push_back(std::move(sType));
- };
- return repl::OpTimeWith<std::vector<ShardType>>(shardTypes);
- }
- };
-
- return stdx::make_unique<StaticCatalogClient>();
- }
-
LogicalSessionId _lsid{makeLogicalSessionIdForTest()};
TxnNumber _txnNumber{1};
};
@@ -613,7 +504,7 @@ TEST_F(TransactionCoordinatorServiceTest,
// Reach the deadline.
network()->enterNetwork();
- network()->runUntil(deadline);
+ network()->advanceTime(deadline);
network()->exitNetwork();
// The coordinator should no longer exist.
@@ -635,7 +526,7 @@ TEST_F(TransactionCoordinatorServiceTest,
// Reach the deadline.
network()->enterNetwork();
- network()->runUntil(deadline);
+ network()->advanceTime(deadline);
network()->exitNetwork();
// The coordinator should still exist.
diff --git a/src/mongo/db/transaction_coordinator_test.cpp b/src/mongo/db/transaction_coordinator_test.cpp
index 01d3ec53832..97bbf6624b0 100644
--- a/src/mongo/db/transaction_coordinator_test.cpp
+++ b/src/mongo/db/transaction_coordinator_test.cpp
@@ -35,31 +35,14 @@
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/server_parameters.h"
-#include "mongo/db/transaction_coordinator.h"
-#include "mongo/executor/task_executor.h"
-#include "mongo/executor/task_executor_pool.h"
-#include "mongo/s/catalog/sharding_catalog_client_mock.h"
-#include "mongo/s/catalog/type_shard.h"
-#include "mongo/s/shard_id.h"
-#include "mongo/s/shard_server_test_fixture.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/db/transaction_coordinator_test_fixture.h"
#include "mongo/util/log.h"
namespace mongo {
namespace {
-using PrepareResponse = txn::PrepareResponse;
-
-const std::vector<ShardId> kTwoShardIdList{{"s1"}, {"s2"}};
-const std::set<ShardId> kTwoShardIdSet{{"s1"}, {"s2"}};
-const std::vector<ShardId> kThreeShardIdList{{"s1"}, {"s2"}, {"s3"}};
-const std::set<ShardId> kThreeShardIdSet{{"s1"}, {"s2"}, {"s3"}};
const Timestamp kDummyTimestamp = Timestamp::min();
const Date_t kCommitDeadline = Date_t::max();
-const Status kRetryableError(ErrorCodes::HostUnreachable, "Retryable error for test");
const StatusWith<BSONObj> kNoSuchTransaction =
BSON("ok" << 0 << "code" << ErrorCodes::NoSuchTransaction);
const StatusWith<BSONObj> kOk = BSON("ok" << 1);
@@ -74,69 +57,8 @@ StatusWith<BSONObj> makePrepareOkResponse(const Timestamp& timestamp) {
const StatusWith<BSONObj> kPrepareOk = makePrepareOkResponse(kDummyPrepareTimestamp);
-HostAndPort makeHostAndPort(const ShardId& shardId) {
- return HostAndPort(str::stream() << shardId << ":123");
-}
-
-/**
- * Constructs the default options for the thread pool used to run commit.
- */
-ThreadPool::Options makeDefaultThreadPoolOptions() {
- ThreadPool::Options options;
- options.poolName = "TransactionCoordinatorService";
- options.minThreads = 0;
- options.maxThreads = 1;
-
- // Ensure all threads have a client
- options.onCreateThread = [](const std::string& threadName) {
- Client::initThread(threadName.c_str());
- };
- return options;
-}
-
-class TransactionCoordinatorTestBase : public ShardServerTestFixture {
+class TransactionCoordinatorTestBase : public TransactionCoordinatorTestFixture {
protected:
- void setUp() override {
- ShardServerTestFixture::setUp();
-
- ASSERT_OK(ServerParameterSet::getGlobal()
- ->getMap()
- .find("logComponentVerbosity")
- ->second->setFromString("{verbosity: 3}"));
-
- for (const auto& shardId : kThreeShardIdList) {
- auto shardTargeter = RemoteCommandTargeterMock::get(
- uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId))
- ->getTargeter());
- shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId));
- }
-
- _pool = std::make_unique<ThreadPool>(makeDefaultThreadPoolOptions());
- _pool->startup();
- _executor = Grid::get(getServiceContext())->getExecutorPool()->getFixedExecutor();
- }
-
- void tearDown() override {
- _pool->shutdown();
- _pool.reset();
-
- ShardServerTestFixture::tearDown();
- }
-
- void assertCommandSentAndRespondWith(const StringData& commandName,
- const StatusWith<BSONObj>& response,
- boost::optional<BSONObj> expectedWriteConcern) {
- onCommand([&](const executor::RemoteCommandRequest& request) {
- ASSERT_EQ(request.cmdObj.firstElement().fieldNameStringData(), commandName);
- if (expectedWriteConcern) {
- ASSERT_BSONOBJ_EQ(
- *expectedWriteConcern,
- request.cmdObj.getObjectField(WriteConcernOptions::kWriteConcernField));
- }
- return response;
- });
- }
-
void assertPrepareSentAndRespondWithSuccess() {
assertCommandSentAndRespondWith(PrepareTransaction::kCommandName,
kPrepareOk,
@@ -159,6 +81,7 @@ protected:
assertCommandSentAndRespondWith(PrepareTransaction::kCommandName,
kRetryableError,
WriteConcernOptions::InternalMajorityNoSnapshot);
+ advanceClockAndExecuteScheduledTasks();
}
void assertAbortSentAndRespondWithSuccess() {
@@ -214,48 +137,15 @@ protected:
// commitDecisionFuture.get();
}
- // Override the CatalogClient to make CatalogClient::getAllShards automatically return the
- // expected shards. We cannot mock the network responses for the ShardRegistry reload, since the
- // ShardRegistry reload is done over DBClient, not the NetworkInterface, and there is no
- // DBClientMock analogous to the NetworkInterfaceMock.
- std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
- std::unique_ptr<DistLockManager> distLockManager) override {
-
- class StaticCatalogClient final : public ShardingCatalogClientMock {
- public:
- StaticCatalogClient() : ShardingCatalogClientMock(nullptr) {}
-
- StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards(
- OperationContext* opCtx, repl::ReadConcernLevel readConcern) override {
- std::vector<ShardType> shardTypes;
- for (const auto& shardId : kThreeShardIdList) {
- const ConnectionString cs = ConnectionString::forReplicaSet(
- shardId.toString(), {makeHostAndPort(shardId)});
- ShardType sType;
- sType.setName(cs.getSetName());
- sType.setHost(cs.toString());
- shardTypes.push_back(std::move(sType));
- };
- return repl::OpTimeWith<std::vector<ShardType>>(shardTypes);
- }
- };
-
- return stdx::make_unique<StaticCatalogClient>();
- }
-
- std::unique_ptr<ThreadPool> _pool;
- executor::TaskExecutor* _executor;
-
LogicalSessionId _lsid{makeLogicalSessionIdForTest()};
TxnNumber _txnNumber{1};
};
-
class TransactionCoordinatorDriverTest : public TransactionCoordinatorTestBase {
protected:
void setUp() override {
TransactionCoordinatorTestBase::setUp();
- _driver.emplace(_executor, _pool.get());
+ _driver.emplace(getServiceContext());
}
void tearDown() override {
@@ -282,20 +172,21 @@ TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsOn
Future<void> future = _driver->sendDecisionToParticipantShard(
kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
- std::move(future).getAsync([](Status s) { ASSERT(s.isOK()); });
-
assertPrepareSentAndRespondWithSuccess();
+ future.get(operationContext());
}
TEST_F(TransactionCoordinatorDriverTest,
SendDecisionToParticipantShardReturnsSuccessAfterOneFailureAndThenSuccess) {
Future<void> future = _driver->sendDecisionToParticipantShard(
kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
-
- std::move(future).getAsync([](Status s) { ASSERT(s.isOK()); });
+ ASSERT(!future.isReady());
assertPrepareSentAndRespondWithRetryableError();
+ ASSERT(!future.isReady());
+
assertPrepareSentAndRespondWithSuccess();
+ future.get(operationContext());
}
TEST_F(TransactionCoordinatorDriverTest,
@@ -772,8 +663,7 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest,
using TransactionCoordinatorTest = TransactionCoordinatorTestBase;
TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnAbort) {
- TransactionCoordinator coordinator(
- getServiceContext(), _executor, _pool.get(), _lsid, _txnNumber);
+ TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
// Simulate a participant voting to abort.
@@ -790,8 +680,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnAbort)
}
TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnCommit) {
- TransactionCoordinator coordinator(
- getServiceContext(), _executor, _pool.get(), _lsid, _txnNumber);
+ TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
assertPrepareSentAndRespondWithSuccess();
@@ -808,8 +697,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnCommit
TEST_F(TransactionCoordinatorTest,
RunCommitReturnsCorrectCommitDecisionOnAbortAfterNetworkRetriesOneParticipantAborts) {
- TransactionCoordinator coordinator(
- getServiceContext(), _executor, _pool.get(), _lsid, _txnNumber);
+ TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
// One participant votes abort after retry.
@@ -830,8 +718,7 @@ TEST_F(TransactionCoordinatorTest,
TEST_F(TransactionCoordinatorTest,
RunCommitReturnsCorrectCommitDecisionOnAbortAfterNetworkRetriesBothParticipantsAbort) {
- TransactionCoordinator coordinator(
- getServiceContext(), _executor, _pool.get(), _lsid, _txnNumber);
+ TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
// One participant votes abort after retry.
@@ -852,8 +739,7 @@ TEST_F(TransactionCoordinatorTest,
TEST_F(TransactionCoordinatorTest,
RunCommitReturnsCorrectCommitDecisionOnCommitAfterNetworkRetries) {
- TransactionCoordinator coordinator(
- getServiceContext(), _executor, _pool.get(), _lsid, _txnNumber);
+ TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
// One participant votes commit after retry.
diff --git a/src/mongo/db/transaction_coordinator_test_fixture.cpp b/src/mongo/db/transaction_coordinator_test_fixture.cpp
new file mode 100644
index 00000000000..b150a8a36a0
--- /dev/null
+++ b/src/mongo/db/transaction_coordinator_test_fixture.cpp
@@ -0,0 +1,120 @@
+
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/transaction_coordinator_test_fixture.h"
+
+#include "mongo/client/remote_command_targeter_mock.h"
+#include "mongo/db/commands/txn_cmds_gen.h"
+#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/server_parameters.h"
+#include "mongo/s/catalog/sharding_catalog_client_mock.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+
+HostAndPort makeHostAndPort(const ShardId& shardId) {
+ return HostAndPort(str::stream() << shardId << ":123");
+}
+
+} // namespace
+
+void TransactionCoordinatorTestFixture::setUp() {
+ ShardServerTestFixture::setUp();
+
+ ASSERT_OK(ServerParameterSet::getGlobal()
+ ->getMap()
+ .find("logComponentVerbosity")
+ ->second->setFromString("{verbosity: 3}"));
+
+ for (const auto& shardId : kThreeShardIdList) {
+ auto shardTargeter = RemoteCommandTargeterMock::get(
+ uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId))->getTargeter());
+ shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId));
+ }
+}
+
+std::unique_ptr<ShardingCatalogClient> TransactionCoordinatorTestFixture::makeShardingCatalogClient(
+ std::unique_ptr<DistLockManager> distLockManager) {
+
+ class StaticCatalogClient final : public ShardingCatalogClientMock {
+ public:
+ StaticCatalogClient(std::vector<ShardId> shardIds)
+ : ShardingCatalogClientMock(nullptr), _shardIds(std::move(shardIds)) {}
+
+ StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards(
+ OperationContext* opCtx, repl::ReadConcernLevel readConcern) override {
+ std::vector<ShardType> shardTypes;
+ for (const auto& shardId : _shardIds) {
+ const ConnectionString cs =
+ ConnectionString::forReplicaSet(shardId.toString(), {makeHostAndPort(shardId)});
+ ShardType sType;
+ sType.setName(cs.getSetName());
+ sType.setHost(cs.toString());
+ shardTypes.push_back(std::move(sType));
+ }
+ return repl::OpTimeWith<std::vector<ShardType>>(shardTypes);
+ }
+
+ private:
+ const std::vector<ShardId> _shardIds;
+ };
+
+ return stdx::make_unique<StaticCatalogClient>(kThreeShardIdList);
+}
+
+void TransactionCoordinatorTestFixture::assertCommandSentAndRespondWith(
+ const StringData& commandName,
+ const StatusWith<BSONObj>& response,
+ boost::optional<BSONObj> expectedWriteConcern) {
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQ(request.cmdObj.firstElement().fieldNameStringData(), commandName);
+ if (expectedWriteConcern) {
+ ASSERT_BSONOBJ_EQ(
+ *expectedWriteConcern,
+ request.cmdObj.getObjectField(WriteConcernOptions::kWriteConcernField));
+ }
+ return response;
+ });
+}
+
+void TransactionCoordinatorTestFixture::advanceClockAndExecuteScheduledTasks() {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+ network()->advanceTime(network()->now() + Seconds{1});
+}
+
+} // namespace mongo
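
(Usage sketch, not part of the patch: a test built on this fixture can wrap assertCommandSentAndRespondWith into intent-revealing helpers. The "prepareTransaction" command name and the ok-response document below are assumptions for illustration; kRetryableError is the canned status defined in the fixture header:

    class ExampleCoordinatorTest : public TransactionCoordinatorTestFixture {
    protected:
        void assertPrepareSentAndRespondWithSuccess() {
            // Expect an outgoing prepareTransaction command and answer ok.
            assertCommandSentAndRespondWith("prepareTransaction", BSON("ok" << 1), boost::none);
        }

        void assertPrepareSentAndRespondWithRetryableError() {
            // Answering with the canned retryable error makes the coordinator
            // schedule a retry with back-off, which the clock helper can fire.
            assertCommandSentAndRespondWith("prepareTransaction", kRetryableError, boost::none);
        }
    };
)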
diff --git a/src/mongo/db/transaction_coordinator_test_fixture.h b/src/mongo/db/transaction_coordinator_test_fixture.h
new file mode 100644
index 00000000000..7466ad5acc5
--- /dev/null
+++ b/src/mongo/db/transaction_coordinator_test_fixture.h
@@ -0,0 +1,80 @@
+
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <set>
+#include <vector>
+
+#include "mongo/base/status.h"
+#include "mongo/db/transaction_coordinator.h"
+#include "mongo/s/shard_id.h"
+#include "mongo/s/shard_server_test_fixture.h"
+
+namespace mongo {
+
+/**
+ * Implements common functionality shared across the various transaction coordinator unit-tests.
+ */
+class TransactionCoordinatorTestFixture : public ShardServerTestFixture {
+protected:
+ using PrepareResponse = txn::PrepareResponse;
+
+ void setUp() override;
+
+ /**
+ * Overrides the CatalogClient so that CatalogClient::getAllShards automatically returns the
+ * expected set of shards. The network responses for the ShardRegistry reload cannot be mocked,
+ * because the reload goes over DBClient rather than the NetworkInterface, and there is no
+ * DBClient mock analogous to the NetworkInterfaceMock.
+ */
+ std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
+ std::unique_ptr<DistLockManager> distLockManager) override;
+
+ void assertCommandSentAndRespondWith(const StringData& commandName,
+ const StatusWith<BSONObj>& response,
+ boost::optional<BSONObj> expectedWriteConcern);
+ /**
+ * These tests use the network task executor mock, which does not automatically execute tasks
+ * that are scheduled with a delay. This helper advances the clock by 1 second (the maximum
+ * back-off in the transaction coordinator) so that any pending retries run.
+ */
+ void advanceClockAndExecuteScheduledTasks();
+
+ const std::vector<ShardId> kTwoShardIdList{{"s1"}, {"s2"}};
+ const std::set<ShardId> kTwoShardIdSet{{"s1"}, {"s2"}};
+ const std::vector<ShardId> kThreeShardIdList{{"s1"}, {"s2"}, {"s3"}};
+ const std::set<ShardId> kThreeShardIdSet{{"s1"}, {"s2"}, {"s3"}};
+
+ const Status kRetryableError{ErrorCodes::HostUnreachable,
+ "Retryable error for coordinator test"};
+};
+
+} // namespace mongo
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index ed9d67ea536..f700f6c211e 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -420,6 +420,16 @@ Date_t NetworkInterfaceMock::runUntil(Date_t until) {
return _now_inlock();
}
+void NetworkInterfaceMock::advanceTime(Date_t newTime) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ invariant(_currentlyRunning == kNetworkThread);
+ invariant(newTime > _now_inlock());
+ _now = newTime;
+
+ _waitingToRunMask |= kExecutorThread;
+ _runReadyNetworkOperations_inlock(&lk);
+}
+
void NetworkInterfaceMock::runReadyNetworkOperations() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
invariant(_currentlyRunning == kNetworkThread);
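
(For illustration, not part of the patch: a test that wants a delayed retry to fire can pair advanceTime with the usual InNetworkGuard pattern, mirroring advanceClockAndExecuteScheduledTasks above; 'net' is assumed to be the test's NetworkInterfaceMock pointer:

    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);
        // Jump one second forward; any operation whose deadline has now passed
        // is handed back to the executor thread to run.
        net->advanceTime(net->now() + Seconds{1});
    }
)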
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 621a44a02c5..937bed41525 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -253,6 +253,11 @@ public:
Date_t runUntil(Date_t until);
/**
+ * Advances the simulator's notion of now() to newTime, which must be strictly greater
+ * than the current time, and runs any network operations that become ready as a result.
+ */
+ void advanceTime(Date_t newTime);
+
+ /**
* Processes all ready, scheduled network operations.
*
* Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork.