author    Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2019-03-21 13:14:40 -0400
committer Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2019-03-21 21:36:47 -0400
commit    be36aeb7166b2e06dd47dd0769ea28cbb7250041 (patch)
tree      d95db70de0f2ac6cf1d6bc62ef85c657c6cff2f5 /src/mongo/db
parent    78eaa3cf538764d5aa5a09c5997532a4c3b2bca8 (diff)
download  mongo-be36aeb7166b2e06dd47dd0769ea28cbb7250041.tar.gz
SERVER-40223 Use the AsyncWorkScheduler to run local command when recovering a coordinator decision
Diffstat (limited to 'src/mongo/db')
 -rw-r--r--  src/mongo/db/s/transaction_coordinator.cpp               | 11
 -rw-r--r--  src/mongo/db/s/transaction_coordinator_driver.cpp        | 40
 -rw-r--r--  src/mongo/db/s/transaction_coordinator_driver.h          |  5
 -rw-r--r--  src/mongo/db/s/transaction_coordinator_futures_util.cpp  | 44
 -rw-r--r--  src/mongo/db/s/transaction_coordinator_futures_util.h    | 11
 -rw-r--r--  src/mongo/db/s/transaction_coordinator_test.cpp          | 14
 -rw-r--r--  src/mongo/db/s/txn_two_phase_commit_cmds.cpp             | 65
 7 files changed, 90 insertions(+), 100 deletions(-)
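
In brief: the decision-recovery path of coordinateCommitTransaction used to hand-roll a TaskExecutor callback that pushed the abortTransaction command through the ServiceEntryPoint. This commit routes it through the AsyncWorkScheduler instead, whose scheduleRemoteCommand already short-circuits commands addressed to the local shard. A condensed sketch of the new pattern, assembled from the txn_two_phase_commit_cmds.cpp hunk at the end of this diff (error handling trimmed):

    txn::AsyncWorkScheduler aws(opCtx->getServiceContext());
    auto awsShutdownGuard = makeGuard([&aws] {
        aws.shutdown({ErrorCodes::Interrupted, "Request interrupted due to timeout"});
    });

    // scheduleRemoteCommand detects that the target is this node's own shard
    // id and dispatches through the local ServiceEntryPoint, not the network.
    auto future = aws.scheduleRemoteCommand(
        txn::getLocalShardId(opCtx->getServiceContext()),
        ReadPreferenceSetting{ReadPreference::PrimaryOnly},
        abortRequestObj);

    const auto& responseStatus = future.get(opCtx);
    uassertStatusOK(responseStatus.status);
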
diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp
index 6e7d4d377ed..5ba843566a6 100644
--- a/src/mongo/db/s/transaction_coordinator.cpp
+++ b/src/mongo/db/s/transaction_coordinator.cpp
@@ -79,7 +79,7 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
_lsid(lsid),
_txnNumber(txnNumber),
_scheduler(std::move(scheduler)),
- _driver(serviceContext, _scheduler->makeChildScheduler()) {
+ _driver(serviceContext, *_scheduler) {
if (coordinateCommitDeadline) {
_deadlineScheduler = _scheduler->makeChildScheduler();
_deadlineScheduler
@@ -93,10 +93,11 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
}
TransactionCoordinator::~TransactionCoordinator() {
- _cancelTimeoutWaitForCommitTask();
+ cancelIfCommitNotYetStarted();
+ // Wait for all scheduled asynchronous activity to complete
if (_deadlineScheduler)
- _deadlineScheduler.reset();
+ _deadlineScheduler->join();
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(_state == TransactionCoordinator::CoordinatorState::kDone);
@@ -231,13 +232,11 @@ Future<void> TransactionCoordinator::_sendDecisionToParticipants(
invariant(_state == CoordinatorState::kPreparing);
_decisionPromise.emplaceValue(decision.decision);
- // Send the decision to all participants.
switch (decision.decision) {
case txn::CommitDecision::kCommit:
_state = CoordinatorState::kCommitting;
- invariant(decision.commitTimestamp);
return _driver.sendCommit(
- participantShards, _lsid, _txnNumber, decision.commitTimestamp.get());
+ participantShards, _lsid, _txnNumber, *decision.commitTimestamp);
case txn::CommitDecision::kAbort:
_state = CoordinatorState::kAborting;
return _driver.sendAbort(participantShards, _lsid, _txnNumber);
diff --git a/src/mongo/db/s/transaction_coordinator_driver.cpp b/src/mongo/db/s/transaction_coordinator_driver.cpp
index f068c10d5cf..177a3c0a5a3 100644
--- a/src/mongo/db/s/transaction_coordinator_driver.cpp
+++ b/src/mongo/db/s/transaction_coordinator_driver.cpp
@@ -110,16 +110,6 @@ std::string buildParticipantListString(const std::vector<ShardId>& participantLi
return ss.str();
}
-bool checkIsLocalShard(ServiceContext* serviceContext, const ShardId& shardId) {
- if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
- return shardId == ShardRegistry::kConfigServerShardId;
- }
- if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
- return shardId == ShardingState::get(serviceContext)->shardId();
- }
- MONGO_UNREACHABLE; // Only sharded systems should use the two-phase commit path.
-}
-
bool shouldRetryPersistingCoordinatorState(const Status& responseStatus) {
// Writes to the local node are expected to succeed if this node is still primary, so *only*
// retry if the write was explicitly interrupted (we do not allow a user to stop a commit
@@ -131,11 +121,9 @@ bool shouldRetryPersistingCoordinatorState(const Status& responseStatus) {
} // namespace
-TransactionCoordinatorDriver::TransactionCoordinatorDriver(
- ServiceContext* serviceContext, std::unique_ptr<txn::AsyncWorkScheduler> scheduler)
- : _serviceContext(serviceContext), _scheduler(std::move(scheduler)) {}
-
-TransactionCoordinatorDriver::~TransactionCoordinatorDriver() = default;
+TransactionCoordinatorDriver::TransactionCoordinatorDriver(ServiceContext* serviceContext,
+ txn::AsyncWorkScheduler& scheduler)
+ : _serviceContext(serviceContext), _scheduler(scheduler) {}
namespace {
void persistParticipantListBlocking(OperationContext* opCtx,
@@ -228,11 +216,11 @@ void persistParticipantListBlocking(OperationContext* opCtx,
Future<void> TransactionCoordinatorDriver::persistParticipantList(
const LogicalSessionId& lsid, TxnNumber txnNumber, std::vector<ShardId> participantList) {
- return txn::doWhile(*_scheduler,
+ return txn::doWhile(_scheduler,
boost::none /* no need for a backoff */,
[](const Status& s) { return shouldRetryPersistingCoordinatorState(s); },
[this, lsid, txnNumber, participantList] {
- return _scheduler->scheduleWork(
+ return _scheduler.scheduleWork(
[lsid, txnNumber, participantList](OperationContext* opCtx) {
persistParticipantListBlocking(
opCtx, lsid, txnNumber, participantList);
@@ -255,7 +243,7 @@ Future<txn::PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare(
// Send prepare to all participants asynchronously and collect their future responses in a
// vector of responses.
- auto prepareScheduler = _scheduler->makeChildScheduler();
+ auto prepareScheduler = _scheduler.makeChildScheduler();
for (const auto& participant : participantShards) {
responses.push_back(sendPrepareToShard(*prepareScheduler, participant, prepareObj));
@@ -419,11 +407,11 @@ Future<void> TransactionCoordinatorDriver::persistDecision(
std::vector<ShardId> participantList,
const boost::optional<Timestamp>& commitTimestamp) {
return txn::doWhile(
- *_scheduler,
+ _scheduler,
boost::none /* no need for a backoff */,
[](const Status& s) { return shouldRetryPersistingCoordinatorState(s); },
[this, lsid, txnNumber, participantList, commitTimestamp] {
- return _scheduler->scheduleWork([lsid, txnNumber, participantList, commitTimestamp](
+ return _scheduler.scheduleWork([lsid, txnNumber, participantList, commitTimestamp](
OperationContext* opCtx) {
persistDecisionBlocking(opCtx, lsid, txnNumber, participantList, commitTimestamp);
});
@@ -444,7 +432,7 @@ Future<void> TransactionCoordinatorDriver::sendCommit(const std::vector<ShardId>
std::vector<Future<void>> responses;
for (const auto& participant : participantShards) {
- responses.push_back(sendDecisionToParticipantShard(*_scheduler, participant, commitObj));
+ responses.push_back(sendDecisionToParticipantShard(_scheduler, participant, commitObj));
}
return txn::whenAll(responses);
}
@@ -461,7 +449,7 @@ Future<void> TransactionCoordinatorDriver::sendAbort(const std::vector<ShardId>&
std::vector<Future<void>> responses;
for (const auto& participant : participantShards) {
- responses.push_back(sendDecisionToParticipantShard(*_scheduler, participant, abortObj));
+ responses.push_back(sendDecisionToParticipantShard(_scheduler, participant, abortObj));
}
return txn::whenAll(responses);
}
@@ -531,11 +519,11 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx,
Future<void> TransactionCoordinatorDriver::deleteCoordinatorDoc(const LogicalSessionId& lsid,
TxnNumber txnNumber) {
- return txn::doWhile(*_scheduler,
+ return txn::doWhile(_scheduler,
boost::none /* no need for a backoff */,
[](const Status& s) { return shouldRetryPersistingCoordinatorState(s); },
[this, lsid, txnNumber] {
- return _scheduler->scheduleWork(
+ return _scheduler.scheduleWork(
[lsid, txnNumber](OperationContext* opCtx) {
deleteCoordinatorDocBlocking(opCtx, lsid, txnNumber);
});
@@ -563,7 +551,7 @@ std::vector<TransactionCoordinatorDocument> TransactionCoordinatorDriver::readAl
Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard(
txn::AsyncWorkScheduler& scheduler, const ShardId& shardId, const BSONObj& commandObj) {
- const bool isLocalShard = checkIsLocalShard(_serviceContext, shardId);
+ const bool isLocalShard = (shardId == txn::getLocalShardId(_serviceContext));
auto f = txn::doWhile(
scheduler,
@@ -645,7 +633,7 @@ Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard(
Future<void> TransactionCoordinatorDriver::sendDecisionToParticipantShard(
txn::AsyncWorkScheduler& scheduler, const ShardId& shardId, const BSONObj& commandObj) {
- const bool isLocalShard = checkIsLocalShard(_serviceContext, shardId);
+ const bool isLocalShard = (shardId == txn::getLocalShardId(_serviceContext));
return txn::doWhile(
scheduler,
diff --git a/src/mongo/db/s/transaction_coordinator_driver.h b/src/mongo/db/s/transaction_coordinator_driver.h
index fb80d6891a1..bc9bbb825af 100644
--- a/src/mongo/db/s/transaction_coordinator_driver.h
+++ b/src/mongo/db/s/transaction_coordinator_driver.h
@@ -92,8 +92,7 @@ class TransactionCoordinatorDriver {
public:
TransactionCoordinatorDriver(ServiceContext* serviceContext,
- std::unique_ptr<txn::AsyncWorkScheduler> scheduler);
- ~TransactionCoordinatorDriver();
+ txn::AsyncWorkScheduler& scheduler);
/**
* Upserts a document of the form:
@@ -232,7 +231,7 @@ public:
private:
ServiceContext* _serviceContext;
- std::unique_ptr<txn::AsyncWorkScheduler> _scheduler;
+ txn::AsyncWorkScheduler& _scheduler;
};
} // namespace mongo
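
The header change above is the ownership pivot of the commit: the driver no longer owns a child scheduler behind a unique_ptr, it borrows the coordinator's scheduler by reference. The owner must therefore construct the scheduler first and keep it alive for the driver's whole lifetime, which is exactly what the updated test fixture later in this diff does. A minimal sketch of the pattern (names taken from this diff):

    txn::AsyncWorkScheduler aws(serviceContext);               // owner-managed
    TransactionCoordinatorDriver driver(serviceContext, aws);  // borrows, never frees
    // 'driver' is destroyed before 'aws' (reverse declaration order), so the
    // borrowed reference can never dangle.
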
diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.cpp b/src/mongo/db/s/transaction_coordinator_futures_util.cpp
index 7ca2b572067..fa74d11cbf0 100644
--- a/src/mongo/db/s/transaction_coordinator_futures_util.cpp
+++ b/src/mongo/db/s/transaction_coordinator_futures_util.cpp
@@ -58,12 +58,7 @@ AsyncWorkScheduler::AsyncWorkScheduler(ServiceContext* serviceContext)
_executor(Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor()) {}
AsyncWorkScheduler::~AsyncWorkScheduler() {
- {
- stdx::unique_lock<stdx::mutex> ul(_mutex);
- _allListsEmptyCV.wait(ul, [&] {
- return _activeOpContexts.empty() && _activeHandles.empty() && _childSchedulers.empty();
- });
- }
+ join();
if (!_parent)
return;
@@ -77,15 +72,7 @@ AsyncWorkScheduler::~AsyncWorkScheduler() {
Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemoteCommand(
const ShardId& shardId, const ReadPreferenceSetting& readPref, const BSONObj& commandObj) {
- bool isSelfShard = [this, shardId] {
- if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
- return shardId == ShardRegistry::kConfigServerShardId;
- }
- if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
- return shardId == ShardingState::get(_serviceContext)->shardId();
- }
- MONGO_UNREACHABLE; // Only sharded systems should use the two-phase commit path.
- }();
+ const bool isSelfShard = (shardId == getLocalShardId(_serviceContext));
if (isSelfShard) {
// If sending a command to the same shard as this node is in, send it directly to this node
@@ -94,10 +81,10 @@ Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemot
// history. See SERVER-38142 for details.
return scheduleWork([ this, shardId, commandObj = commandObj.getOwned() ](OperationContext *
opCtx) {
- // Note: This internal authorization is tied to the lifetime of 'opCtx', which is
- // destroyed by 'scheduleWork' immediately after this lambda ends.
- AuthorizationSession::get(Client::getCurrent())
- ->grantInternalAuthorization(Client::getCurrent());
+ // Note: This internal authorization is tied to the lifetime of the client, which will
+ // be destroyed by 'scheduleWork' immediately after this lambda ends
+ AuthorizationSession::get(opCtx->getClient())
+ ->grantInternalAuthorization(opCtx->getClient());
if (MONGO_FAIL_POINT(hangWhileTargetingLocalHost)) {
LOG(0) << "Hit hangWhileTargetingLocalHost failpoint";
@@ -208,6 +195,13 @@ void AsyncWorkScheduler::shutdown(Status status) {
}
}
+void AsyncWorkScheduler::join() {
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+ _allListsEmptyCV.wait(ul, [&] {
+ return _activeOpContexts.empty() && _activeHandles.empty() && _childSchedulers.empty();
+ });
+}
+
Future<HostAndPort> AsyncWorkScheduler::_targetHostAsync(const ShardId& shardId,
const ReadPreferenceSetting& readPref) {
return scheduleWork([shardId, readPref](OperationContext* opCtx) {
@@ -229,6 +223,18 @@ void AsyncWorkScheduler::_notifyAllTasksComplete(WithLock) {
_allListsEmptyCV.notify_all();
}
+ShardId getLocalShardId(ServiceContext* service) {
+ if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+ return ShardRegistry::kConfigServerShardId;
+ }
+ if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
+ return ShardingState::get(service)->shardId();
+ }
+
+ // Only sharded systems should use the two-phase commit path
+ MONGO_UNREACHABLE;
+}
+
Future<void> whenAll(std::vector<Future<void>>& futures) {
std::vector<Future<int>> dummyFutures;
for (auto&& f : futures) {
diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.h b/src/mongo/db/s/transaction_coordinator_futures_util.h
index 942ee7e2fbf..7a302d5a2b4 100644
--- a/src/mongo/db/s/transaction_coordinator_futures_util.h
+++ b/src/mongo/db/s/transaction_coordinator_futures_util.h
@@ -146,6 +146,15 @@ public:
*/
void shutdown(Status status);
+ /**
+ * Blocking call, which will wait until any scheduled commands/tasks and child schedulers have
+ * drained.
+ *
+ * It is allowed to be called without calling shutdown, but in that case it is the caller's
+ * responsibility to ensure that no new work gets scheduled.
+ */
+ void join();
+
private:
using ChildIteratorsList = std::list<AsyncWorkScheduler*>;
@@ -194,6 +203,8 @@ private:
stdx::condition_variable _allListsEmptyCV;
};
+ShardId getLocalShardId(ServiceContext* service);
+
enum class ShouldStopIteration { kYes, kNo };
/**
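
join() is the drain primitive the destructors now share: ~AsyncWorkScheduler calls it directly, and ~TransactionCoordinator uses it to wait out the deadline scheduler instead of resetting it. A sketch of the intended call sequence, assuming (per the comment above) the caller guarantees no new work is scheduled once the drain begins:

    txn::AsyncWorkScheduler aws(serviceContext);
    auto future = aws.scheduleWork(
        [](OperationContext* opCtx) { /* some local task */ });

    // Optionally interrupt whatever is still outstanding...
    aws.shutdown({ErrorCodes::Interrupted, "shutting down"});
    // ...then block until op contexts, handles and child schedulers are gone.
    aws.join();
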
diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp
index b9c1db385f2..2551eeb2620 100644
--- a/src/mongo/db/s/transaction_coordinator_test.cpp
+++ b/src/mongo/db/s/transaction_coordinator_test.cpp
@@ -105,8 +105,8 @@ class TransactionCoordinatorDriverTest : public TransactionCoordinatorTestBase {
protected:
void setUp() override {
TransactionCoordinatorTestBase::setUp();
- _driver.emplace(getServiceContext(),
- std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()));
+ _aws.emplace(getServiceContext());
+ _driver.emplace(getServiceContext(), *_aws);
}
void tearDown() override {
@@ -114,6 +114,7 @@ protected:
TransactionCoordinatorTestBase::tearDown();
}
+ boost::optional<txn::AsyncWorkScheduler> _aws;
boost::optional<TransactionCoordinatorDriver> _driver;
};
@@ -820,5 +821,14 @@ TEST_F(TransactionCoordinatorTest,
coordinator.onCompletion().get();
}
+TEST_F(TransactionCoordinatorTest, AbandonNewlyCreatedCoordinator) {
+ TransactionCoordinator coordinator(
+ getServiceContext(),
+ _lsid,
+ _txnNumber,
+ std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+ network()->now() + Seconds{30});
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
index 6c76aa844fd..21e907f7e5d 100644
--- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
+++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
@@ -31,18 +31,13 @@
#include "mongo/platform/basic.h"
-#include "mongo/client/remote_command_targeter.h"
-#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/transaction_coordinator_service.h"
#include "mongo/db/transaction_participant.h"
-#include "mongo/executor/task_executor.h"
-#include "mongo/executor/task_executor_pool.h"
#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/transport/service_entry_point.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -272,50 +267,32 @@ public:
<< "autocommit"
<< false);
- BSONObj abortResponseObj;
-
- const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
- auto cbHandle = uassertStatusOK(executor->scheduleWork([
- serviceContext = opCtx->getServiceContext(),
- &abortResponseObj,
- abortRequestObj = abortRequestObj.getOwned()
- ](const executor::TaskExecutor::CallbackArgs& cbArgs) {
- ThreadClient threadClient(serviceContext);
- auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
- auto opCtx = uniqueOpCtx.get();
-
- AuthorizationSession::get(opCtx->getClient())->grantInternalAuthorization(opCtx);
-
- auto requestOpMsg =
- OpMsgRequest::fromDBAndBody(NamespaceString::kAdminDb, abortRequestObj)
- .serialize();
- const auto replyOpMsg = OpMsg::parseOwned(serviceContext->getServiceEntryPoint()
- ->handleRequest(opCtx, requestOpMsg)
- .response);
-
- invariant(replyOpMsg.sequences.empty());
- abortResponseObj = replyOpMsg.body.getOwned();
- }));
- executor->wait(cbHandle, opCtx);
-
- const auto abortStatus = getStatusFromCommandResult(abortResponseObj);
-
- // Since the abortTransaction was sent without writeConcern, there should not be a
- // writeConcern error.
- invariant(getWriteConcernStatusFromCommandResult(abortResponseObj).isOK());
+ const auto abortStatus = [&] {
+ txn::AsyncWorkScheduler aws(opCtx->getServiceContext());
+ auto awsShutdownGuard = makeGuard([&aws] {
+ aws.shutdown({ErrorCodes::Interrupted, "Request interrupted due to timeout"});
+ });
+ auto future =
+ aws.scheduleRemoteCommand(txn::getLocalShardId(opCtx->getServiceContext()),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ abortRequestObj);
+ const auto& responseStatus = future.get(opCtx);
+ uassertStatusOK(responseStatus.status);
+
+ return getStatusFromCommandResult(responseStatus.data);
+ }();
LOG(3) << "coordinateCommitTransaction got response " << abortStatus << " for "
<< abortRequestObj << " used to recover decision from local participant";
// If the abortTransaction succeeded, return that the transaction aborted.
- uassert(ErrorCodes::NoSuchTransaction, "transaction aborted", !abortStatus.isOK());
-
- // If the abortTransaction returned that the transaction committed, return
- // ok, otherwise return whatever the abortTransaction errored with (which may be
- // NoSuchTransaction).
- uassert(abortStatus.code(),
- abortStatus.reason(),
- abortStatus.code() == ErrorCodes::TransactionCommitted);
+ if (abortStatus.isOK())
+ uasserted(ErrorCodes::NoSuchTransaction, "Transaction aborted");
+
+ // If the abortTransaction returned that the transaction committed, return OK, otherwise
+ // return whatever the abortTransaction errored with (which may be NoSuchTransaction).
+ if (abortStatus != ErrorCodes::TransactionCommitted)
+ uassertStatusOK(abortStatus);
}
private: