author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2019-01-16 21:26:30 -0500
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2019-02-05 07:42:15 -0500
commit     9a90dfc2fbe769d3caf02a3550f00bc9fa9df483 (patch)
tree       6027d8f5c63e808fce9e23f45201962733454ccc
parent     f619c619a10aba632422dde6f30ebbdcbf370003 (diff)
download   mongo-9a90dfc2fbe769d3caf02a3550f00bc9fa9df483.tar.gz
SERVER-38521 Make the AsyncWorkScheduler be interruptible and support sub-schedulers
-rw-r--r--  src/mongo/db/client.cpp                                            |   4
-rw-r--r--  src/mongo/db/client.h                                              |   8
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp  |   2
-rw-r--r--  src/mongo/db/transaction_coordinator.cpp                           |  10
-rw-r--r--  src/mongo/db/transaction_coordinator.h                             |  47
-rw-r--r--  src/mongo/db/transaction_coordinator_catalog.cpp                   |  12
-rw-r--r--  src/mongo/db/transaction_coordinator_catalog.h                     |  43
-rw-r--r--  src/mongo/db/transaction_coordinator_catalog_test.cpp              |   4
-rw-r--r--  src/mongo/db/transaction_coordinator_driver.cpp                    |  24
-rw-r--r--  src/mongo/db/transaction_coordinator_driver.h                      |   2
-rw-r--r--  src/mongo/db/transaction_coordinator_futures_util.cpp              | 125
-rw-r--r--  src/mongo/db/transaction_coordinator_futures_util.h                |  85
-rw-r--r--  src/mongo/db/transaction_coordinator_futures_util_test.cpp         | 189
-rw-r--r--  src/mongo/db/transaction_coordinator_service.cpp                   |   8
-rw-r--r--  src/mongo/db/transaction_coordinator_service.h                     |  14
-rw-r--r--  src/mongo/db/transaction_coordinator_service_test.cpp              |  37
16 files changed, 450 insertions(+), 164 deletions(-)
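
Taken together, the changes below give the transaction coordinator a way to interrupt
in-flight work on stepdown. A minimal sketch of the intended usage, assuming the
interfaces added in this patch (the error code and message here are illustrative, not
taken from the commit):

    txn::AsyncWorkScheduler scheduler(serviceContext);
    auto child = scheduler.makeChildScheduler();

    auto future = child->scheduleWork([](OperationContext* opCtx) {
        // Long-running coordinator work; killed if shutdown() is called
    });

    // On stepdown, interrupt the whole tree of schedulers at once
    scheduler.shutdown({ErrorCodes::Interrupted, "Stepping down"});
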
diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp
index e7f3e8d9615..9c4204923ff 100644
--- a/src/mongo/db/client.cpp
+++ b/src/mongo/db/client.cpp
@@ -169,4 +169,8 @@ ThreadClient::~ThreadClient() {
currentClient.reset(nullptr);
}
+Client* ThreadClient::get() const {
+ return &cc();
+}
+
} // namespace mongo
diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h
index ed3f31f868e..7e24368412d 100644
--- a/src/mongo/db/client.h
+++ b/src/mongo/db/client.h
@@ -247,6 +247,14 @@ public:
ThreadClient(const ThreadClient&) = delete;
ThreadClient(ThreadClient&&) = delete;
void operator=(const ThreadClient&) = delete;
+
+ Client* get() const;
+ Client* operator->() const {
+ return get();
+ }
+ Client& operator*() const {
+ return *get();
+ }
};
/**
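
With the accessors added above, ThreadClient behaves like a smart pointer to the
thread's Client, which is what the scheduler code later in this patch relies on
(tc->makeOperationContext()). A short usage sketch, assuming a ServiceContext*
named serviceContext:

    ThreadClient tc("TransactionCoordinator", serviceContext);
    auto opCtx = tc->makeOperationContext();  // operator-> forwards to get()
    Client& client = *tc;                     // operator* dereferences get()
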
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 4bc3b8c8409..e956b342c9c 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -705,10 +705,12 @@ void ReplicationCoordinatorExternalStateImpl::closeConnections() {
void ReplicationCoordinatorExternalStateImpl::shardingOnStepDownHook() {
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
Balancer::get(_service)->interruptBalancer();
+ TransactionCoordinatorService::get(_service)->onStepDown();
} else if (ShardingState::get(_service)->enabled()) {
ChunkSplitter::get(_service).onStepDown();
CatalogCacheLoader::get(_service).onStepDown();
PeriodicBalancerConfigRefresher::get(_service).onStepDown();
+ TransactionCoordinatorService::get(_service)->onStepDown();
}
if (auto validator = LogicalTimeValidator::get(_service)) {
diff --git a/src/mongo/db/transaction_coordinator.cpp b/src/mongo/db/transaction_coordinator.cpp
index ea297b10375..df00c708d4a 100644
--- a/src/mongo/db/transaction_coordinator.cpp
+++ b/src/mongo/db/transaction_coordinator.cpp
@@ -72,12 +72,11 @@ CoordinatorCommitDecision makeDecisionFromPrepareVoteConsensus(
TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
const LogicalSessionId& lsid,
- const TxnNumber& txnNumber)
+ TxnNumber txnNumber)
: _serviceContext(serviceContext),
_driver(serviceContext),
_lsid(lsid),
- _txnNumber(txnNumber),
- _state(CoordinatorState::kInit) {}
+ _txnNumber(txnNumber) {}
TransactionCoordinator::~TransactionCoordinator() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -175,6 +174,11 @@ Future<void> TransactionCoordinator::onCompletion() {
return std::move(completionPromiseFuture.future);
}
+SharedSemiFuture<txn::CommitDecision> TransactionCoordinator::getDecision() {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ return _finalDecisionPromise.getFuture();
+}
+
void TransactionCoordinator::cancelIfCommitNotYetStarted() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (_state == CoordinatorState::kInit) {
diff --git a/src/mongo/db/transaction_coordinator.h b/src/mongo/db/transaction_coordinator.h
index 72b9810425d..0832c38feaa 100644
--- a/src/mongo/db/transaction_coordinator.h
+++ b/src/mongo/db/transaction_coordinator.h
@@ -46,7 +46,7 @@ class TransactionCoordinator {
public:
TransactionCoordinator(ServiceContext* serviceContext,
const LogicalSessionId& lsid,
- const TxnNumber& txnNumber);
+ TxnNumber txnNumber);
~TransactionCoordinator();
/**
@@ -124,10 +124,7 @@ public:
*
* TODO (SERVER-37364): Remove this when it is no longer needed by the coordinator service.
*/
- SharedSemiFuture<txn::CommitDecision> getDecision() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _finalDecisionPromise.getFuture();
- }
+ SharedSemiFuture<txn::CommitDecision> getDecision();
/**
* If runCommit has not yet been called, this will transition this coordinator object to
@@ -189,40 +186,26 @@ private:
// this coordinator.
TransactionCoordinatorDriver _driver;
- /**
- * The logical session id of the transaction that this coordinator is coordinating.
- */
+ // The lsid + transaction number that this coordinator is coordinating
const LogicalSessionId _lsid;
-
- /**
- * The transaction number of the transaction that this coordinator is coordinating.
- */
const TxnNumber _txnNumber;
- /**
- * Protects _state, _finalDecisionPromise, and _completionPromises.
- */
- stdx::mutex _mutex;
+ // Protects the state below
+ mutable stdx::mutex _mutex;
- /**
- * The current state of the coordinator in the commit process.
- */
- CoordinatorState _state;
+ // Stores the current state of the coordinator in the commit process.
+ CoordinatorState _state{CoordinatorState::kInit};
- /**
- * A promise that will contain the final decision made by the coordinator (whether to commit or
- * abort). This is only known once all responses to prepare have been received from all
- * participants, and the collective decision has been persisted to
- * config.transactionCommitDecisions.
- */
+ // Promise which will contain the final decision made by the coordinator (whether to commit or
+ // abort). This is only known once all responses to prepare have been received from all
+ // participants, and the collective decision has been majority persisted to
+ // config.transactionCommitDecisions.
SharedPromise<txn::CommitDecision> _finalDecisionPromise;
- /**
- * A list of all promises corresponding to futures that were returned to callers of
- * onCompletion.
- *
- * TODO (SERVER-38346): Remove this when SharedSemiFuture supports continuations.
- */
+ // A list of all promises corresponding to futures that were returned to callers of
+ // onCompletion.
+ //
+ // TODO (SERVER-38346): Remove this when SharedSemiFuture supports continuations.
std::vector<Promise<void>> _completionPromises;
};
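
The SharedPromise above is what makes getDecision() safe to hand out to multiple
callers: every getFuture() call returns an independent SharedSemiFuture that becomes
ready when the single promise is fulfilled. A sketch of the pattern (illustrative,
not part of this patch):

    SharedPromise<txn::CommitDecision> decisionPromise;
    auto f1 = decisionPromise.getFuture();  // first caller
    auto f2 = decisionPromise.getFuture();  // second caller
    decisionPromise.emplaceValue(txn::CommitDecision::kCommit);
    // Both f1 and f2 now report kCommit
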
diff --git a/src/mongo/db/transaction_coordinator_catalog.cpp b/src/mongo/db/transaction_coordinator_catalog.cpp
index 3497476b6ee..460d450d4b1 100644
--- a/src/mongo/db/transaction_coordinator_catalog.cpp
+++ b/src/mongo/db/transaction_coordinator_catalog.cpp
@@ -129,12 +129,12 @@ TransactionCoordinatorCatalog::getLatestOnSession(OperationContext* opCtx, Logic
}
const auto& coordinatorsForSession = coordinatorsForSessionIter->second;
- const auto& lastCoordinatorOnSession = coordinatorsForSession.rbegin();
- // Should never have empty map for a session. Entries for sessions with no transactions should
- // be removed.
- invariant(lastCoordinatorOnSession != coordinatorsForSession.rend());
+ // We should never have an empty map for a session, because entries for sessions with no
+ // transactions are removed
+ invariant(!coordinatorsForSession.empty());
+ const auto& lastCoordinatorOnSession = coordinatorsForSession.begin();
return std::make_pair(lastCoordinatorOnSession->first, lastCoordinatorOnSession->second);
}
@@ -214,12 +214,12 @@ void TransactionCoordinatorCatalog::_waitForStepUpToComplete(stdx::unique_lock<s
_noStepUpInProgressCv, lk, [this]() { return !_stepUpInProgress; });
}
-std::string TransactionCoordinatorCatalog::toString() {
+std::string TransactionCoordinatorCatalog::toString() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
return _toString(lk);
}
-std::string TransactionCoordinatorCatalog::_toString(WithLock wl) {
+std::string TransactionCoordinatorCatalog::_toString(WithLock wl) const {
StringBuilder ss;
ss << "[";
for (auto coordinatorsForSession = _coordinatorsBySession.begin();
diff --git a/src/mongo/db/transaction_coordinator_catalog.h b/src/mongo/db/transaction_coordinator_catalog.h
index 9f3ea9d7582..98acd96d3e9 100644
--- a/src/mongo/db/transaction_coordinator_catalog.h
+++ b/src/mongo/db/transaction_coordinator_catalog.h
@@ -30,9 +30,8 @@
#pragma once
-#include <boost/optional/optional.hpp>
+#include <boost/optional.hpp>
#include <map>
-#include <memory>
#include "mongo/base/disallow_copying.h"
#include "mongo/db/operation_context.h"
@@ -110,13 +109,13 @@ public:
* Returns a string representation of the map from LogicalSessionId to the list of TxnNumbers
* with TransactionCoordinators currently in the catalog.
*/
- std::string toString();
+ std::string toString() const;
private:
- /**
- * Protects the state below.
- */
- stdx::mutex _mutex;
+ // Map of transaction coordinators, ordered in decreasing transaction number with the most
+ // recent transaction at the front
+ using TransactionCoordinatorMap =
+ std::map<TxnNumber, std::shared_ptr<TransactionCoordinator>, std::greater<TxnNumber>>;
/**
* Blocks in an interruptible wait until the catalog is not marked as having a stepup in
@@ -124,22 +123,22 @@ private:
*/
void _waitForStepUpToComplete(stdx::unique_lock<stdx::mutex>& lk, OperationContext* opCtx);
- std::string _toString(WithLock wl);
-
/**
- * Contains TransactionCoordinator objects by session id and transaction number. May contain
- * more than one coordinator per session. All coordinators for a session that do not correspond
- * to the latest transaction should either be in the process of committing or aborting.
+ * Constructs a string representation of all the coordinators registered on the catalog.
*/
- LogicalSessionIdMap<std::map<TxnNumber, std::shared_ptr<TransactionCoordinator>>>
- _coordinatorsBySession;
+ std::string _toString(WithLock wl) const;
- /**
- * Used only for testing. Contains TransactionCoordinator objects which have completed their
- * commit coordination and would normally be expunged from memory.
- */
- LogicalSessionIdMap<std::map<TxnNumber, std::shared_ptr<TransactionCoordinator>>>
- _coordinatorsBySessionDefunct;
+ // Protects the state below.
+ mutable stdx::mutex _mutex;
+
+ // Contains TransactionCoordinator objects by session id and transaction number. May contain
+ // more than one coordinator per session. All coordinators for a session that do not correspond
+ // to the latest transaction should either be in the process of committing or aborting.
+ LogicalSessionIdMap<TransactionCoordinatorMap> _coordinatorsBySession;
+
+ // Used only for testing. Contains TransactionCoordinator objects which have completed their
+ // commit coordination and would normally be expunged from memory.
+ LogicalSessionIdMap<TransactionCoordinatorMap> _coordinatorsBySessionDefunct;
/**
* Whether a thread is actively executing a stepUp task.
@@ -152,9 +151,7 @@ private:
*/
stdx::condition_variable _noStepUpInProgressCv;
- /**
- * Notified when the last coordinator is removed from the catalog.
- */
+ // Notified when the last coordinator is removed from the catalog.
stdx::condition_variable _noActiveCoordinatorsCv;
};
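
The std::greater comparator in TransactionCoordinatorMap is what lets
getLatestOnSession use begin() instead of rbegin(): iteration runs from the highest
transaction number down. A standalone sketch of that ordering (illustrative):

    #include <functional>
    #include <map>

    std::map<TxnNumber, int, std::greater<TxnNumber>> byTxnNumber;
    byTxnNumber[1] = 0;
    byTxnNumber[5] = 0;
    byTxnNumber[3] = 0;
    invariant(byTxnNumber.begin()->first == 5);  // most recent transaction first
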
diff --git a/src/mongo/db/transaction_coordinator_catalog_test.cpp b/src/mongo/db/transaction_coordinator_catalog_test.cpp
index 47303b590f6..9bc847790ad 100644
--- a/src/mongo/db/transaction_coordinator_catalog_test.cpp
+++ b/src/mongo/db/transaction_coordinator_catalog_test.cpp
@@ -38,10 +38,8 @@
namespace mongo {
namespace {
-const Timestamp dummyTimestamp = Timestamp::min();
-
class TransactionCoordinatorCatalogTest : public ShardServerTestFixture {
-public:
+protected:
void setUp() override {
ShardServerTestFixture::setUp();
_coordinatorCatalog = std::make_shared<TransactionCoordinatorCatalog>();
diff --git a/src/mongo/db/transaction_coordinator_driver.cpp b/src/mongo/db/transaction_coordinator_driver.cpp
index 937627aff3f..720f18b85bf 100644
--- a/src/mongo/db/transaction_coordinator_driver.cpp
+++ b/src/mongo/db/transaction_coordinator_driver.cpp
@@ -113,7 +113,7 @@ std::string buildParticipantListString(const std::vector<ShardId>& participantLi
} // namespace
TransactionCoordinatorDriver::TransactionCoordinatorDriver(ServiceContext* service)
- : _scheduler(service) {}
+ : _scheduler(std::make_unique<txn::AsyncWorkScheduler>(service)) {}
TransactionCoordinatorDriver::~TransactionCoordinatorDriver() = default;
@@ -210,7 +210,7 @@ void persistParticipantListBlocking(OperationContext* opCtx,
Future<void> TransactionCoordinatorDriver::persistParticipantList(
const LogicalSessionId& lsid, TxnNumber txnNumber, std::vector<ShardId> participantList) {
- return txn::doWhile(_scheduler,
+ return txn::doWhile(*_scheduler,
boost::none /* no need for a backoff */,
[](const Status& s) {
// 'Interrupted' is the error code delivered for killOp sent by a user.
@@ -219,7 +219,7 @@ Future<void> TransactionCoordinatorDriver::persistParticipantList(
return s.code() == ErrorCodes::Interrupted;
},
[this, lsid, txnNumber, participantList] {
- return _scheduler.scheduleWork(
+ return _scheduler->scheduleWork(
[lsid, txnNumber, participantList](OperationContext* opCtx) {
persistParticipantListBlocking(
opCtx, lsid, txnNumber, participantList);
@@ -249,7 +249,7 @@ Future<txn::PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare(
// Asynchronously aggregate all prepare responses to find the decision and max prepare timestamp
// (used for commit), stopping the aggregation and preventing any further retries as soon as an
// abort decision is received. Return a future containing the result.
- return collect(
+ return txn::collect(
std::move(responses),
// Initial value
txn::PrepareVoteConsensus{boost::none, boost::none},
@@ -414,7 +414,7 @@ Future<void> TransactionCoordinatorDriver::persistDecision(
std::vector<ShardId> participantList,
const boost::optional<Timestamp>& commitTimestamp) {
return txn::doWhile(
- _scheduler,
+ *_scheduler,
boost::none /* no need for a backoff */,
[](const Status& s) {
// 'Interrupted' is the error code delivered for killOp sent by a user. Note, we do not
@@ -422,7 +422,7 @@ Future<void> TransactionCoordinatorDriver::persistDecision(
return s.code() == ErrorCodes::Interrupted;
},
[this, lsid, txnNumber, participantList, commitTimestamp] {
- return _scheduler.scheduleWork([lsid, txnNumber, participantList, commitTimestamp](
+ return _scheduler->scheduleWork([lsid, txnNumber, participantList, commitTimestamp](
OperationContext* opCtx) {
persistDecisionBlocking(opCtx, lsid, txnNumber, participantList, commitTimestamp);
});
@@ -533,7 +533,7 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx,
Future<void> TransactionCoordinatorDriver::deleteCoordinatorDoc(const LogicalSessionId& lsid,
TxnNumber txnNumber) {
- return txn::doWhile(_scheduler,
+ return txn::doWhile(*_scheduler,
boost::none /* no need for a backoff */,
[](const Status& s) {
// 'Interrupted' is the error code delivered for killOp sent by a
@@ -542,7 +542,7 @@ Future<void> TransactionCoordinatorDriver::deleteCoordinatorDoc(const LogicalSes
return s.code() == ErrorCodes::Interrupted;
},
[this, lsid, txnNumber] {
- return _scheduler.scheduleWork(
+ return _scheduler->scheduleWork(
[lsid, txnNumber](OperationContext* opCtx) {
deleteCoordinatorDocBlocking(opCtx, lsid, txnNumber);
});
@@ -572,7 +572,7 @@ std::vector<TransactionCoordinatorDocument> TransactionCoordinatorDriver::readAl
Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard(
const ShardId& shardId, const BSONObj& commandObj) {
return txn::doWhile(
- _scheduler,
+ *_scheduler,
kExponentialBackoff,
[](StatusWith<PrepareResponse> swPrepareResponse) {
// 'Interrupted' is the error code delivered for killOp sent by a user. Note, we do not
@@ -581,7 +581,7 @@ Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard(
isRetryableError(swPrepareResponse.getStatus().code());
},
[ this, shardId, commandObj = commandObj.getOwned() ] {
- return _scheduler.scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj)
+ return _scheduler->scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj)
.then([ shardId, commandObj = commandObj.getOwned() ](ResponseStatus response) {
auto status = getStatusFromCommandResult(response.data);
auto wcStatus = getWriteConcernStatusFromCommandResult(response.data);
@@ -652,7 +652,7 @@ Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard(
Future<void> TransactionCoordinatorDriver::sendDecisionToParticipantShard(
const ShardId& shardId, const BSONObj& commandObj) {
return txn::doWhile(
- _scheduler,
+ *_scheduler,
kExponentialBackoff,
[](const Status& s) {
// 'Interrupted' is the error code delivered for killOp sent by a user. Note, we do not
@@ -660,7 +660,7 @@ Future<void> TransactionCoordinatorDriver::sendDecisionToParticipantShard(
return s.code() == ErrorCodes::Interrupted || isRetryableError(s.code());
},
[ this, shardId, commandObj = commandObj.getOwned() ] {
- return _scheduler.scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj)
+ return _scheduler->scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj)
.then([ shardId, commandObj = commandObj.getOwned() ](ResponseStatus response) {
auto status = getStatusFromCommandResult(response.data);
auto wcStatus = getWriteConcernStatusFromCommandResult(response.data);
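
Every persistence and messaging step in this file follows the same txn::doWhile
shape: a retry predicate plus a loop body scheduled through the (now heap-allocated)
scheduler. Sketched in isolation, with a hypothetical loop body that is not from the
commit:

    return txn::doWhile(*_scheduler,
                        boost::none /* no backoff */,
                        // Retry while the work was interrupted by killOp
                        [](const Status& s) { return s.code() == ErrorCodes::Interrupted; },
                        [this] {
                            return _scheduler->scheduleWork([](OperationContext* opCtx) {
                                // Blocking work that may be interrupted and retried
                            });
                        });
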
diff --git a/src/mongo/db/transaction_coordinator_driver.h b/src/mongo/db/transaction_coordinator_driver.h
index 436fe633c56..93db15c6447 100644
--- a/src/mongo/db/transaction_coordinator_driver.h
+++ b/src/mongo/db/transaction_coordinator_driver.h
@@ -232,7 +232,7 @@ public:
Future<void> sendDecisionToParticipantShard(const ShardId& shardId, const BSONObj& commandObj);
private:
- txn::AsyncWorkScheduler _scheduler;
+ std::unique_ptr<txn::AsyncWorkScheduler> _scheduler;
// TODO (SERVER-38522): Remove once AsyncWorkScheduler is used for cancellation
AtomicWord<bool> _cancelled{false};
diff --git a/src/mongo/db/transaction_coordinator_futures_util.cpp b/src/mongo/db/transaction_coordinator_futures_util.cpp
index d79d7076cf2..ef2ced6bcae 100644
--- a/src/mongo/db/transaction_coordinator_futures_util.cpp
+++ b/src/mongo/db/transaction_coordinator_futures_util.cpp
@@ -57,7 +57,21 @@ AsyncWorkScheduler::AsyncWorkScheduler(ServiceContext* serviceContext)
: _serviceContext(serviceContext),
_executor(Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor()) {}
-AsyncWorkScheduler::~AsyncWorkScheduler() = default;
+AsyncWorkScheduler::~AsyncWorkScheduler() {
+ {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ invariant(_activeOpContexts.empty());
+ invariant(_activeHandles.empty());
+ invariant(_childSchedulers.empty());
+ }
+
+ if (!_parent)
+ return;
+
+ stdx::lock_guard<stdx::mutex> lg(_parent->_mutex);
+ _parent->_childSchedulers.erase(_itToRemove);
+ _parent = nullptr;
+}
Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemoteCommand(
const ShardId& shardId, const ReadPreferenceSetting& readPref, const BSONObj& commandObj) {
@@ -77,40 +91,33 @@ Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemot
// rather than going through the host targeting below. This ensures that the state changes
// for the participant and coordinator occur sequentially on a single branch of replica set
// history. See SERVER-38142 for details.
- return scheduleWork([ shardId,
- commandObj = commandObj.getOwned() ](OperationContext * opCtx) {
- // Note: This internal authorization is tied to the lifetime of 'opCtx', which is
+ return scheduleWork([ this, shardId, commandObj = commandObj.getOwned() ](OperationContext *
+ opCtx) {
+ // NOTE: This internal authorization is tied to the lifetime of the client, which will be
// destroyed by 'scheduleWork' immediately after this lambda ends.
- AuthorizationSession::get(Client::getCurrent())->grantInternalAuthorization();
+ AuthorizationSession::get(opCtx->getClient())->grantInternalAuthorization();
LOG(3) << "Coordinator going to send command " << commandObj << " to shard " << shardId;
- auto start = Date_t::now();
+ const auto service = opCtx->getServiceContext();
+ auto start = _executor->now();
auto requestOpMsg =
OpMsgRequest::fromDBAndBody(NamespaceString::kAdminDb, commandObj).serialize();
- const auto replyOpMsg = OpMsg::parseOwned(opCtx->getServiceContext()
- ->getServiceEntryPoint()
- ->handleRequest(opCtx, requestOpMsg)
- .response);
+ const auto replyOpMsg = OpMsg::parseOwned(
+ service->getServiceEntryPoint()->handleRequest(opCtx, requestOpMsg).response);
// Document sequences are not yet being used for responses.
invariant(replyOpMsg.sequences.empty());
// 'ResponseStatus' is the response format of a remote request sent over the network, so
// we simulate that format manually here, since we sent the request over the loopback.
- return ResponseStatus{replyOpMsg.body.getOwned(), Date_t::now() - start};
+ return ResponseStatus{replyOpMsg.body.getOwned(), _executor->now() - start};
});
}
- // Manually simulate a futures interface to the TaskExecutor by creating this promise-future
- // pair and setting the promise from inside the callback passed to the TaskExecutor.
- auto promiseAndFuture = makePromiseFuture<ResponseStatus>();
- auto sharedPromise =
- std::make_shared<Promise<ResponseStatus>>(std::move(promiseAndFuture.promise));
-
- _targetHostAsync(shardId, readPref)
- .then([ this, shardId, sharedPromise, commandObj = commandObj.getOwned(), readPref ](
+ return _targetHostAsync(shardId, readPref)
+ .then([ this, shardId, commandObj = commandObj.getOwned(), readPref ](
HostAndPort shardHostAndPort) mutable {
LOG(3) << "Coordinator sending command " << commandObj << " to shard " << shardId;
@@ -120,32 +127,86 @@ Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemot
readPref.toContainingBSON(),
nullptr);
- uassertStatusOK(_executor->scheduleRemoteCommand(
- request, [ commandObj = commandObj.getOwned(),
- shardId,
- sharedPromise ](const RemoteCommandCallbackArgs& args) mutable {
+ auto pf = makePromiseFuture<ResponseStatus>();
+
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+ uassertStatusOK(_shutdownStatus);
+
+ auto scheduledCommandHandle =
+ uassertStatusOK(_executor->scheduleRemoteCommand(request, [
+ this,
+ commandObj = std::move(commandObj),
+ shardId = std::move(shardId),
+ promise = std::make_shared<Promise<ResponseStatus>>(std::move(pf.promise))
+ ](const RemoteCommandCallbackArgs& args) mutable noexcept {
LOG(3) << "Coordinator shard got response " << args.response.data << " for "
<< commandObj << " to " << shardId;
auto status = args.response.status;
// Only consider actual failures to send the command as errors.
if (status.isOK()) {
- sharedPromise->emplaceValue(args.response);
+ promise->emplaceValue(std::move(args.response));
} else {
- sharedPromise->setError(status);
+ promise->setError([&] {
+ if (status == ErrorCodes::CallbackCanceled) {
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+ return _shutdownStatus.isOK() ? status : _shutdownStatus;
+ }
+ return status;
+ }());
}
}));
+
+ auto it =
+ _activeHandles.emplace(_activeHandles.begin(), std::move(scheduledCommandHandle));
+
+ ul.unlock();
+
+ return std::move(pf.future).tapAll(
+ [ this, it = std::move(it) ](StatusWith<ResponseStatus> s) {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ _activeHandles.erase(it);
+ });
})
- .onError([ shardId, commandObj = commandObj.getOwned(), sharedPromise ](Status s) {
+ .tapError([ shardId, commandObj = commandObj.getOwned() ](Status s) {
LOG(3) << "Coordinator shard failed to target command " << commandObj << " to shard "
<< shardId << causedBy(s);
+ });
+}
- sharedPromise->setError(s);
- })
- .getAsync([](Status) {});
+std::unique_ptr<AsyncWorkScheduler> AsyncWorkScheduler::makeChildScheduler() {
+ auto child = stdx::make_unique<AsyncWorkScheduler>(_serviceContext);
- // Do not wait for the callback to run. The continuation on the future corresponding to
- // 'sharedPromise' will reschedule the remote request if necessary.
- return std::move(promiseAndFuture.future);
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ if (!_shutdownStatus.isOK())
+ child->shutdown(_shutdownStatus);
+
+ child->_parent = this;
+ child->_itToRemove = _childSchedulers.emplace(_childSchedulers.begin(), child.get());
+
+ return child;
+}
+
+void AsyncWorkScheduler::shutdown(Status status) {
+ invariant(!status.isOK());
+
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ if (!_shutdownStatus.isOK())
+ return;
+
+ _shutdownStatus = std::move(status);
+
+ for (const auto& it : _activeOpContexts) {
+ stdx::lock_guard<Client> clientLock(*it->getClient());
+ _serviceContext->killOperation(clientLock, it.get(), _shutdownStatus.code());
+ }
+
+ for (const auto& it : _activeHandles) {
+ _executor->cancel(it);
+ }
+
+ for (auto& child : _childSchedulers) {
+ child->shutdown(_shutdownStatus);
+ }
}
Future<HostAndPort> AsyncWorkScheduler::_targetHostAsync(const ShardId& shardId,
diff --git a/src/mongo/db/transaction_coordinator_futures_util.h b/src/mongo/db/transaction_coordinator_futures_util.h
index 5d677290da1..a30112373b4 100644
--- a/src/mongo/db/transaction_coordinator_futures_util.h
+++ b/src/mongo/db/transaction_coordinator_futures_util.h
@@ -51,6 +51,10 @@ public:
AsyncWorkScheduler(ServiceContext* serviceContext);
~AsyncWorkScheduler();
+ /**
+ * Schedules the specified callable to execute asynchronously and returns a future which will be
+ * set with its result.
+ */
template <class Callable>
Future<FutureContinuationResult<Callable, OperationContext*>> scheduleWork(
Callable&& task) noexcept {
@@ -70,19 +74,45 @@ public:
auto pf = makePromiseFuture<ReturnType>();
auto taskCompletionPromise = std::make_shared<Promise<ReturnType>>(std::move(pf.promise));
try {
- uassertStatusOK(_executor->scheduleWorkAt(
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+ uassertStatusOK(_shutdownStatus);
+
+ auto scheduledWorkHandle = uassertStatusOK(_executor->scheduleWorkAt(
when,
[ this, task = std::forward<Callable>(task), taskCompletionPromise ](
const executor::TaskExecutor::CallbackArgs&) mutable noexcept {
- ThreadClient tc("TransactionCoordinator", _serviceContext);
- auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
- taskCompletionPromise->setWith([&] { return task(uniqueOpCtx.get()); });
+ taskCompletionPromise->setWith([&] {
+ ThreadClient tc("TransactionCoordinator", _serviceContext);
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+ uassertStatusOK(_shutdownStatus);
+
+ auto uniqueOpCtxIter = _activeOpContexts.emplace(
+ _activeOpContexts.begin(), tc->makeOperationContext());
+ ul.unlock();
+
+ auto scopedGuard = makeGuard([&] {
+ ul.lock();
+ _activeOpContexts.erase(uniqueOpCtxIter);
+ });
+
+ return task(uniqueOpCtxIter->get());
+ });
}));
+
+ auto it =
+ _activeHandles.emplace(_activeHandles.begin(), std::move(scheduledWorkHandle));
+
+ ul.unlock();
+
+ return std::move(pf.future).tapAll(
+ [ this, it = std::move(it) ](StatusOrStatusWith<ReturnType> s) {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ _activeHandles.erase(it);
+ });
} catch (const DBException& ex) {
taskCompletionPromise->setError(ex.toStatus());
+ return std::move(pf.future);
}
-
- return std::move(pf.future);
}
/**
@@ -92,7 +122,29 @@ public:
Future<executor::TaskExecutor::ResponseStatus> scheduleRemoteCommand(
const ShardId& shardId, const ReadPreferenceSetting& readPref, const BSONObj& commandObj);
+ /**
+ * Allows sub-tasks on this scheduler to be grouped together and works around the fact that
+ * futures are not cancellable.
+ *
+ * Shutting down the returned child scheduler has no effect on the parent. Shutting down the
+ * parent scheduler also shuts down all child schedulers and prevents new ones from starting.
+ */
+ std::unique_ptr<AsyncWorkScheduler> makeChildScheduler();
+
+ /**
+ * Non-blocking method, which interrupts all currently active scheduled commands or tasks and
+ * prevents any new ones from starting.
+ * After this method is called, all returned futures which haven't yet been signalled will be
+ * set to the specified status. Attempting to schedule any new operations will return ready
+ * futures set to the specified status.
+ *
+ * Must not be called with Status::OK.
+ */
+ void shutdown(Status status);
+
private:
+ using ChildIteratorsList = std::list<AsyncWorkScheduler*>;
+
/**
* Finds the host and port for a shard.
*/
@@ -104,6 +156,27 @@ private:
// Cached reference to the executor to use
executor::TaskExecutor* const _executor;
+
+ // If this work scheduler was constructed through 'makeChildScheduler', points to the parent
+ // scheduler and contains the iterator from the parent, which needs to be removed on destruction
+ AsyncWorkScheduler* _parent{nullptr};
+ ChildIteratorsList::iterator _itToRemove;
+
+ // Mutex to protect the shared state below
+ stdx::mutex _mutex;
+
+ // If shutdown() is called, this contains the first status that was passed to it and is an
+ // indication that no more operations can be scheduled
+ Status _shutdownStatus{Status::OK()};
+
+ // Any active scheduled work will have its operation context stored here
+ std::list<ServiceContext::UniqueOperationContext> _activeOpContexts;
+
+ // Any active scheduled work or network operation will have its TaskExecutor handle stored here
+ std::list<executor::TaskExecutor::CallbackHandle> _activeHandles;
+
+ // Any outstanding child schedulers created through 'makeChildScheduler'
+ ChildIteratorsList _childSchedulers;
};
enum class ShouldStopIteration { kYes, kNo };
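
The interruption semantics documented above compose: shutting down a child only fails
that child's futures, while shutting down the parent cascades to every child. A sketch
under those assumptions (error codes and messages are illustrative):

    AsyncWorkScheduler parent(serviceContext);
    auto child = parent.makeChildScheduler();

    auto childWork = child->scheduleWork([](OperationContext*) { /* sub-task */ });
    auto parentWork = parent.scheduleWork([](OperationContext*) { /* main task */ });

    child->shutdown({ErrorCodes::Interrupted, "cancel sub-task"});  // parentWork unaffected
    parent.shutdown({ErrorCodes::Interrupted, "stepping down"});    // cascades to all children
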
diff --git a/src/mongo/db/transaction_coordinator_futures_util_test.cpp b/src/mongo/db/transaction_coordinator_futures_util_test.cpp
index ecb4ed5ba66..3ce434d25ed 100644
--- a/src/mongo/db/transaction_coordinator_futures_util_test.cpp
+++ b/src/mongo/db/transaction_coordinator_futures_util_test.cpp
@@ -41,6 +41,8 @@ namespace mongo {
namespace txn {
namespace {
+using Barrier = unittest::Barrier;
+
TEST(TransactionCoordinatorFuturesUtilTest, CollectReturnsInitValueWhenInputIsEmptyVector) {
std::vector<Future<int>> futures;
auto resultFuture = txn::collect(std::move(futures), 0, [](int& result, const int& next) {
@@ -167,7 +169,7 @@ TEST_F(AsyncWorkSchedulerTest, ScheduledBlockingWorkSucceeds) {
ASSERT(!future.isReady());
pf.promise.emplaceValue(5);
- ASSERT_EQ(5, future.get(operationContext()));
+ ASSERT_EQ(5, future.get());
}
TEST_F(AsyncWorkSchedulerTest, ScheduledBlockingWorkThrowsException) {
@@ -186,8 +188,7 @@ TEST_F(AsyncWorkSchedulerTest, ScheduledBlockingWorkThrowsException) {
ASSERT(!future.isReady());
pf.promise.emplaceValue(5);
- ASSERT_THROWS_CODE(
- future.get(operationContext()), AssertionException, ErrorCodes::InternalError);
+ ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::InternalError);
}
TEST_F(AsyncWorkSchedulerTest, ScheduledBlockingWorkInSucceeds) {
@@ -213,7 +214,7 @@ TEST_F(AsyncWorkSchedulerTest, ScheduledBlockingWorkInSucceeds) {
ASSERT(future.isReady());
}
- ASSERT_EQ(5, future.get(operationContext()));
+ ASSERT_EQ(5, future.get());
}
TEST_F(AsyncWorkSchedulerTest, ScheduledRemoteCommandRespondsOK) {
@@ -229,7 +230,7 @@ TEST_F(AsyncWorkSchedulerTest, ScheduledRemoteCommandRespondsOK) {
return objResponse;
});
- const auto& response = future.get(operationContext());
+ const auto& response = future.get();
ASSERT(response.isOK());
ASSERT_BSONOBJ_EQ(objResponse, response.data);
}
@@ -247,7 +248,7 @@ TEST_F(AsyncWorkSchedulerTest, ScheduledRemoteCommandRespondsNotOK) {
return objResponse;
});
- const auto& response = future.get(operationContext());
+ const auto& response = future.get();
ASSERT(response.isOK());
ASSERT_BSONOBJ_EQ(objResponse, response.data);
}
@@ -270,13 +271,159 @@ TEST_F(AsyncWorkSchedulerTest, ScheduledRemoteCommandsOneOKAndOneError) {
return BSON("ok" << 0 << "responseData" << 3);
});
- const auto& response2 = future2.get(operationContext());
+ const auto& response2 = future2.get();
ASSERT(response2.isOK());
- const auto& response1 = future1.get(operationContext());
+ const auto& response1 = future1.get();
ASSERT(response1.isOK());
}
+TEST_F(AsyncWorkSchedulerTest, ShutdownInterruptsRunningBlockedTasks) {
+ AsyncWorkScheduler async(getServiceContext());
+
+ Barrier barrier(2);
+
+ auto future = async.scheduleWork([&barrier](OperationContext* opCtx) {
+ barrier.countDownAndWait();
+ opCtx->sleepFor(Hours(6));
+ });
+
+ barrier.countDownAndWait();
+ ASSERT(!future.isReady());
+
+ async.shutdown({ErrorCodes::InternalError, "Test internal error"});
+
+ ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::InternalError);
+}
+
+TEST_F(AsyncWorkSchedulerTest, ShutdownInterruptsNotYetScheduledTasks) {
+ AsyncWorkScheduler async(getServiceContext());
+
+ AtomicWord<int> numInvocations{0};
+
+ auto future1 =
+ async.scheduleWorkIn(Milliseconds(1), [&numInvocations](OperationContext* opCtx) {
+ numInvocations.addAndFetch(1);
+ });
+
+ auto future2 =
+ async.scheduleWorkIn(Milliseconds(1), [&numInvocations](OperationContext* opCtx) {
+ numInvocations.addAndFetch(1);
+ });
+
+ ASSERT(!future1.isReady());
+ ASSERT(!future2.isReady());
+ ASSERT_EQ(0, numInvocations.load());
+
+ async.shutdown({ErrorCodes::InternalError, "Test internal error"});
+ ASSERT_EQ(0, numInvocations.load());
+
+ ASSERT_THROWS_CODE(future1.get(), AssertionException, ErrorCodes::InternalError);
+ ASSERT_THROWS_CODE(future2.get(), AssertionException, ErrorCodes::InternalError);
+
+ ASSERT_EQ(0, numInvocations.load());
+}
+
+TEST_F(AsyncWorkSchedulerTest, ShutdownInterruptsRemoteCommandsWhichAreBlockedWaitingForResponse) {
+ AsyncWorkScheduler async(getServiceContext());
+
+ auto future1 = async.scheduleRemoteCommand(
+ kShardIds[1], ReadPreferenceSetting{ReadPreference::PrimaryOnly}, BSON("TestCommand" << 1));
+
+ auto future2 = async.scheduleRemoteCommand(
+ kShardIds[2], ReadPreferenceSetting{ReadPreference::PrimaryOnly}, BSON("TestCommand" << 1));
+
+ // Wait till at least one of the two commands above gets scheduled
+ network()->waitForWork();
+
+ ASSERT(!future1.isReady());
+ ASSERT(!future2.isReady());
+
+ async.shutdown({ErrorCodes::InternalError, "Test internal error"});
+
+ ASSERT_THROWS_CODE(future1.get(), AssertionException, ErrorCodes::InternalError);
+ ASSERT_THROWS_CODE(future2.get(), AssertionException, ErrorCodes::InternalError);
+}
+
+TEST_F(AsyncWorkSchedulerTest, ShutdownChildSchedulerOnlyInterruptsChildTasks) {
+ AsyncWorkScheduler async(getServiceContext());
+
+ auto futureFromParent = async.scheduleWorkIn(
+ Milliseconds(1), [](OperationContext* opCtx) { return std::string("Parent"); });
+
+ auto childAsync1 = async.makeChildScheduler();
+ auto childFuture1 = childAsync1->scheduleWorkIn(
+ Milliseconds(1), [](OperationContext* opCtx) { return std::string("Child1"); });
+
+ auto childAsync2 = async.makeChildScheduler();
+ auto childFuture2 = childAsync2->scheduleWorkIn(
+ Milliseconds(1), [](OperationContext* opCtx) { return std::string("Child2"); });
+
+ childAsync1->shutdown({ErrorCodes::InternalError, "Test error"});
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+ network()->advanceTime(network()->now() + Milliseconds(1));
+ }
+
+ ASSERT_EQ("Parent", futureFromParent.get());
+ ASSERT_THROWS_CODE(childFuture1.get(), AssertionException, ErrorCodes::InternalError);
+ ASSERT_EQ("Child2", childFuture2.get());
+}
+
+TEST_F(AsyncWorkSchedulerTest, ShutdownParentSchedulerInterruptsAllChildTasks) {
+ AsyncWorkScheduler async(getServiceContext());
+
+ auto futureFromParent = async.scheduleWorkIn(
+ Milliseconds(1), [](OperationContext* opCtx) { return std::string("Parent"); });
+
+ auto childAsync1 = async.makeChildScheduler();
+ auto childFuture1 = childAsync1->scheduleWorkIn(
+ Milliseconds(1), [](OperationContext* opCtx) { return std::string("Child1"); });
+
+ auto childAsync2 = async.makeChildScheduler();
+ auto childFuture2 = childAsync2->scheduleWorkIn(
+ Milliseconds(1), [](OperationContext* opCtx) { return std::string("Child2"); });
+
+ async.shutdown({ErrorCodes::InternalError, "Test error"});
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+ network()->advanceTime(network()->now() + Milliseconds(1));
+ }
+
+ ASSERT_THROWS_CODE(futureFromParent.get(), AssertionException, ErrorCodes::InternalError);
+ ASSERT_THROWS_CODE(childFuture1.get(), AssertionException, ErrorCodes::InternalError);
+ ASSERT_THROWS_CODE(childFuture2.get(), AssertionException, ErrorCodes::InternalError);
+}
+
+TEST_F(AsyncWorkSchedulerTest, MakeChildSchedulerAfterShutdownParentScheduler) {
+ AsyncWorkScheduler async(getServiceContext());
+
+ // Shut down the parent scheduler immediately
+ async.shutdown({ErrorCodes::InternalError, "Test error"});
+
+ auto futureFromParent = async.scheduleWorkIn(
+ Milliseconds(1), [](OperationContext* opCtx) { return std::string("Parent"); });
+
+ auto childAsync1 = async.makeChildScheduler();
+ auto childFuture1 = childAsync1->scheduleWorkIn(
+ Milliseconds(1), [](OperationContext* opCtx) { return std::string("Child1"); });
+
+ auto childAsync2 = async.makeChildScheduler();
+ auto childFuture2 = childAsync2->scheduleWorkIn(
+ Milliseconds(1), [](OperationContext* opCtx) { return std::string("Child2"); });
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+ network()->advanceTime(network()->now() + Milliseconds(1));
+ }
+
+ ASSERT_THROWS_CODE(futureFromParent.get(), AssertionException, ErrorCodes::InternalError);
+ ASSERT_THROWS_CODE(childFuture1.get(), AssertionException, ErrorCodes::InternalError);
+ ASSERT_THROWS_CODE(childFuture2.get(), AssertionException, ErrorCodes::InternalError);
+}
+
using DoWhileTest = AsyncWorkSchedulerTest;
@@ -294,7 +441,7 @@ TEST_F(DoWhileTest, LoopBodyExecutesAtLeastOnceWithBackoff) {
ASSERT(future.isReady());
ASSERT_EQ(1, numLoops);
- ASSERT_EQ(1, future.get(operationContext()));
+ ASSERT_EQ(1, future.get());
}
TEST_F(DoWhileTest, LoopBodyExecutesManyIterationsWithoutBackoff) {
@@ -309,7 +456,7 @@ TEST_F(DoWhileTest, LoopBodyExecutesManyIterationsWithoutBackoff) {
},
[&remainingLoops] { return Future<int>::makeReady(--remainingLoops); });
- ASSERT_EQ(0, future.get(operationContext()));
+ ASSERT_EQ(0, future.get());
ASSERT_EQ(0, remainingLoops);
}
@@ -350,7 +497,27 @@ TEST_F(DoWhileTest, LoopObeysBackoff) {
ASSERT_EQ(3, numLoops);
}
- ASSERT_EQ(3, future.get(operationContext()));
+ ASSERT_EQ(3, future.get());
+}
+
+TEST_F(DoWhileTest, LoopObeysShutdown) {
+ AsyncWorkScheduler async(getServiceContext());
+
+ int numLoops = 0;
+ auto future =
+ doWhile(async,
+ boost::none,
+ [](const StatusWith<int>& status) { return status != ErrorCodes::InternalError; },
+ [&numLoops] { return Future<int>::makeReady(++numLoops); });
+
+ // Wait for at least one loop
+ while (numLoops == 0)
+ sleepFor(Milliseconds(25));
+
+ ASSERT(!future.isReady());
+ async.shutdown({ErrorCodes::InternalError, "Test internal error"});
+
+ ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::InternalError);
}
} // namespace
diff --git a/src/mongo/db/transaction_coordinator_service.cpp b/src/mongo/db/transaction_coordinator_service.cpp
index 5a1becbe5a3..27e1fba797a 100644
--- a/src/mongo/db/transaction_coordinator_service.cpp
+++ b/src/mongo/db/transaction_coordinator_service.cpp
@@ -34,16 +34,12 @@
#include "mongo/db/transaction_coordinator_service.h"
-#include "mongo/db/operation_context.h"
#include "mongo/db/repl/repl_client_info.h"
-#include "mongo/db/service_context.h"
#include "mongo/db/transaction_coordinator_document_gen.h"
#include "mongo/db/write_concern.h"
#include "mongo/executor/task_executor.h"
#include "mongo/executor/task_executor_pool.h"
-#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/grid.h"
-#include "mongo/s/shard_id.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -57,6 +53,8 @@ const auto transactionCoordinatorServiceDecoration =
TransactionCoordinatorService::TransactionCoordinatorService()
: _coordinatorCatalog(std::make_shared<TransactionCoordinatorCatalog>()) {}
+TransactionCoordinatorService::~TransactionCoordinatorService() = default;
+
TransactionCoordinatorService* TransactionCoordinatorService::get(OperationContext* opCtx) {
return get(opCtx->getServiceContext());
}
@@ -209,6 +207,6 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx) {
invariant(status);
}
-void TransactionCoordinatorService::onStepDown(OperationContext* opCtx) {}
+void TransactionCoordinatorService::onStepDown() {}
} // namespace mongo
diff --git a/src/mongo/db/transaction_coordinator_service.h b/src/mongo/db/transaction_coordinator_service.h
index 89971179010..115c7604bcc 100644
--- a/src/mongo/db/transaction_coordinator_service.h
+++ b/src/mongo/db/transaction_coordinator_service.h
@@ -30,25 +30,17 @@
#pragma once
-#include <memory>
-
#include "mongo/base/disallow_copying.h"
-#include "mongo/db/logical_session_id.h"
#include "mongo/db/transaction_coordinator_catalog.h"
-#include "mongo/util/future.h"
namespace mongo {
-class ShardId;
-class OperationContext;
-class ServiceContext;
-
-class TransactionCoordinatorService final {
+class TransactionCoordinatorService {
MONGO_DISALLOW_COPYING(TransactionCoordinatorService);
public:
TransactionCoordinatorService();
- ~TransactionCoordinatorService() = default;
+ ~TransactionCoordinatorService();
/**
* Retrieves the TransactionCoordinatorService associated with the service or operation context.
@@ -100,7 +92,7 @@ public:
* async task to continue coordinating its commit.
*/
void onStepUp(OperationContext* opCtx);
- void onStepDown(OperationContext* opCtx);
+ void onStepDown();
private:
std::shared_ptr<TransactionCoordinatorCatalog> _coordinatorCatalog;
diff --git a/src/mongo/db/transaction_coordinator_service_test.cpp b/src/mongo/db/transaction_coordinator_service_test.cpp
index 28c0bcb9780..7b8020f8622 100644
--- a/src/mongo/db/transaction_coordinator_service_test.cpp
+++ b/src/mongo/db/transaction_coordinator_service_test.cpp
@@ -42,7 +42,6 @@
namespace mongo {
namespace {
-const Timestamp kDummyTimestamp = Timestamp::min();
const Date_t kCommitDeadline = Date_t::max();
const BSONObj kDummyWriteConcernError = BSON("code" << ErrorCodes::WriteConcernFailed << "errmsg"
@@ -198,24 +197,6 @@ public:
TxnNumber _txnNumber{1};
};
-/**
- * Fixture that during setUp automatically creates a coordinator for a default lsid/txnNumber pair.
- */
-class TransactionCoordinatorServiceTestSingleTxn : public TransactionCoordinatorServiceTest {
-public:
- void setUp() final {
- TransactionCoordinatorServiceTest::setUp();
- TransactionCoordinatorService::get(operationContext())
- ->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline);
- }
-
- TransactionCoordinatorService* coordinatorService() {
- return TransactionCoordinatorService::get(operationContext());
- }
-};
-
-} // namespace
-
TEST_F(TransactionCoordinatorServiceTest, CreateCoordinatorOnNewSessionSucceeds) {
auto coordinatorService = TransactionCoordinatorService::get(operationContext());
coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline);
@@ -553,6 +534,23 @@ TEST_F(TransactionCoordinatorServiceTest,
operationContext(), _lsid, _txnNumber, kTwoShardIdSet));
}
+
+/**
+ * Fixture that during setUp automatically creates a coordinator for a default lsid/txnNumber pair.
+ */
+class TransactionCoordinatorServiceTestSingleTxn : public TransactionCoordinatorServiceTest {
+public:
+ void setUp() final {
+ TransactionCoordinatorServiceTest::setUp();
+ TransactionCoordinatorService::get(operationContext())
+ ->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline);
+ }
+
+ TransactionCoordinatorService* coordinatorService() {
+ return TransactionCoordinatorService::get(operationContext());
+ }
+};
+
TEST_F(TransactionCoordinatorServiceTestSingleTxn,
CoordinateCommitReturnsCorrectCommitDecisionOnAbort) {
@@ -624,4 +622,5 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn,
static_cast<int>(commitDecisionFuture2.get()));
}
+} // namespace
} // namespace mongo