diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-01-16 21:26:30 -0500 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-02-05 07:42:15 -0500 |
commit | 9a90dfc2fbe769d3caf02a3550f00bc9fa9df483 (patch) | |
tree | 6027d8f5c63e808fce9e23f45201962733454ccc | |
parent | f619c619a10aba632422dde6f30ebbdcbf370003 (diff) | |
download | mongo-9a90dfc2fbe769d3caf02a3550f00bc9fa9df483.tar.gz |
SERVER-38521 Make the AsyncWorkScheduler be interruptible and support sub-schedulers
-rw-r--r-- | src/mongo/db/client.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/client.h | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator.h | 47 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_catalog.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_catalog.h | 43 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_catalog_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_driver.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_driver.h | 2 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_futures_util.cpp | 125 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_futures_util.h | 85 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_futures_util_test.cpp | 189 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_service.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_service.h | 14 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_service_test.cpp | 37 |
16 files changed, 450 insertions, 164 deletions
diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp index e7f3e8d9615..9c4204923ff 100644 --- a/src/mongo/db/client.cpp +++ b/src/mongo/db/client.cpp @@ -169,4 +169,8 @@ ThreadClient::~ThreadClient() { currentClient.reset(nullptr); } +Client* ThreadClient::get() const { + return &cc(); +} + } // namespace mongo diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h index ed3f31f868e..7e24368412d 100644 --- a/src/mongo/db/client.h +++ b/src/mongo/db/client.h @@ -247,6 +247,14 @@ public: ThreadClient(const ThreadClient&) = delete; ThreadClient(ThreadClient&&) = delete; void operator=(const ThreadClient&) = delete; + + Client* get() const; + Client* operator->() const { + return get(); + } + Client& operator*() const { + return *get(); + } }; /** diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 4bc3b8c8409..e956b342c9c 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -705,10 +705,12 @@ void ReplicationCoordinatorExternalStateImpl::closeConnections() { void ReplicationCoordinatorExternalStateImpl::shardingOnStepDownHook() { if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { Balancer::get(_service)->interruptBalancer(); + TransactionCoordinatorService::get(_service)->onStepDown(); } else if (ShardingState::get(_service)->enabled()) { ChunkSplitter::get(_service).onStepDown(); CatalogCacheLoader::get(_service).onStepDown(); PeriodicBalancerConfigRefresher::get(_service).onStepDown(); + TransactionCoordinatorService::get(_service)->onStepDown(); } if (auto validator = LogicalTimeValidator::get(_service)) { diff --git a/src/mongo/db/transaction_coordinator.cpp b/src/mongo/db/transaction_coordinator.cpp index ea297b10375..df00c708d4a 100644 --- a/src/mongo/db/transaction_coordinator.cpp +++ b/src/mongo/db/transaction_coordinator.cpp @@ -72,12 +72,11 @@ CoordinatorCommitDecision makeDecisionFromPrepareVoteConsensus( TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext, const LogicalSessionId& lsid, - const TxnNumber& txnNumber) + TxnNumber txnNumber) : _serviceContext(serviceContext), _driver(serviceContext), _lsid(lsid), - _txnNumber(txnNumber), - _state(CoordinatorState::kInit) {} + _txnNumber(txnNumber) {} TransactionCoordinator::~TransactionCoordinator() { stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -175,6 +174,11 @@ Future<void> TransactionCoordinator::onCompletion() { return std::move(completionPromiseFuture.future); } +SharedSemiFuture<txn::CommitDecision> TransactionCoordinator::getDecision() { + stdx::lock_guard<stdx::mutex> lg(_mutex); + return _finalDecisionPromise.getFuture(); +} + void TransactionCoordinator::cancelIfCommitNotYetStarted() { stdx::unique_lock<stdx::mutex> lk(_mutex); if (_state == CoordinatorState::kInit) { diff --git a/src/mongo/db/transaction_coordinator.h b/src/mongo/db/transaction_coordinator.h index 72b9810425d..0832c38feaa 100644 --- a/src/mongo/db/transaction_coordinator.h +++ b/src/mongo/db/transaction_coordinator.h @@ -46,7 +46,7 @@ class TransactionCoordinator { public: TransactionCoordinator(ServiceContext* serviceContext, const LogicalSessionId& lsid, - const TxnNumber& txnNumber); + TxnNumber txnNumber); ~TransactionCoordinator(); /** @@ -124,10 +124,7 @@ public: * * TODO (SERVER-37364): Remove this when it is no longer needed by the coordinator service. */ - SharedSemiFuture<txn::CommitDecision> getDecision() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _finalDecisionPromise.getFuture(); - } + SharedSemiFuture<txn::CommitDecision> getDecision(); /** * If runCommit has not yet been called, this will transition this coordinator object to @@ -189,40 +186,26 @@ private: // this coordinator. TransactionCoordinatorDriver _driver; - /** - * The logical session id of the transaction that this coordinator is coordinating. - */ + // The lsid + transaction number that this coordinator is coordinating const LogicalSessionId _lsid; - - /** - * The transaction number of the transaction that this coordinator is coordinating. - */ const TxnNumber _txnNumber; - /** - * Protects _state, _finalDecisionPromise, and _completionPromises. - */ - stdx::mutex _mutex; + // Protects the state below + mutable stdx::mutex _mutex; - /** - * The current state of the coordinator in the commit process. - */ - CoordinatorState _state; + // Stores the current state of the coordinator in the commit process. + CoordinatorState _state{CoordinatorState::kInit}; - /** - * A promise that will contain the final decision made by the coordinator (whether to commit or - * abort). This is only known once all responses to prepare have been received from all - * participants, and the collective decision has been persisted to - * config.transactionCommitDecisions. - */ + // Promise which will contain the final decision made by the coordinator (whether to commit or + // abort). This is only known once all responses to prepare have been received from all + // participants, and the collective decision has been majority persisted to + // config.transactionCommitDecisions. SharedPromise<txn::CommitDecision> _finalDecisionPromise; - /** - * A list of all promises corresponding to futures that were returned to callers of - * onCompletion. - * - * TODO (SERVER-38346): Remove this when SharedSemiFuture supports continuations. - */ + // A list of all promises corresponding to futures that were returned to callers of + // onCompletion. + // + // TODO (SERVER-38346): Remove this when SharedSemiFuture supports continuations. std::vector<Promise<void>> _completionPromises; }; diff --git a/src/mongo/db/transaction_coordinator_catalog.cpp b/src/mongo/db/transaction_coordinator_catalog.cpp index 3497476b6ee..460d450d4b1 100644 --- a/src/mongo/db/transaction_coordinator_catalog.cpp +++ b/src/mongo/db/transaction_coordinator_catalog.cpp @@ -129,12 +129,12 @@ TransactionCoordinatorCatalog::getLatestOnSession(OperationContext* opCtx, Logic } const auto& coordinatorsForSession = coordinatorsForSessionIter->second; - const auto& lastCoordinatorOnSession = coordinatorsForSession.rbegin(); - // Should never have empty map for a session. Entries for sessions with no transactions should - // be removed. - invariant(lastCoordinatorOnSession != coordinatorsForSession.rend()); + // We should never have empty map for a session because entries for sessions with no + // transactions are removed + invariant(!coordinatorsForSession.empty()); + const auto& lastCoordinatorOnSession = coordinatorsForSession.begin(); return std::make_pair(lastCoordinatorOnSession->first, lastCoordinatorOnSession->second); } @@ -214,12 +214,12 @@ void TransactionCoordinatorCatalog::_waitForStepUpToComplete(stdx::unique_lock<s _noStepUpInProgressCv, lk, [this]() { return !_stepUpInProgress; }); } -std::string TransactionCoordinatorCatalog::toString() { +std::string TransactionCoordinatorCatalog::toString() const { stdx::lock_guard<stdx::mutex> lk(_mutex); return _toString(lk); } -std::string TransactionCoordinatorCatalog::_toString(WithLock wl) { +std::string TransactionCoordinatorCatalog::_toString(WithLock wl) const { StringBuilder ss; ss << "["; for (auto coordinatorsForSession = _coordinatorsBySession.begin(); diff --git a/src/mongo/db/transaction_coordinator_catalog.h b/src/mongo/db/transaction_coordinator_catalog.h index 9f3ea9d7582..98acd96d3e9 100644 --- a/src/mongo/db/transaction_coordinator_catalog.h +++ b/src/mongo/db/transaction_coordinator_catalog.h @@ -30,9 +30,8 @@ #pragma once -#include <boost/optional/optional.hpp> +#include <boost/optional.hpp> #include <map> -#include <memory> #include "mongo/base/disallow_copying.h" #include "mongo/db/operation_context.h" @@ -110,13 +109,13 @@ public: * Returns a string representation of the map from LogicalSessionId to the list of TxnNumbers * with TransactionCoordinators currently in the catalog. */ - std::string toString(); + std::string toString() const; private: - /** - * Protects the state below. - */ - stdx::mutex _mutex; + // Map of transaction coordinators, ordered in decreasing transaction number with the most + // recent transaction at the front + using TransactionCoordinatorMap = + std::map<TxnNumber, std::shared_ptr<TransactionCoordinator>, std::greater<TxnNumber>>; /** * Blocks in an interruptible wait until the catalog is not marked as having a stepup in @@ -124,22 +123,22 @@ private: */ void _waitForStepUpToComplete(stdx::unique_lock<stdx::mutex>& lk, OperationContext* opCtx); - std::string _toString(WithLock wl); - /** - * Contains TransactionCoordinator objects by session id and transaction number. May contain - * more than one coordinator per session. All coordinators for a session that do not correspond - * to the latest transaction should either be in the process of committing or aborting. + * Constructs a string representation of all the coordinators registered on the catalog. */ - LogicalSessionIdMap<std::map<TxnNumber, std::shared_ptr<TransactionCoordinator>>> - _coordinatorsBySession; + std::string _toString(WithLock wl) const; - /** - * Used only for testing. Contains TransactionCoordinator objects which have completed their - * commit coordination and would normally be expunged from memory. - */ - LogicalSessionIdMap<std::map<TxnNumber, std::shared_ptr<TransactionCoordinator>>> - _coordinatorsBySessionDefunct; + // Protects the state below. + mutable stdx::mutex _mutex; + + // Contains TransactionCoordinator objects by session id and transaction number. May contain + // more than one coordinator per session. All coordinators for a session that do not correspond + // to the latest transaction should either be in the process of committing or aborting. + LogicalSessionIdMap<TransactionCoordinatorMap> _coordinatorsBySession; + + // Used only for testing. Contains TransactionCoordinator objects which have completed their + // commit coordination and would normally be expunged from memory. + LogicalSessionIdMap<TransactionCoordinatorMap> _coordinatorsBySessionDefunct; /** * Whether a thread is actively executing a stepUp task. @@ -152,9 +151,7 @@ private: */ stdx::condition_variable _noStepUpInProgressCv; - /** - * Notified when the last coordinator is removed from the catalog. - */ + // Notified when the last coordinator is removed from the catalog. stdx::condition_variable _noActiveCoordinatorsCv; }; diff --git a/src/mongo/db/transaction_coordinator_catalog_test.cpp b/src/mongo/db/transaction_coordinator_catalog_test.cpp index 47303b590f6..9bc847790ad 100644 --- a/src/mongo/db/transaction_coordinator_catalog_test.cpp +++ b/src/mongo/db/transaction_coordinator_catalog_test.cpp @@ -38,10 +38,8 @@ namespace mongo { namespace { -const Timestamp dummyTimestamp = Timestamp::min(); - class TransactionCoordinatorCatalogTest : public ShardServerTestFixture { -public: +protected: void setUp() override { ShardServerTestFixture::setUp(); _coordinatorCatalog = std::make_shared<TransactionCoordinatorCatalog>(); diff --git a/src/mongo/db/transaction_coordinator_driver.cpp b/src/mongo/db/transaction_coordinator_driver.cpp index 937627aff3f..720f18b85bf 100644 --- a/src/mongo/db/transaction_coordinator_driver.cpp +++ b/src/mongo/db/transaction_coordinator_driver.cpp @@ -113,7 +113,7 @@ std::string buildParticipantListString(const std::vector<ShardId>& participantLi } // namespace TransactionCoordinatorDriver::TransactionCoordinatorDriver(ServiceContext* service) - : _scheduler(service) {} + : _scheduler(std::make_unique<txn::AsyncWorkScheduler>(service)) {} TransactionCoordinatorDriver::~TransactionCoordinatorDriver() = default; @@ -210,7 +210,7 @@ void persistParticipantListBlocking(OperationContext* opCtx, Future<void> TransactionCoordinatorDriver::persistParticipantList( const LogicalSessionId& lsid, TxnNumber txnNumber, std::vector<ShardId> participantList) { - return txn::doWhile(_scheduler, + return txn::doWhile(*_scheduler, boost::none /* no need for a backoff */, [](const Status& s) { // 'Interrupted' is the error code delivered for killOp sent by a user. @@ -219,7 +219,7 @@ Future<void> TransactionCoordinatorDriver::persistParticipantList( return s.code() == ErrorCodes::Interrupted; }, [this, lsid, txnNumber, participantList] { - return _scheduler.scheduleWork( + return _scheduler->scheduleWork( [lsid, txnNumber, participantList](OperationContext* opCtx) { persistParticipantListBlocking( opCtx, lsid, txnNumber, participantList); @@ -249,7 +249,7 @@ Future<txn::PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare( // Asynchronously aggregate all prepare responses to find the decision and max prepare timestamp // (used for commit), stopping the aggregation and preventing any further retries as soon as an // abort decision is received. Return a future containing the result. - return collect( + return txn::collect( std::move(responses), // Initial value txn::PrepareVoteConsensus{boost::none, boost::none}, @@ -414,7 +414,7 @@ Future<void> TransactionCoordinatorDriver::persistDecision( std::vector<ShardId> participantList, const boost::optional<Timestamp>& commitTimestamp) { return txn::doWhile( - _scheduler, + *_scheduler, boost::none /* no need for a backoff */, [](const Status& s) { // 'Interrupted' is the error code delivered for killOp sent by a user. Note, we do not @@ -422,7 +422,7 @@ Future<void> TransactionCoordinatorDriver::persistDecision( return s.code() == ErrorCodes::Interrupted; }, [this, lsid, txnNumber, participantList, commitTimestamp] { - return _scheduler.scheduleWork([lsid, txnNumber, participantList, commitTimestamp]( + return _scheduler->scheduleWork([lsid, txnNumber, participantList, commitTimestamp]( OperationContext* opCtx) { persistDecisionBlocking(opCtx, lsid, txnNumber, participantList, commitTimestamp); }); @@ -533,7 +533,7 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx, Future<void> TransactionCoordinatorDriver::deleteCoordinatorDoc(const LogicalSessionId& lsid, TxnNumber txnNumber) { - return txn::doWhile(_scheduler, + return txn::doWhile(*_scheduler, boost::none /* no need for a backoff */, [](const Status& s) { // 'Interrupted' is the error code delivered for killOp sent by a @@ -542,7 +542,7 @@ Future<void> TransactionCoordinatorDriver::deleteCoordinatorDoc(const LogicalSes return s.code() == ErrorCodes::Interrupted; }, [this, lsid, txnNumber] { - return _scheduler.scheduleWork( + return _scheduler->scheduleWork( [lsid, txnNumber](OperationContext* opCtx) { deleteCoordinatorDocBlocking(opCtx, lsid, txnNumber); }); @@ -572,7 +572,7 @@ std::vector<TransactionCoordinatorDocument> TransactionCoordinatorDriver::readAl Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard( const ShardId& shardId, const BSONObj& commandObj) { return txn::doWhile( - _scheduler, + *_scheduler, kExponentialBackoff, [](StatusWith<PrepareResponse> swPrepareResponse) { // 'Interrupted' is the error code delivered for killOp sent by a user. Note, we do not @@ -581,7 +581,7 @@ Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard( isRetryableError(swPrepareResponse.getStatus().code()); }, [ this, shardId, commandObj = commandObj.getOwned() ] { - return _scheduler.scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj) + return _scheduler->scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj) .then([ shardId, commandObj = commandObj.getOwned() ](ResponseStatus response) { auto status = getStatusFromCommandResult(response.data); auto wcStatus = getWriteConcernStatusFromCommandResult(response.data); @@ -652,7 +652,7 @@ Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard( Future<void> TransactionCoordinatorDriver::sendDecisionToParticipantShard( const ShardId& shardId, const BSONObj& commandObj) { return txn::doWhile( - _scheduler, + *_scheduler, kExponentialBackoff, [](const Status& s) { // 'Interrupted' is the error code delivered for killOp sent by a user. Note, we do not @@ -660,7 +660,7 @@ Future<void> TransactionCoordinatorDriver::sendDecisionToParticipantShard( return s.code() == ErrorCodes::Interrupted || isRetryableError(s.code()); }, [ this, shardId, commandObj = commandObj.getOwned() ] { - return _scheduler.scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj) + return _scheduler->scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj) .then([ shardId, commandObj = commandObj.getOwned() ](ResponseStatus response) { auto status = getStatusFromCommandResult(response.data); auto wcStatus = getWriteConcernStatusFromCommandResult(response.data); diff --git a/src/mongo/db/transaction_coordinator_driver.h b/src/mongo/db/transaction_coordinator_driver.h index 436fe633c56..93db15c6447 100644 --- a/src/mongo/db/transaction_coordinator_driver.h +++ b/src/mongo/db/transaction_coordinator_driver.h @@ -232,7 +232,7 @@ public: Future<void> sendDecisionToParticipantShard(const ShardId& shardId, const BSONObj& commandObj); private: - txn::AsyncWorkScheduler _scheduler; + std::unique_ptr<txn::AsyncWorkScheduler> _scheduler; // TODO (SERVER-38522): Remove once AsyncWorkScheduler is used for cancellation AtomicWord<bool> _cancelled{false}; diff --git a/src/mongo/db/transaction_coordinator_futures_util.cpp b/src/mongo/db/transaction_coordinator_futures_util.cpp index d79d7076cf2..ef2ced6bcae 100644 --- a/src/mongo/db/transaction_coordinator_futures_util.cpp +++ b/src/mongo/db/transaction_coordinator_futures_util.cpp @@ -57,7 +57,21 @@ AsyncWorkScheduler::AsyncWorkScheduler(ServiceContext* serviceContext) : _serviceContext(serviceContext), _executor(Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor()) {} -AsyncWorkScheduler::~AsyncWorkScheduler() = default; +AsyncWorkScheduler::~AsyncWorkScheduler() { + { + stdx::lock_guard<stdx::mutex> lg(_mutex); + invariant(_activeOpContexts.empty()); + invariant(_activeHandles.empty()); + invariant(_childSchedulers.empty()); + } + + if (!_parent) + return; + + stdx::lock_guard<stdx::mutex> lg(_parent->_mutex); + _parent->_childSchedulers.erase(_itToRemove); + _parent = nullptr; +} Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemoteCommand( const ShardId& shardId, const ReadPreferenceSetting& readPref, const BSONObj& commandObj) { @@ -77,40 +91,33 @@ Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemot // rather than going through the host targeting below. This ensures that the state changes // for the participant and coordinator occur sequentially on a single branch of replica set // history. See SERVER-38142 for details. - return scheduleWork([ shardId, - commandObj = commandObj.getOwned() ](OperationContext * opCtx) { - // Note: This internal authorization is tied to the lifetime of 'opCtx', which is + return scheduleWork([ this, shardId, commandObj = commandObj.getOwned() ](OperationContext * + opCtx) { + // NOTE: This internal authorization is tied to the lifetime of client, which will be // destroyed by 'scheduleWork' immediately after this lambda ends. - AuthorizationSession::get(Client::getCurrent())->grantInternalAuthorization(); + AuthorizationSession::get(opCtx->getClient())->grantInternalAuthorization(); LOG(3) << "Coordinator going to send command " << commandObj << " to shard " << shardId; - auto start = Date_t::now(); + const auto service = opCtx->getServiceContext(); + auto start = _executor->now(); auto requestOpMsg = OpMsgRequest::fromDBAndBody(NamespaceString::kAdminDb, commandObj).serialize(); - const auto replyOpMsg = OpMsg::parseOwned(opCtx->getServiceContext() - ->getServiceEntryPoint() - ->handleRequest(opCtx, requestOpMsg) - .response); + const auto replyOpMsg = OpMsg::parseOwned( + service->getServiceEntryPoint()->handleRequest(opCtx, requestOpMsg).response); // Document sequences are not yet being used for responses. invariant(replyOpMsg.sequences.empty()); // 'ResponseStatus' is the response format of a remote request sent over the network, so // we simulate that format manually here, since we sent the request over the loopback. - return ResponseStatus{replyOpMsg.body.getOwned(), Date_t::now() - start}; + return ResponseStatus{replyOpMsg.body.getOwned(), _executor->now() - start}; }); } - // Manually simulate a futures interface to the TaskExecutor by creating this promise-future - // pair and setting the promise from inside the callback passed to the TaskExecutor. - auto promiseAndFuture = makePromiseFuture<ResponseStatus>(); - auto sharedPromise = - std::make_shared<Promise<ResponseStatus>>(std::move(promiseAndFuture.promise)); - - _targetHostAsync(shardId, readPref) - .then([ this, shardId, sharedPromise, commandObj = commandObj.getOwned(), readPref ]( + return _targetHostAsync(shardId, readPref) + .then([ this, shardId, commandObj = commandObj.getOwned(), readPref ]( HostAndPort shardHostAndPort) mutable { LOG(3) << "Coordinator sending command " << commandObj << " to shard " << shardId; @@ -120,32 +127,86 @@ Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemot readPref.toContainingBSON(), nullptr); - uassertStatusOK(_executor->scheduleRemoteCommand( - request, [ commandObj = commandObj.getOwned(), - shardId, - sharedPromise ](const RemoteCommandCallbackArgs& args) mutable { + auto pf = makePromiseFuture<ResponseStatus>(); + + stdx::unique_lock<stdx::mutex> ul(_mutex); + uassertStatusOK(_shutdownStatus); + + auto scheduledCommandHandle = + uassertStatusOK(_executor->scheduleRemoteCommand(request, [ + this, + commandObj = std::move(commandObj), + shardId = std::move(shardId), + promise = std::make_shared<Promise<ResponseStatus>>(std::move(pf.promise)) + ](const RemoteCommandCallbackArgs& args) mutable noexcept { LOG(3) << "Coordinator shard got response " << args.response.data << " for " << commandObj << " to " << shardId; auto status = args.response.status; // Only consider actual failures to send the command as errors. if (status.isOK()) { - sharedPromise->emplaceValue(args.response); + promise->emplaceValue(std::move(args.response)); } else { - sharedPromise->setError(status); + promise->setError([&] { + if (status == ErrorCodes::CallbackCanceled) { + stdx::unique_lock<stdx::mutex> ul(_mutex); + return _shutdownStatus.isOK() ? status : _shutdownStatus; + } + return status; + }()); } })); + + auto it = + _activeHandles.emplace(_activeHandles.begin(), std::move(scheduledCommandHandle)); + + ul.unlock(); + + return std::move(pf.future).tapAll( + [ this, it = std::move(it) ](StatusWith<ResponseStatus> s) { + stdx::lock_guard<stdx::mutex> lg(_mutex); + _activeHandles.erase(it); + }); }) - .onError([ shardId, commandObj = commandObj.getOwned(), sharedPromise ](Status s) { + .tapError([ shardId, commandObj = commandObj.getOwned() ](Status s) { LOG(3) << "Coordinator shard failed to target command " << commandObj << " to shard " << shardId << causedBy(s); + }); +} - sharedPromise->setError(s); - }) - .getAsync([](Status) {}); +std::unique_ptr<AsyncWorkScheduler> AsyncWorkScheduler::makeChildScheduler() { + auto child = stdx::make_unique<AsyncWorkScheduler>(_serviceContext); - // Do not wait for the callback to run. The continuation on the future corresponding to - // 'sharedPromise' will reschedule the remote request if necessary. - return std::move(promiseAndFuture.future); + stdx::lock_guard<stdx::mutex> lg(_mutex); + if (!_shutdownStatus.isOK()) + child->shutdown(_shutdownStatus); + + child->_parent = this; + child->_itToRemove = _childSchedulers.emplace(_childSchedulers.begin(), child.get()); + + return child; +} + +void AsyncWorkScheduler::shutdown(Status status) { + invariant(!status.isOK()); + + stdx::lock_guard<stdx::mutex> lg(_mutex); + if (!_shutdownStatus.isOK()) + return; + + _shutdownStatus = std::move(status); + + for (const auto& it : _activeOpContexts) { + stdx::lock_guard<Client> clientLock(*it->getClient()); + _serviceContext->killOperation(clientLock, it.get(), _shutdownStatus.code()); + } + + for (const auto& it : _activeHandles) { + _executor->cancel(it); + } + + for (auto& child : _childSchedulers) { + child->shutdown(_shutdownStatus); + } } Future<HostAndPort> AsyncWorkScheduler::_targetHostAsync(const ShardId& shardId, diff --git a/src/mongo/db/transaction_coordinator_futures_util.h b/src/mongo/db/transaction_coordinator_futures_util.h index 5d677290da1..a30112373b4 100644 --- a/src/mongo/db/transaction_coordinator_futures_util.h +++ b/src/mongo/db/transaction_coordinator_futures_util.h @@ -51,6 +51,10 @@ public: AsyncWorkScheduler(ServiceContext* serviceContext); ~AsyncWorkScheduler(); + /** + * Schedules the specified callable to execute asynchronously and returns a future which will be + * set with its result. + */ template <class Callable> Future<FutureContinuationResult<Callable, OperationContext*>> scheduleWork( Callable&& task) noexcept { @@ -70,19 +74,45 @@ public: auto pf = makePromiseFuture<ReturnType>(); auto taskCompletionPromise = std::make_shared<Promise<ReturnType>>(std::move(pf.promise)); try { - uassertStatusOK(_executor->scheduleWorkAt( + stdx::unique_lock<stdx::mutex> ul(_mutex); + uassertStatusOK(_shutdownStatus); + + auto scheduledWorkHandle = uassertStatusOK(_executor->scheduleWorkAt( when, [ this, task = std::forward<Callable>(task), taskCompletionPromise ]( const executor::TaskExecutor::CallbackArgs&) mutable noexcept { - ThreadClient tc("TransactionCoordinator", _serviceContext); - auto uniqueOpCtx = Client::getCurrent()->makeOperationContext(); - taskCompletionPromise->setWith([&] { return task(uniqueOpCtx.get()); }); + taskCompletionPromise->setWith([&] { + ThreadClient tc("TransactionCoordinator", _serviceContext); + stdx::unique_lock<stdx::mutex> ul(_mutex); + uassertStatusOK(_shutdownStatus); + + auto uniqueOpCtxIter = _activeOpContexts.emplace( + _activeOpContexts.begin(), tc->makeOperationContext()); + ul.unlock(); + + auto scopedGuard = makeGuard([&] { + ul.lock(); + _activeOpContexts.erase(uniqueOpCtxIter); + }); + + return task(uniqueOpCtxIter->get()); + }); })); + + auto it = + _activeHandles.emplace(_activeHandles.begin(), std::move(scheduledWorkHandle)); + + ul.unlock(); + + return std::move(pf.future).tapAll( + [ this, it = std::move(it) ](StatusOrStatusWith<ReturnType> s) { + stdx::lock_guard<stdx::mutex> lg(_mutex); + _activeHandles.erase(it); + }); } catch (const DBException& ex) { taskCompletionPromise->setError(ex.toStatus()); + return std::move(pf.future); } - - return std::move(pf.future); } /** @@ -92,7 +122,29 @@ public: Future<executor::TaskExecutor::ResponseStatus> scheduleRemoteCommand( const ShardId& shardId, const ReadPreferenceSetting& readPref, const BSONObj& commandObj); + /** + * Allows sub-tasks on this scheduler to be grouped together and works-around the fact that + * futures are not cancellable. + * + * Shutting down the returned child scheduler has no effect on the parent. Shutting down the + * parent scheduler also shuts down all child schedulers and prevents new ones from starting. + */ + std::unique_ptr<AsyncWorkScheduler> makeChildScheduler(); + + /** + * Non-blocking method, which interrupts all currently active scheduled commands or tasks and + * prevents any new ones from starting. + * After this method is called, all returned futures, which haven't yet been signalled will be + * set to the specified status. Attempting to schedule any new operations will return ready + * futures set to the specified status. + * + * Must not be called with Status::OK. + */ + void shutdown(Status status); + private: + using ChildIteratorsList = std::list<AsyncWorkScheduler*>; + /** * Finds the host and port for a shard. */ @@ -104,6 +156,27 @@ private: // Cached reference to the executor to use executor::TaskExecutor* const _executor; + + // If this work scheduler was constructed through 'makeChildScheduler', points to the parent + // scheduler and contains the iterator from the parent, which needs to be removed on destruction + AsyncWorkScheduler* _parent{nullptr}; + ChildIteratorsList::iterator _itToRemove; + + // Mutex to protect the shared state below + stdx::mutex _mutex; + + // If shutdown() is called, this contains the first status that was passed to it and is an + // indication that no more operations can be scheduled + Status _shutdownStatus{Status::OK()}; + + // Any active scheduled work will have its operation context stored here + std::list<ServiceContext::UniqueOperationContext> _activeOpContexts; + + // Any active scheduled work or network operation will have its TaskExecutor handle stored here + std::list<executor::TaskExecutor::CallbackHandle> _activeHandles; + + // Any outstanding child schedulers created though 'makeChildScheduler' + ChildIteratorsList _childSchedulers; }; enum class ShouldStopIteration { kYes, kNo }; diff --git a/src/mongo/db/transaction_coordinator_futures_util_test.cpp b/src/mongo/db/transaction_coordinator_futures_util_test.cpp index ecb4ed5ba66..3ce434d25ed 100644 --- a/src/mongo/db/transaction_coordinator_futures_util_test.cpp +++ b/src/mongo/db/transaction_coordinator_futures_util_test.cpp @@ -41,6 +41,8 @@ namespace mongo { namespace txn { namespace { +using Barrier = unittest::Barrier; + TEST(TransactionCoordinatorFuturesUtilTest, CollectReturnsInitValueWhenInputIsEmptyVector) { std::vector<Future<int>> futures; auto resultFuture = txn::collect(std::move(futures), 0, [](int& result, const int& next) { @@ -167,7 +169,7 @@ TEST_F(AsyncWorkSchedulerTest, ScheduledBlockingWorkSucceeds) { ASSERT(!future.isReady()); pf.promise.emplaceValue(5); - ASSERT_EQ(5, future.get(operationContext())); + ASSERT_EQ(5, future.get()); } TEST_F(AsyncWorkSchedulerTest, ScheduledBlockingWorkThrowsException) { @@ -186,8 +188,7 @@ TEST_F(AsyncWorkSchedulerTest, ScheduledBlockingWorkThrowsException) { ASSERT(!future.isReady()); pf.promise.emplaceValue(5); - ASSERT_THROWS_CODE( - future.get(operationContext()), AssertionException, ErrorCodes::InternalError); + ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::InternalError); } TEST_F(AsyncWorkSchedulerTest, ScheduledBlockingWorkInSucceeds) { @@ -213,7 +214,7 @@ TEST_F(AsyncWorkSchedulerTest, ScheduledBlockingWorkInSucceeds) { ASSERT(future.isReady()); } - ASSERT_EQ(5, future.get(operationContext())); + ASSERT_EQ(5, future.get()); } TEST_F(AsyncWorkSchedulerTest, ScheduledRemoteCommandRespondsOK) { @@ -229,7 +230,7 @@ TEST_F(AsyncWorkSchedulerTest, ScheduledRemoteCommandRespondsOK) { return objResponse; }); - const auto& response = future.get(operationContext()); + const auto& response = future.get(); ASSERT(response.isOK()); ASSERT_BSONOBJ_EQ(objResponse, response.data); } @@ -247,7 +248,7 @@ TEST_F(AsyncWorkSchedulerTest, ScheduledRemoteCommandRespondsNotOK) { return objResponse; }); - const auto& response = future.get(operationContext()); + const auto& response = future.get(); ASSERT(response.isOK()); ASSERT_BSONOBJ_EQ(objResponse, response.data); } @@ -270,13 +271,159 @@ TEST_F(AsyncWorkSchedulerTest, ScheduledRemoteCommandsOneOKAndOneError) { return BSON("ok" << 0 << "responseData" << 3); }); - const auto& response2 = future2.get(operationContext()); + const auto& response2 = future2.get(); ASSERT(response2.isOK()); - const auto& response1 = future1.get(operationContext()); + const auto& response1 = future1.get(); ASSERT(response1.isOK()); } +TEST_F(AsyncWorkSchedulerTest, ShutdownInterruptsRunningBlockedTasks) { + AsyncWorkScheduler async(getServiceContext()); + + Barrier barrier(2); + + auto future = async.scheduleWork([&barrier](OperationContext* opCtx) { + barrier.countDownAndWait(); + opCtx->sleepFor(Hours(6)); + }); + + barrier.countDownAndWait(); + ASSERT(!future.isReady()); + + async.shutdown({ErrorCodes::InternalError, "Test internal error"}); + + ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::InternalError); +} + +TEST_F(AsyncWorkSchedulerTest, ShutdownInterruptsNotYetScheduledTasks) { + AsyncWorkScheduler async(getServiceContext()); + + AtomicWord<int> numInvocations{0}; + + auto future1 = + async.scheduleWorkIn(Milliseconds(1), [&numInvocations](OperationContext* opCtx) { + numInvocations.addAndFetch(1); + }); + + auto future2 = + async.scheduleWorkIn(Milliseconds(1), [&numInvocations](OperationContext* opCtx) { + numInvocations.addAndFetch(1); + }); + + ASSERT(!future1.isReady()); + ASSERT(!future2.isReady()); + ASSERT_EQ(0, numInvocations.load()); + + async.shutdown({ErrorCodes::InternalError, "Test internal error"}); + ASSERT_EQ(0, numInvocations.load()); + + ASSERT_THROWS_CODE(future1.get(), AssertionException, ErrorCodes::InternalError); + ASSERT_THROWS_CODE(future2.get(), AssertionException, ErrorCodes::InternalError); + + ASSERT_EQ(0, numInvocations.load()); +} + +TEST_F(AsyncWorkSchedulerTest, ShutdownInterruptsRemoteCommandsWhichAreBlockedWaitingForResponse) { + AsyncWorkScheduler async(getServiceContext()); + + auto future1 = async.scheduleRemoteCommand( + kShardIds[1], ReadPreferenceSetting{ReadPreference::PrimaryOnly}, BSON("TestCommand" << 1)); + + auto future2 = async.scheduleRemoteCommand( + kShardIds[2], ReadPreferenceSetting{ReadPreference::PrimaryOnly}, BSON("TestCommand" << 1)); + + // Wait till at least one of the two commands above gets scheduled + network()->waitForWork(); + + ASSERT(!future1.isReady()); + ASSERT(!future2.isReady()); + + async.shutdown({ErrorCodes::InternalError, "Test internal error"}); + + ASSERT_THROWS_CODE(future1.get(), AssertionException, ErrorCodes::InternalError); + ASSERT_THROWS_CODE(future2.get(), AssertionException, ErrorCodes::InternalError); +} + +TEST_F(AsyncWorkSchedulerTest, ShutdownChildSchedulerOnlyInterruptsChildTasks) { + AsyncWorkScheduler async(getServiceContext()); + + auto futureFromParent = async.scheduleWorkIn( + Milliseconds(1), [](OperationContext* opCtx) { return std::string("Parent"); }); + + auto childAsync1 = async.makeChildScheduler(); + auto childFuture1 = childAsync1->scheduleWorkIn( + Milliseconds(1), [](OperationContext* opCtx) { return std::string("Child1"); }); + + auto childAsync2 = async.makeChildScheduler(); + auto childFuture2 = childAsync2->scheduleWorkIn( + Milliseconds(1), [](OperationContext* opCtx) { return std::string("Child2"); }); + + childAsync1->shutdown({ErrorCodes::InternalError, "Test error"}); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(network()); + network()->advanceTime(network()->now() + Milliseconds(1)); + } + + ASSERT_EQ("Parent", futureFromParent.get()); + ASSERT_THROWS_CODE(childFuture1.get(), AssertionException, ErrorCodes::InternalError); + ASSERT_EQ("Child2", childFuture2.get()); +} + +TEST_F(AsyncWorkSchedulerTest, ShutdownParentSchedulerInterruptsAllChildTasks) { + AsyncWorkScheduler async(getServiceContext()); + + auto futureFromParent = async.scheduleWorkIn( + Milliseconds(1), [](OperationContext* opCtx) { return std::string("Parent"); }); + + auto childAsync1 = async.makeChildScheduler(); + auto childFuture1 = childAsync1->scheduleWorkIn( + Milliseconds(1), [](OperationContext* opCtx) { return std::string("Child1"); }); + + auto childAsync2 = async.makeChildScheduler(); + auto childFuture2 = childAsync2->scheduleWorkIn( + Milliseconds(1), [](OperationContext* opCtx) { return std::string("Child2"); }); + + async.shutdown({ErrorCodes::InternalError, "Test error"}); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(network()); + network()->advanceTime(network()->now() + Milliseconds(1)); + } + + ASSERT_THROWS_CODE(futureFromParent.get(), AssertionException, ErrorCodes::InternalError); + ASSERT_THROWS_CODE(childFuture1.get(), AssertionException, ErrorCodes::InternalError); + ASSERT_THROWS_CODE(childFuture2.get(), AssertionException, ErrorCodes::InternalError); +} + +TEST_F(AsyncWorkSchedulerTest, MakeChildSchedulerAfterShutdownParentScheduler) { + AsyncWorkScheduler async(getServiceContext()); + + // Shut down the parent scheduler immediately + async.shutdown({ErrorCodes::InternalError, "Test error"}); + + auto futureFromParent = async.scheduleWorkIn( + Milliseconds(1), [](OperationContext* opCtx) { return std::string("Parent"); }); + + auto childAsync1 = async.makeChildScheduler(); + auto childFuture1 = childAsync1->scheduleWorkIn( + Milliseconds(1), [](OperationContext* opCtx) { return std::string("Child1"); }); + + auto childAsync2 = async.makeChildScheduler(); + auto childFuture2 = childAsync2->scheduleWorkIn( + Milliseconds(1), [](OperationContext* opCtx) { return std::string("Child2"); }); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(network()); + network()->advanceTime(network()->now() + Milliseconds(1)); + } + + ASSERT_THROWS_CODE(futureFromParent.get(), AssertionException, ErrorCodes::InternalError); + ASSERT_THROWS_CODE(childFuture1.get(), AssertionException, ErrorCodes::InternalError); + ASSERT_THROWS_CODE(childFuture2.get(), AssertionException, ErrorCodes::InternalError); +} + using DoWhileTest = AsyncWorkSchedulerTest; @@ -294,7 +441,7 @@ TEST_F(DoWhileTest, LoopBodyExecutesAtLeastOnceWithBackoff) { ASSERT(future.isReady()); ASSERT_EQ(1, numLoops); - ASSERT_EQ(1, future.get(operationContext())); + ASSERT_EQ(1, future.get()); } TEST_F(DoWhileTest, LoopBodyExecutesManyIterationsWithoutBackoff) { @@ -309,7 +456,7 @@ TEST_F(DoWhileTest, LoopBodyExecutesManyIterationsWithoutBackoff) { }, [&remainingLoops] { return Future<int>::makeReady(--remainingLoops); }); - ASSERT_EQ(0, future.get(operationContext())); + ASSERT_EQ(0, future.get()); ASSERT_EQ(0, remainingLoops); } @@ -350,7 +497,27 @@ TEST_F(DoWhileTest, LoopObeysBackoff) { ASSERT_EQ(3, numLoops); } - ASSERT_EQ(3, future.get(operationContext())); + ASSERT_EQ(3, future.get()); +} + +TEST_F(DoWhileTest, LoopObeysShutdown) { + AsyncWorkScheduler async(getServiceContext()); + + int numLoops = 0; + auto future = + doWhile(async, + boost::none, + [](const StatusWith<int>& status) { return status != ErrorCodes::InternalError; }, + [&numLoops] { return Future<int>::makeReady(++numLoops); }); + + // Wait for at least one loop + while (numLoops == 0) + sleepFor(Milliseconds(25)); + + ASSERT(!future.isReady()); + async.shutdown({ErrorCodes::InternalError, "Test internal error"}); + + ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::InternalError); } } // namespace diff --git a/src/mongo/db/transaction_coordinator_service.cpp b/src/mongo/db/transaction_coordinator_service.cpp index 5a1becbe5a3..27e1fba797a 100644 --- a/src/mongo/db/transaction_coordinator_service.cpp +++ b/src/mongo/db/transaction_coordinator_service.cpp @@ -34,16 +34,12 @@ #include "mongo/db/transaction_coordinator_service.h" -#include "mongo/db/operation_context.h" #include "mongo/db/repl/repl_client_info.h" -#include "mongo/db/service_context.h" #include "mongo/db/transaction_coordinator_document_gen.h" #include "mongo/db/write_concern.h" #include "mongo/executor/task_executor.h" #include "mongo/executor/task_executor_pool.h" -#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/grid.h" -#include "mongo/s/shard_id.h" #include "mongo/util/log.h" namespace mongo { @@ -57,6 +53,8 @@ const auto transactionCoordinatorServiceDecoration = TransactionCoordinatorService::TransactionCoordinatorService() : _coordinatorCatalog(std::make_shared<TransactionCoordinatorCatalog>()) {} +TransactionCoordinatorService::~TransactionCoordinatorService() = default; + TransactionCoordinatorService* TransactionCoordinatorService::get(OperationContext* opCtx) { return get(opCtx->getServiceContext()); } @@ -209,6 +207,6 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx) { invariant(status); } -void TransactionCoordinatorService::onStepDown(OperationContext* opCtx) {} +void TransactionCoordinatorService::onStepDown() {} } // namespace mongo diff --git a/src/mongo/db/transaction_coordinator_service.h b/src/mongo/db/transaction_coordinator_service.h index 89971179010..115c7604bcc 100644 --- a/src/mongo/db/transaction_coordinator_service.h +++ b/src/mongo/db/transaction_coordinator_service.h @@ -30,25 +30,17 @@ #pragma once -#include <memory> - #include "mongo/base/disallow_copying.h" -#include "mongo/db/logical_session_id.h" #include "mongo/db/transaction_coordinator_catalog.h" -#include "mongo/util/future.h" namespace mongo { -class ShardId; -class OperationContext; -class ServiceContext; - -class TransactionCoordinatorService final { +class TransactionCoordinatorService { MONGO_DISALLOW_COPYING(TransactionCoordinatorService); public: TransactionCoordinatorService(); - ~TransactionCoordinatorService() = default; + ~TransactionCoordinatorService(); /** * Retrieves the TransactionCoordinatorService associated with the service or operation context. @@ -100,7 +92,7 @@ public: * async task to continue coordinating its commit. */ void onStepUp(OperationContext* opCtx); - void onStepDown(OperationContext* opCtx); + void onStepDown(); private: std::shared_ptr<TransactionCoordinatorCatalog> _coordinatorCatalog; diff --git a/src/mongo/db/transaction_coordinator_service_test.cpp b/src/mongo/db/transaction_coordinator_service_test.cpp index 28c0bcb9780..7b8020f8622 100644 --- a/src/mongo/db/transaction_coordinator_service_test.cpp +++ b/src/mongo/db/transaction_coordinator_service_test.cpp @@ -42,7 +42,6 @@ namespace mongo { namespace { -const Timestamp kDummyTimestamp = Timestamp::min(); const Date_t kCommitDeadline = Date_t::max(); const BSONObj kDummyWriteConcernError = BSON("code" << ErrorCodes::WriteConcernFailed << "errmsg" @@ -198,24 +197,6 @@ public: TxnNumber _txnNumber{1}; }; -/** - * Fixture that during setUp automatically creates a coordinator for a default lsid/txnNumber pair. - */ -class TransactionCoordinatorServiceTestSingleTxn : public TransactionCoordinatorServiceTest { -public: - void setUp() final { - TransactionCoordinatorServiceTest::setUp(); - TransactionCoordinatorService::get(operationContext()) - ->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline); - } - - TransactionCoordinatorService* coordinatorService() { - return TransactionCoordinatorService::get(operationContext()); - } -}; - -} // namespace - TEST_F(TransactionCoordinatorServiceTest, CreateCoordinatorOnNewSessionSucceeds) { auto coordinatorService = TransactionCoordinatorService::get(operationContext()); coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline); @@ -553,6 +534,23 @@ TEST_F(TransactionCoordinatorServiceTest, operationContext(), _lsid, _txnNumber, kTwoShardIdSet)); } + +/** + * Fixture that during setUp automatically creates a coordinator for a default lsid/txnNumber pair. + */ +class TransactionCoordinatorServiceTestSingleTxn : public TransactionCoordinatorServiceTest { +public: + void setUp() final { + TransactionCoordinatorServiceTest::setUp(); + TransactionCoordinatorService::get(operationContext()) + ->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline); + } + + TransactionCoordinatorService* coordinatorService() { + return TransactionCoordinatorService::get(operationContext()); + } +}; + TEST_F(TransactionCoordinatorServiceTestSingleTxn, CoordinateCommitReturnsCorrectCommitDecisionOnAbort) { @@ -624,4 +622,5 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn, static_cast<int>(commitDecisionFuture2.get())); } +} // namespace } // namespace mongo |