diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-03-21 13:14:40 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-03-21 21:36:47 -0400 |
commit | be36aeb7166b2e06dd47dd0769ea28cbb7250041 (patch) | |
tree | d95db70de0f2ac6cf1d6bc62ef85c657c6cff2f5 /src/mongo/db | |
parent | 78eaa3cf538764d5aa5a09c5997532a4c3b2bca8 (diff) | |
download | mongo-be36aeb7166b2e06dd47dd0769ea28cbb7250041.tar.gz |
SERVER-40223 Use the AsyncWorkScheduler to run local command when recovering a coordinator decision
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/s/transaction_coordinator.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_driver.cpp | 40 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_driver.h | 5 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_futures_util.cpp | 44 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_futures_util.h | 11 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_test.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/s/txn_two_phase_commit_cmds.cpp | 65 |
7 files changed, 90 insertions, 100 deletions
diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp index 6e7d4d377ed..5ba843566a6 100644 --- a/src/mongo/db/s/transaction_coordinator.cpp +++ b/src/mongo/db/s/transaction_coordinator.cpp @@ -79,7 +79,7 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext, _lsid(lsid), _txnNumber(txnNumber), _scheduler(std::move(scheduler)), - _driver(serviceContext, _scheduler->makeChildScheduler()) { + _driver(serviceContext, *_scheduler) { if (coordinateCommitDeadline) { _deadlineScheduler = _scheduler->makeChildScheduler(); _deadlineScheduler @@ -93,10 +93,11 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext, } TransactionCoordinator::~TransactionCoordinator() { - _cancelTimeoutWaitForCommitTask(); + cancelIfCommitNotYetStarted(); + // Wait for all scheduled asynchronous activity to complete if (_deadlineScheduler) - _deadlineScheduler.reset(); + _deadlineScheduler->join(); stdx::lock_guard<stdx::mutex> lk(_mutex); invariant(_state == TransactionCoordinator::CoordinatorState::kDone); @@ -231,13 +232,11 @@ Future<void> TransactionCoordinator::_sendDecisionToParticipants( invariant(_state == CoordinatorState::kPreparing); _decisionPromise.emplaceValue(decision.decision); - // Send the decision to all participants. switch (decision.decision) { case txn::CommitDecision::kCommit: _state = CoordinatorState::kCommitting; - invariant(decision.commitTimestamp); return _driver.sendCommit( - participantShards, _lsid, _txnNumber, decision.commitTimestamp.get()); + participantShards, _lsid, _txnNumber, *decision.commitTimestamp); case txn::CommitDecision::kAbort: _state = CoordinatorState::kAborting; return _driver.sendAbort(participantShards, _lsid, _txnNumber); diff --git a/src/mongo/db/s/transaction_coordinator_driver.cpp b/src/mongo/db/s/transaction_coordinator_driver.cpp index f068c10d5cf..177a3c0a5a3 100644 --- a/src/mongo/db/s/transaction_coordinator_driver.cpp +++ b/src/mongo/db/s/transaction_coordinator_driver.cpp @@ -110,16 +110,6 @@ std::string buildParticipantListString(const std::vector<ShardId>& participantLi return ss.str(); } -bool checkIsLocalShard(ServiceContext* serviceContext, const ShardId& shardId) { - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - return shardId == ShardRegistry::kConfigServerShardId; - } - if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { - return shardId == ShardingState::get(serviceContext)->shardId(); - } - MONGO_UNREACHABLE; // Only sharded systems should use the two-phase commit path. -} - bool shouldRetryPersistingCoordinatorState(const Status& responseStatus) { // Writes to the local node are expected to succeed if this node is still primary, so *only* // retry if the write was explicitly interrupted (we do not allow a user to stop a commit @@ -131,11 +121,9 @@ bool shouldRetryPersistingCoordinatorState(const Status& responseStatus) { } // namespace -TransactionCoordinatorDriver::TransactionCoordinatorDriver( - ServiceContext* serviceContext, std::unique_ptr<txn::AsyncWorkScheduler> scheduler) - : _serviceContext(serviceContext), _scheduler(std::move(scheduler)) {} - -TransactionCoordinatorDriver::~TransactionCoordinatorDriver() = default; +TransactionCoordinatorDriver::TransactionCoordinatorDriver(ServiceContext* serviceContext, + txn::AsyncWorkScheduler& scheduler) + : _serviceContext(serviceContext), _scheduler(scheduler) {} namespace { void persistParticipantListBlocking(OperationContext* opCtx, @@ -228,11 +216,11 @@ void persistParticipantListBlocking(OperationContext* opCtx, Future<void> TransactionCoordinatorDriver::persistParticipantList( const LogicalSessionId& lsid, TxnNumber txnNumber, std::vector<ShardId> participantList) { - return txn::doWhile(*_scheduler, + return txn::doWhile(_scheduler, boost::none /* no need for a backoff */, [](const Status& s) { return shouldRetryPersistingCoordinatorState(s); }, [this, lsid, txnNumber, participantList] { - return _scheduler->scheduleWork( + return _scheduler.scheduleWork( [lsid, txnNumber, participantList](OperationContext* opCtx) { persistParticipantListBlocking( opCtx, lsid, txnNumber, participantList); @@ -255,7 +243,7 @@ Future<txn::PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare( // Send prepare to all participants asynchronously and collect their future responses in a // vector of responses. - auto prepareScheduler = _scheduler->makeChildScheduler(); + auto prepareScheduler = _scheduler.makeChildScheduler(); for (const auto& participant : participantShards) { responses.push_back(sendPrepareToShard(*prepareScheduler, participant, prepareObj)); @@ -419,11 +407,11 @@ Future<void> TransactionCoordinatorDriver::persistDecision( std::vector<ShardId> participantList, const boost::optional<Timestamp>& commitTimestamp) { return txn::doWhile( - *_scheduler, + _scheduler, boost::none /* no need for a backoff */, [](const Status& s) { return shouldRetryPersistingCoordinatorState(s); }, [this, lsid, txnNumber, participantList, commitTimestamp] { - return _scheduler->scheduleWork([lsid, txnNumber, participantList, commitTimestamp]( + return _scheduler.scheduleWork([lsid, txnNumber, participantList, commitTimestamp]( OperationContext* opCtx) { persistDecisionBlocking(opCtx, lsid, txnNumber, participantList, commitTimestamp); }); @@ -444,7 +432,7 @@ Future<void> TransactionCoordinatorDriver::sendCommit(const std::vector<ShardId> std::vector<Future<void>> responses; for (const auto& participant : participantShards) { - responses.push_back(sendDecisionToParticipantShard(*_scheduler, participant, commitObj)); + responses.push_back(sendDecisionToParticipantShard(_scheduler, participant, commitObj)); } return txn::whenAll(responses); } @@ -461,7 +449,7 @@ Future<void> TransactionCoordinatorDriver::sendAbort(const std::vector<ShardId>& std::vector<Future<void>> responses; for (const auto& participant : participantShards) { - responses.push_back(sendDecisionToParticipantShard(*_scheduler, participant, abortObj)); + responses.push_back(sendDecisionToParticipantShard(_scheduler, participant, abortObj)); } return txn::whenAll(responses); } @@ -531,11 +519,11 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx, Future<void> TransactionCoordinatorDriver::deleteCoordinatorDoc(const LogicalSessionId& lsid, TxnNumber txnNumber) { - return txn::doWhile(*_scheduler, + return txn::doWhile(_scheduler, boost::none /* no need for a backoff */, [](const Status& s) { return shouldRetryPersistingCoordinatorState(s); }, [this, lsid, txnNumber] { - return _scheduler->scheduleWork( + return _scheduler.scheduleWork( [lsid, txnNumber](OperationContext* opCtx) { deleteCoordinatorDocBlocking(opCtx, lsid, txnNumber); }); @@ -563,7 +551,7 @@ std::vector<TransactionCoordinatorDocument> TransactionCoordinatorDriver::readAl Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard( txn::AsyncWorkScheduler& scheduler, const ShardId& shardId, const BSONObj& commandObj) { - const bool isLocalShard = checkIsLocalShard(_serviceContext, shardId); + const bool isLocalShard = (shardId == txn::getLocalShardId(_serviceContext)); auto f = txn::doWhile( scheduler, @@ -645,7 +633,7 @@ Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard( Future<void> TransactionCoordinatorDriver::sendDecisionToParticipantShard( txn::AsyncWorkScheduler& scheduler, const ShardId& shardId, const BSONObj& commandObj) { - const bool isLocalShard = checkIsLocalShard(_serviceContext, shardId); + const bool isLocalShard = (shardId == txn::getLocalShardId(_serviceContext)); return txn::doWhile( scheduler, diff --git a/src/mongo/db/s/transaction_coordinator_driver.h b/src/mongo/db/s/transaction_coordinator_driver.h index fb80d6891a1..bc9bbb825af 100644 --- a/src/mongo/db/s/transaction_coordinator_driver.h +++ b/src/mongo/db/s/transaction_coordinator_driver.h @@ -92,8 +92,7 @@ class TransactionCoordinatorDriver { public: TransactionCoordinatorDriver(ServiceContext* serviceContext, - std::unique_ptr<txn::AsyncWorkScheduler> scheduler); - ~TransactionCoordinatorDriver(); + txn::AsyncWorkScheduler& scheduler); /** * Upserts a document of the form: @@ -232,7 +231,7 @@ public: private: ServiceContext* _serviceContext; - std::unique_ptr<txn::AsyncWorkScheduler> _scheduler; + txn::AsyncWorkScheduler& _scheduler; }; } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.cpp b/src/mongo/db/s/transaction_coordinator_futures_util.cpp index 7ca2b572067..fa74d11cbf0 100644 --- a/src/mongo/db/s/transaction_coordinator_futures_util.cpp +++ b/src/mongo/db/s/transaction_coordinator_futures_util.cpp @@ -58,12 +58,7 @@ AsyncWorkScheduler::AsyncWorkScheduler(ServiceContext* serviceContext) _executor(Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor()) {} AsyncWorkScheduler::~AsyncWorkScheduler() { - { - stdx::unique_lock<stdx::mutex> ul(_mutex); - _allListsEmptyCV.wait(ul, [&] { - return _activeOpContexts.empty() && _activeHandles.empty() && _childSchedulers.empty(); - }); - } + join(); if (!_parent) return; @@ -77,15 +72,7 @@ AsyncWorkScheduler::~AsyncWorkScheduler() { Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemoteCommand( const ShardId& shardId, const ReadPreferenceSetting& readPref, const BSONObj& commandObj) { - bool isSelfShard = [this, shardId] { - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - return shardId == ShardRegistry::kConfigServerShardId; - } - if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { - return shardId == ShardingState::get(_serviceContext)->shardId(); - } - MONGO_UNREACHABLE; // Only sharded systems should use the two-phase commit path. - }(); + const bool isSelfShard = (shardId == getLocalShardId(_serviceContext)); if (isSelfShard) { // If sending a command to the same shard as this node is in, send it directly to this node @@ -94,10 +81,10 @@ Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemot // history. See SERVER-38142 for details. return scheduleWork([ this, shardId, commandObj = commandObj.getOwned() ](OperationContext * opCtx) { - // Note: This internal authorization is tied to the lifetime of 'opCtx', which is - // destroyed by 'scheduleWork' immediately after this lambda ends. - AuthorizationSession::get(Client::getCurrent()) - ->grantInternalAuthorization(Client::getCurrent()); + // Note: This internal authorization is tied to the lifetime of the client, which will + // be destroyed by 'scheduleWork' immediately after this lambda ends + AuthorizationSession::get(opCtx->getClient()) + ->grantInternalAuthorization(opCtx->getClient()); if (MONGO_FAIL_POINT(hangWhileTargetingLocalHost)) { LOG(0) << "Hit hangWhileTargetingLocalHost failpoint"; @@ -208,6 +195,13 @@ void AsyncWorkScheduler::shutdown(Status status) { } } +void AsyncWorkScheduler::join() { + stdx::unique_lock<stdx::mutex> ul(_mutex); + _allListsEmptyCV.wait(ul, [&] { + return _activeOpContexts.empty() && _activeHandles.empty() && _childSchedulers.empty(); + }); +} + Future<HostAndPort> AsyncWorkScheduler::_targetHostAsync(const ShardId& shardId, const ReadPreferenceSetting& readPref) { return scheduleWork([shardId, readPref](OperationContext* opCtx) { @@ -229,6 +223,18 @@ void AsyncWorkScheduler::_notifyAllTasksComplete(WithLock) { _allListsEmptyCV.notify_all(); } +ShardId getLocalShardId(ServiceContext* service) { + if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { + return ShardRegistry::kConfigServerShardId; + } + if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { + return ShardingState::get(service)->shardId(); + } + + // Only sharded systems should use the two-phase commit path + MONGO_UNREACHABLE; +} + Future<void> whenAll(std::vector<Future<void>>& futures) { std::vector<Future<int>> dummyFutures; for (auto&& f : futures) { diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.h b/src/mongo/db/s/transaction_coordinator_futures_util.h index 942ee7e2fbf..7a302d5a2b4 100644 --- a/src/mongo/db/s/transaction_coordinator_futures_util.h +++ b/src/mongo/db/s/transaction_coordinator_futures_util.h @@ -146,6 +146,15 @@ public: */ void shutdown(Status status); + /** + * Blocking call, which will wait until any scheduled commands/tasks and child schedulers have + * drained. + * + * It is allowed to be called without calling shutdown, but in that case it the caller's + * responsibility to ensure that no new work gets scheduled. + */ + void join(); + private: using ChildIteratorsList = std::list<AsyncWorkScheduler*>; @@ -194,6 +203,8 @@ private: stdx::condition_variable _allListsEmptyCV; }; +ShardId getLocalShardId(ServiceContext* service); + enum class ShouldStopIteration { kYes, kNo }; /** diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp index b9c1db385f2..2551eeb2620 100644 --- a/src/mongo/db/s/transaction_coordinator_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_test.cpp @@ -105,8 +105,8 @@ class TransactionCoordinatorDriverTest : public TransactionCoordinatorTestBase { protected: void setUp() override { TransactionCoordinatorTestBase::setUp(); - _driver.emplace(getServiceContext(), - std::make_unique<txn::AsyncWorkScheduler>(getServiceContext())); + _aws.emplace(getServiceContext()); + _driver.emplace(getServiceContext(), *_aws); } void tearDown() override { @@ -114,6 +114,7 @@ protected: TransactionCoordinatorTestBase::tearDown(); } + boost::optional<txn::AsyncWorkScheduler> _aws; boost::optional<TransactionCoordinatorDriver> _driver; }; @@ -820,5 +821,14 @@ TEST_F(TransactionCoordinatorTest, coordinator.onCompletion().get(); } +TEST_F(TransactionCoordinatorTest, AbandonNewlyCreatedCoordinator) { + TransactionCoordinator coordinator( + getServiceContext(), + _lsid, + _txnNumber, + std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), + network()->now() + Seconds{30}); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp index 6c76aa844fd..21e907f7e5d 100644 --- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp +++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp @@ -31,18 +31,13 @@ #include "mongo/platform/basic.h" -#include "mongo/client/remote_command_targeter.h" -#include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" #include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/transaction_coordinator_service.h" #include "mongo/db/transaction_participant.h" -#include "mongo/executor/task_executor.h" -#include "mongo/executor/task_executor_pool.h" #include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/transport/service_entry_point.h" #include "mongo/util/log.h" namespace mongo { @@ -272,50 +267,32 @@ public: << "autocommit" << false); - BSONObj abortResponseObj; - - const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); - auto cbHandle = uassertStatusOK(executor->scheduleWork([ - serviceContext = opCtx->getServiceContext(), - &abortResponseObj, - abortRequestObj = abortRequestObj.getOwned() - ](const executor::TaskExecutor::CallbackArgs& cbArgs) { - ThreadClient threadClient(serviceContext); - auto uniqueOpCtx = Client::getCurrent()->makeOperationContext(); - auto opCtx = uniqueOpCtx.get(); - - AuthorizationSession::get(opCtx->getClient())->grantInternalAuthorization(opCtx); - - auto requestOpMsg = - OpMsgRequest::fromDBAndBody(NamespaceString::kAdminDb, abortRequestObj) - .serialize(); - const auto replyOpMsg = OpMsg::parseOwned(serviceContext->getServiceEntryPoint() - ->handleRequest(opCtx, requestOpMsg) - .response); - - invariant(replyOpMsg.sequences.empty()); - abortResponseObj = replyOpMsg.body.getOwned(); - })); - executor->wait(cbHandle, opCtx); - - const auto abortStatus = getStatusFromCommandResult(abortResponseObj); - - // Since the abortTransaction was sent without writeConcern, there should not be a - // writeConcern error. - invariant(getWriteConcernStatusFromCommandResult(abortResponseObj).isOK()); + const auto abortStatus = [&] { + txn::AsyncWorkScheduler aws(opCtx->getServiceContext()); + auto awsShutdownGuard = makeGuard([&aws] { + aws.shutdown({ErrorCodes::Interrupted, "Request interrupted due to timeout"}); + }); + auto future = + aws.scheduleRemoteCommand(txn::getLocalShardId(opCtx->getServiceContext()), + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + abortRequestObj); + const auto& responseStatus = future.get(opCtx); + uassertStatusOK(responseStatus.status); + + return getStatusFromCommandResult(responseStatus.data); + }(); LOG(3) << "coordinateCommitTransaction got response " << abortStatus << " for " << abortRequestObj << " used to recover decision from local participant"; // If the abortTransaction succeeded, return that the transaction aborted. - uassert(ErrorCodes::NoSuchTransaction, "transaction aborted", !abortStatus.isOK()); - - // If the abortTransaction returned that the transaction committed, return - // ok, otherwise return whatever the abortTransaction errored with (which may be - // NoSuchTransaction). - uassert(abortStatus.code(), - abortStatus.reason(), - abortStatus.code() == ErrorCodes::TransactionCommitted); + if (abortStatus.isOK()) + uasserted(ErrorCodes::NoSuchTransaction, "Transaction aborted"); + + // If the abortTransaction returned that the transaction committed, return OK, otherwise + // return whatever the abortTransaction errored with (which may be NoSuchTransaction). + if (abortStatus != ErrorCodes::TransactionCommitted) + uassertStatusOK(abortStatus); } private: |