author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2019-01-22 14:20:58 -0500
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2019-02-12 07:08:24 -0500
commit     1e7d26609cfa19c4840fe4ef4c1462b7fed48974 (patch)
tree       575241d1ab5b80d0357b9cbb1745289cfa25e81c
parent     7554cd5796762cfb037ad652818fff7b7e67a8a7 (diff)
download   mongo-1e7d26609cfa19c4840fe4ef4c1462b7fed48974.tar.gz
SERVER-38521 Make the transaction coordinator step-up task interruptible
-rw-r--r--  src/mongo/db/s/SConscript                              |   4
-rw-r--r--  src/mongo/db/s/sharding_initialization_mongod.cpp      |  10
-rw-r--r--  src/mongo/db/transaction_coordinator_catalog.cpp       |  63
-rw-r--r--  src/mongo/db/transaction_coordinator_catalog.h         |  38
-rw-r--r--  src/mongo/db/transaction_coordinator_catalog_test.cpp  |  36
-rw-r--r--  src/mongo/db/transaction_coordinator_driver.cpp        |   3
-rw-r--r--  src/mongo/db/transaction_coordinator_service.cpp       | 148
-rw-r--r--  src/mongo/db/transaction_coordinator_service.h         |  44
-rw-r--r--  src/mongo/db/transaction_coordinator_service_test.cpp  | 105
9 files changed, 320 insertions, 131 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 128f6985853..d35c69de259 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -75,8 +75,9 @@ env.Library(
'$BUILD_DIR/mongo/db/ops/write_ops_exec',
'$BUILD_DIR/mongo/db/repl/oplog',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
- '$BUILD_DIR/mongo/db/storage/remove_saver',
'$BUILD_DIR/mongo/db/rw_concern_d',
+ '$BUILD_DIR/mongo/db/storage/remove_saver',
+ '$BUILD_DIR/mongo/db/transaction_coordinator',
'$BUILD_DIR/mongo/s/client/shard_local',
'$BUILD_DIR/mongo/s/sharding_initialization',
'chunk_splitter',
@@ -304,7 +305,6 @@ env.Library(
'$BUILD_DIR/mongo/db/index_d',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
'$BUILD_DIR/mongo/db/repl/replica_set_messages',
- '$BUILD_DIR/mongo/db/transaction_coordinator',
'$BUILD_DIR/mongo/s/commands/shared_cluster_commands',
'$BUILD_DIR/mongo/s/sharding_router_api',
'$BUILD_DIR/mongo/s/sharding_initialization',
diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp
index 70aac5722f1..f2be7485319 100644
--- a/src/mongo/db/s/sharding_initialization_mongod.cpp
+++ b/src/mongo/db/s/sharding_initialization_mongod.cpp
@@ -54,6 +54,7 @@
#include "mongo/db/s/shard_server_catalog_cache_loader.h"
#include "mongo/db/s/sharding_config_optime_gossip.h"
#include "mongo/db/server_options.h"
+#include "mongo/db/transaction_coordinator_service.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
#include "mongo/s/catalog_cache.h"
@@ -122,17 +123,20 @@ void initializeShardingEnvironmentOnShardServer(OperationContext* opCtx,
// Determine primary/secondary/standalone state in order to properly initialize sharding
// components.
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ const auto replCoord = repl::ReplicationCoordinator::get(opCtx);
bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
bool isStandaloneOrPrimary =
- !isReplSet || (repl::ReplicationCoordinator::get(opCtx)->getMemberState() ==
- repl::MemberState::RS_PRIMARY);
+ !isReplSet || (replCoord->getMemberState() == repl::MemberState::RS_PRIMARY);
CatalogCacheLoader::get(opCtx).initializeReplicaSetRole(isStandaloneOrPrimary);
ChunkSplitter::get(opCtx).onShardingInitialization(isStandaloneOrPrimary);
PeriodicBalancerConfigRefresher::get(opCtx).onShardingInitialization(opCtx->getServiceContext(),
isStandaloneOrPrimary);
+ // Start the transaction coordinator service only if the node is the primary of a replica set
+ TransactionCoordinatorService::get(opCtx)->onShardingInitialization(
+ opCtx, isReplSet && isStandaloneOrPrimary);
+
Grid::get(opCtx)->setShardingInitialized();
LOG(0) << "Finished initializing sharding components for "
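
The net effect of this hunk is that the coordinator service is brought up only on a replica-set primary: a standalone still counts as "standalone or primary" for the catalog cache and chunk splitter, but the isReplSet && isStandaloneOrPrimary condition filters it out. A minimal standalone sketch of that decision logic, using illustrative types that are not MongoDB's replication API:

    #include <iostream>

    // Illustrative stand-ins; these are not MongoDB's replication types.
    enum class MemberState { Primary, Secondary };

    // Mirrors the decision above: a standalone counts as "standalone or primary",
    // but the transaction coordinator service starts only on replica-set primaries.
    bool shouldStartCoordinatorService(bool isReplSet, MemberState state) {
        const bool isStandaloneOrPrimary = !isReplSet || state == MemberState::Primary;
        return isReplSet && isStandaloneOrPrimary;
    }

    int main() {
        std::cout << std::boolalpha
                  << shouldStartCoordinatorService(false, MemberState::Primary) << ' '   // standalone -> false
                  << shouldStartCoordinatorService(true, MemberState::Secondary) << ' '  // secondary  -> false
                  << shouldStartCoordinatorService(true, MemberState::Primary) << '\n';  // primary    -> true
        return 0;
    }
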
diff --git a/src/mongo/db/transaction_coordinator_catalog.cpp b/src/mongo/db/transaction_coordinator_catalog.cpp
index 460d450d4b1..8ebc71726a1 100644
--- a/src/mongo/db/transaction_coordinator_catalog.cpp
+++ b/src/mongo/db/transaction_coordinator_catalog.cpp
@@ -45,7 +45,9 @@ MONGO_FAIL_POINT_DEFINE(doNotForgetCoordinator);
TransactionCoordinatorCatalog::TransactionCoordinatorCatalog() = default;
-TransactionCoordinatorCatalog::~TransactionCoordinatorCatalog() = default;
+TransactionCoordinatorCatalog::~TransactionCoordinatorCatalog() {
+ join();
+}
void TransactionCoordinatorCatalog::insert(OperationContext* opCtx,
LogicalSessionId lsid,
@@ -68,15 +70,8 @@ void TransactionCoordinatorCatalog::insert(OperationContext* opCtx,
"the same session ID and transaction number as a previous coordinator");
// Schedule callback to remove coordinator from catalog when it either commits or aborts.
- coordinator->onCompletion().getAsync([
- catalogWeakPtr = std::weak_ptr<TransactionCoordinatorCatalog>(shared_from_this()),
- lsid,
- txnNumber
- ](Status) {
- if (auto catalog = catalogWeakPtr.lock()) {
- catalog->remove(lsid, txnNumber);
- }
- });
+ coordinator->onCompletion().getAsync(
+ [this, lsid, txnNumber](Status) { remove(lsid, txnNumber); });
LOG(3) << "Inserting coordinator for transaction " << txnNumber << " on session "
<< lsid.toBSON() << " into in-memory catalog";
@@ -170,7 +165,7 @@ void TransactionCoordinatorCatalog::remove(LogicalSessionId lsid, TxnNumber txnN
}
coordinatorsForSession.erase(coordinatorForTxnIter);
- if (coordinatorsForSession.size() == 0) {
+ if (coordinatorsForSession.empty()) {
_coordinatorsBySession.erase(coordinatorsForSessionIter);
}
}
@@ -182,41 +177,43 @@ void TransactionCoordinatorCatalog::remove(LogicalSessionId lsid, TxnNumber txnN
}
}
-void TransactionCoordinatorCatalog::enterStepUp(OperationContext* opCtx) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+void TransactionCoordinatorCatalog::exitStepUp(Status status) {
+ if (status.isOK()) {
+ LOG(0) << "Incoming coordinateCommit requests are now enabled";
+ } else {
+ warning() << "Coordinator recovery failed and coordinateCommit requests will not be allowed"
+ << causedBy(status);
+ }
- // If this node stepped down and stepped back up, the asynchronous stepUp task from the previous
- // stepUp may still be running, so wait for the previous stepUp task to complete.
- LOG(3) << "Waiting for coordinator stepup task from previous term, if any, to complete";
- _waitForStepUpToComplete(lk, opCtx);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(!_stepUpCompletionStatus);
+ _stepUpCompletionStatus = std::move(status);
+ _stepUpCompleteCv.notify_all();
+}
- _stepUpInProgress = true;
+void TransactionCoordinatorCatalog::join() {
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
- LOG(3) << "Waiting for there to be no active coordinators; current coordinator catalog: "
- << this->_toString(lk);
- opCtx->waitForConditionOrInterrupt(
- _noActiveCoordinatorsCv, lk, [this]() { return _coordinatorsBySession.empty(); });
+ while (!_noActiveCoordinatorsCv.wait_for(
+ ul, stdx::chrono::seconds{5}, [this] { return _coordinatorsBySession.empty(); })) {
+ LOG(0) << "After 5 seconds of wait there are still " << _coordinatorsBySession.size()
+ << " sessions left with active coordinators which have not yet completed";
+ LOG(0) << _toString(ul);
+ }
}
-void TransactionCoordinatorCatalog::exitStepUp() {
+std::string TransactionCoordinatorCatalog::toString() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(_stepUpInProgress);
-
- LOG(3) << "Signaling stepup complete";
- _stepUpInProgress = false;
- _noStepUpInProgressCv.notify_all();
+ return _toString(lk);
}
void TransactionCoordinatorCatalog::_waitForStepUpToComplete(stdx::unique_lock<stdx::mutex>& lk,
OperationContext* opCtx) {
invariant(lk.owns_lock());
opCtx->waitForConditionOrInterrupt(
- _noStepUpInProgressCv, lk, [this]() { return !_stepUpInProgress; });
-}
+ _stepUpCompleteCv, lk, [this]() { return bool(_stepUpCompletionStatus); });
-std::string TransactionCoordinatorCatalog::toString() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _toString(lk);
+ uassertStatusOK(*_stepUpCompletionStatus);
}
std::string TransactionCoordinatorCatalog::_toString(WithLock wl) const {
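
The destructor now calls join(), which is also what makes it safe for insert() to capture a raw `this` in the onCompletion continuation above: the catalog cannot be destroyed until every registered coordinator has completed and removed itself. join() loops on a timed condition-variable wait so that a stuck coordinator is reported every five seconds instead of blocking silently. A self-contained sketch of that wait-with-periodic-logging pattern using plain std primitives (illustrative names, not the MongoDB classes):

    #include <chrono>
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <set>
    #include <thread>

    // Illustrative catalog: join() blocks until the set of active coordinators
    // drains, logging progress every five seconds while it waits.
    class Catalog {
    public:
        void insert(int id) {
            std::lock_guard<std::mutex> lk(_mutex);
            _active.insert(id);
        }

        void remove(int id) {
            std::lock_guard<std::mutex> lk(_mutex);
            _active.erase(id);
            if (_active.empty())
                _noActiveCoordinatorsCv.notify_all();
        }

        void join() {
            std::unique_lock<std::mutex> ul(_mutex);
            while (!_noActiveCoordinatorsCv.wait_for(
                ul, std::chrono::seconds(5), [this] { return _active.empty(); })) {
                std::cout << "Still waiting on " << _active.size()
                          << " active coordinator(s)" << std::endl;
            }
        }

        // The real class also joins from its destructor, which is what makes
        // capturing a raw 'this' in completion callbacks safe.
        ~Catalog() { join(); }

    private:
        std::mutex _mutex;
        std::condition_variable _noActiveCoordinatorsCv;
        std::set<int> _active;
    };

    int main() {
        Catalog catalog;
        catalog.insert(1);
        std::thread coordinator([&] {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            catalog.remove(1);  // completing the coordinator unblocks join()
        });
        catalog.join();
        coordinator.join();
        return 0;
    }
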
diff --git a/src/mongo/db/transaction_coordinator_catalog.h b/src/mongo/db/transaction_coordinator_catalog.h
index 98acd96d3e9..5be509fde1e 100644
--- a/src/mongo/db/transaction_coordinator_catalog.h
+++ b/src/mongo/db/transaction_coordinator_catalog.h
@@ -48,8 +48,7 @@ namespace mongo {
* itself from the config.txnCommitDecisions collection, which will be done on transition to
* primary (whether from startup or ordinary step up).
*/
-class TransactionCoordinatorCatalog
- : public std::enable_shared_from_this<TransactionCoordinatorCatalog> {
+class TransactionCoordinatorCatalog {
MONGO_DISALLOW_COPYING(TransactionCoordinatorCatalog);
public:
@@ -94,16 +93,15 @@ public:
void remove(LogicalSessionId lsid, TxnNumber txnNumber);
/**
- * Waits for the catalog to no longer be marked as in stepup and then marks the catalog as in
- * stepup and waits for all active coordinators from the previous term to complete (either
- * successfully or with an error) and be removed from the catalog.
+ * Marks no stepup in progress and signals that no stepup is in progress.
*/
- void enterStepUp(OperationContext* opCtx);
+ void exitStepUp(Status status);
/**
- * Marks no stepup in progress and signals that no stepup is in progress.
+ * Blocking method, which waits for all coordinators registered on the catalog to complete
+ * (after this returns, it is guaranteed that all onCompletion futures have been set)
*/
- void exitStepUp();
+ void join();
/**
* Returns a string representation of the map from LogicalSessionId to the list of TxnNumbers
@@ -140,16 +138,20 @@ private:
// commit coordination and would normally be expunged from memory.
LogicalSessionIdMap<TransactionCoordinatorMap> _coordinatorsBySessionDefunct;
- /**
- * Whether a thread is actively executing a stepUp task.
- */
- bool _stepUpInProgress{false};
-
- /**
- * Notified when the *current* in-progress stepUp task has completed, i.e., _stepUpInProgress
- * becomes false.
- */
- stdx::condition_variable _noStepUpInProgressCv;
+ // Stores the result of the coordinator catalog's recovery attempt (the status passed to
+ // exitStepUp). This is what the values mean:
+ //
+ // stepUpCompletionStatus = none - brand new created object (exitStepUp has not been called
+ // yet). All calls will block.
+ // stepUpCompletionStatus = OK - recovery completed successfully, transactions can be
+ // coordinated
+ // stepUpCompletionStatus = error - recovery completed with an error, transactions cannot be
+ // coordinated (all methods will fail with this error)
+ boost::optional<Status> _stepUpCompletionStatus;
+
+ // Signaled when recovery of the catalog completes (when _stepUpCompletionStatus transitions
+ // from none to either OK or error)
+ stdx::condition_variable _stepUpCompleteCv;
// Notified when the last coordinator is removed from the catalog.
stdx::condition_variable _noActiveCoordinatorsCv;
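
The _stepUpCompletionStatus member described above is effectively a one-shot latch: recovery publishes a single Status exactly once, and every catalog operation first waits on that latch and then rethrows any recovery error. A minimal sketch of such a latch with plain std types (the Status struct here is an assumed stand-in, not the mongo::Status API):

    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <optional>
    #include <stdexcept>
    #include <string>
    #include <thread>
    #include <utility>

    // Stand-in for mongo::Status; only 'ok' and a reason string are modelled.
    struct Status {
        bool ok;
        std::string reason;
    };

    // One-shot latch mirroring _stepUpCompletionStatus: unset => callers block,
    // OK => callers proceed, error => callers rethrow the recovery failure.
    class StepUpLatch {
    public:
        void exitStepUp(Status status) {
            std::lock_guard<std::mutex> lk(_mutex);
            _status = std::move(status);  // transitions from "unset" exactly once
            _cv.notify_all();
        }

        void waitForStepUpToComplete() {
            std::unique_lock<std::mutex> lk(_mutex);
            _cv.wait(lk, [this] { return _status.has_value(); });
            if (!_status->ok)
                throw std::runtime_error(_status->reason);
        }

    private:
        std::mutex _mutex;
        std::condition_variable _cv;
        std::optional<Status> _status;  // none = recovery still running
    };

    int main() {
        StepUpLatch latch;
        std::thread recovery([&] { latch.exitStepUp({false, "recovery failed"}); });
        recovery.join();
        try {
            latch.waitForStepUpToComplete();  // blocks until exitStepUp, then rethrows
        } catch (const std::exception& ex) {
            std::cout << "operation rejected: " << ex.what() << std::endl;
        }
        return 0;
    }
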
diff --git a/src/mongo/db/transaction_coordinator_catalog_test.cpp b/src/mongo/db/transaction_coordinator_catalog_test.cpp
index 9bc847790ad..57fcca2831d 100644
--- a/src/mongo/db/transaction_coordinator_catalog_test.cpp
+++ b/src/mongo/db/transaction_coordinator_catalog_test.cpp
@@ -42,11 +42,12 @@ class TransactionCoordinatorCatalogTest : public ShardServerTestFixture {
protected:
void setUp() override {
ShardServerTestFixture::setUp();
- _coordinatorCatalog = std::make_shared<TransactionCoordinatorCatalog>();
+
+ _coordinatorCatalog.emplace();
+ _coordinatorCatalog->exitStepUp(Status::OK());
}
void tearDown() override {
- _coordinatorCatalog.reset();
// Make sure all of the coordinators are in a committed/aborted state before they are
// destroyed. Otherwise, the coordinator's destructor will invariant because it will still
// have outstanding futures that have not been completed (the one to remove itself from the
@@ -60,31 +61,25 @@ protected:
ShardServerTestFixture::tearDown();
}
- TransactionCoordinatorCatalog& coordinatorCatalog() {
- return *_coordinatorCatalog;
- }
-
void createCoordinatorInCatalog(OperationContext* opCtx,
LogicalSessionId lsid,
TxnNumber txnNumber) {
auto newCoordinator =
std::make_shared<TransactionCoordinator>(getServiceContext(), lsid, txnNumber);
- coordinatorCatalog().insert(opCtx, lsid, txnNumber, newCoordinator);
+ _coordinatorCatalog->insert(opCtx, lsid, txnNumber, newCoordinator);
_coordinatorsForTest.push_back(newCoordinator);
}
-private:
- // Note: MUST be shared_ptr due to use of std::enable_shared_from_this
- std::shared_ptr<TransactionCoordinatorCatalog> _coordinatorCatalog;
+ boost::optional<TransactionCoordinatorCatalog> _coordinatorCatalog;
+
std::vector<std::shared_ptr<TransactionCoordinator>> _coordinatorsForTest;
};
TEST_F(TransactionCoordinatorCatalogTest, GetOnSessionThatDoesNotExistReturnsNone) {
LogicalSessionId lsid = makeLogicalSessionIdForTest();
TxnNumber txnNumber = 1;
-
- auto coordinator = coordinatorCatalog().get(operationContext(), lsid, txnNumber);
+ auto coordinator = _coordinatorCatalog->get(operationContext(), lsid, txnNumber);
ASSERT(coordinator == nullptr);
}
@@ -93,16 +88,15 @@ TEST_F(TransactionCoordinatorCatalogTest,
LogicalSessionId lsid = makeLogicalSessionIdForTest();
TxnNumber txnNumber = 1;
createCoordinatorInCatalog(operationContext(), lsid, txnNumber);
- auto coordinatorInCatalog = coordinatorCatalog().get(operationContext(), lsid, txnNumber + 1);
+ auto coordinatorInCatalog = _coordinatorCatalog->get(operationContext(), lsid, txnNumber + 1);
ASSERT(coordinatorInCatalog == nullptr);
}
-
TEST_F(TransactionCoordinatorCatalogTest, CreateFollowedByGetReturnsCoordinator) {
LogicalSessionId lsid = makeLogicalSessionIdForTest();
TxnNumber txnNumber = 1;
createCoordinatorInCatalog(operationContext(), lsid, txnNumber);
- auto coordinatorInCatalog = coordinatorCatalog().get(operationContext(), lsid, txnNumber);
+ auto coordinatorInCatalog = _coordinatorCatalog->get(operationContext(), lsid, txnNumber);
ASSERT(coordinatorInCatalog != nullptr);
}
@@ -113,7 +107,7 @@ TEST_F(TransactionCoordinatorCatalogTest, SecondCreateForSessionDoesNotOverwrite
createCoordinatorInCatalog(operationContext(), lsid, txnNumber1);
createCoordinatorInCatalog(operationContext(), lsid, txnNumber2);
- auto coordinator1InCatalog = coordinatorCatalog().get(operationContext(), lsid, txnNumber1);
+ auto coordinator1InCatalog = _coordinatorCatalog->get(operationContext(), lsid, txnNumber1);
ASSERT(coordinator1InCatalog != nullptr);
}
@@ -130,7 +124,7 @@ DEATH_TEST_F(TransactionCoordinatorCatalogTest,
TEST_F(TransactionCoordinatorCatalogTest, GetLatestOnSessionWithNoCoordinatorsReturnsNone) {
LogicalSessionId lsid = makeLogicalSessionIdForTest();
auto latestTxnNumAndCoordinator =
- coordinatorCatalog().getLatestOnSession(operationContext(), lsid);
+ _coordinatorCatalog->getLatestOnSession(operationContext(), lsid);
ASSERT_FALSE(latestTxnNumAndCoordinator);
}
@@ -140,7 +134,7 @@ TEST_F(TransactionCoordinatorCatalogTest,
TxnNumber txnNumber = 1;
createCoordinatorInCatalog(operationContext(), lsid, txnNumber);
auto latestTxnNumAndCoordinator =
- coordinatorCatalog().getLatestOnSession(operationContext(), lsid);
+ _coordinatorCatalog->getLatestOnSession(operationContext(), lsid);
ASSERT_TRUE(latestTxnNumAndCoordinator);
ASSERT_EQ(latestTxnNumAndCoordinator->first, txnNumber);
@@ -150,13 +144,13 @@ TEST_F(TransactionCoordinatorCatalogTest, CoordinatorsRemoveThemselvesFromCatalo
LogicalSessionId lsid = makeLogicalSessionIdForTest();
TxnNumber txnNumber = 1;
createCoordinatorInCatalog(operationContext(), lsid, txnNumber);
- auto coordinator = coordinatorCatalog().get(operationContext(), lsid, txnNumber);
+ auto coordinator = _coordinatorCatalog->get(operationContext(), lsid, txnNumber);
coordinator->cancelIfCommitNotYetStarted();
ASSERT(coordinator->onCompletion().isReady());
auto latestTxnNumAndCoordinator =
- coordinatorCatalog().getLatestOnSession(operationContext(), lsid);
+ _coordinatorCatalog->getLatestOnSession(operationContext(), lsid);
ASSERT_FALSE(latestTxnNumAndCoordinator);
}
@@ -168,7 +162,7 @@ TEST_F(TransactionCoordinatorCatalogTest,
createCoordinatorInCatalog(operationContext(), lsid, txnNumber1);
createCoordinatorInCatalog(operationContext(), lsid, txnNumber2);
auto latestTxnNumAndCoordinator =
- coordinatorCatalog().getLatestOnSession(operationContext(), lsid);
+ _coordinatorCatalog->getLatestOnSession(operationContext(), lsid);
ASSERT_EQ(latestTxnNumAndCoordinator->first, txnNumber2);
}
diff --git a/src/mongo/db/transaction_coordinator_driver.cpp b/src/mongo/db/transaction_coordinator_driver.cpp
index 6e048c731b7..3af5d5b2695 100644
--- a/src/mongo/db/transaction_coordinator_driver.cpp
+++ b/src/mongo/db/transaction_coordinator_driver.cpp
@@ -580,10 +580,9 @@ std::vector<TransactionCoordinatorDocument> TransactionCoordinatorDriver::readAl
OperationContext* opCtx) {
std::vector<TransactionCoordinatorDocument> allCoordinatorDocs;
- Query query;
DBDirectClient client(opCtx);
auto coordinatorDocsCursor =
- client.query(NamespaceString::kTransactionCoordinatorsNamespace, query);
+ client.query(NamespaceString::kTransactionCoordinatorsNamespace, Query{});
while (coordinatorDocsCursor->more()) {
// TODO (SERVER-38307): Try/catch around parsing the document and skip the document if it
diff --git a/src/mongo/db/transaction_coordinator_service.cpp b/src/mongo/db/transaction_coordinator_service.cpp
index 27e1fba797a..9180607bacb 100644
--- a/src/mongo/db/transaction_coordinator_service.cpp
+++ b/src/mongo/db/transaction_coordinator_service.cpp
@@ -50,10 +50,11 @@ const auto transactionCoordinatorServiceDecoration =
} // namespace
-TransactionCoordinatorService::TransactionCoordinatorService()
- : _coordinatorCatalog(std::make_shared<TransactionCoordinatorCatalog>()) {}
+TransactionCoordinatorService::TransactionCoordinatorService() = default;
-TransactionCoordinatorService::~TransactionCoordinatorService() = default;
+TransactionCoordinatorService::~TransactionCoordinatorService() {
+ _joinPreviousRound();
+}
TransactionCoordinatorService* TransactionCoordinatorService::get(OperationContext* opCtx) {
return get(opCtx->getServiceContext());
@@ -67,7 +68,10 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx,
LogicalSessionId lsid,
TxnNumber txnNumber,
Date_t commitDeadline) {
- if (auto latestTxnNumAndCoordinator = _coordinatorCatalog->getLatestOnSession(opCtx, lsid)) {
+ auto cas = _getCatalogAndScheduler(opCtx);
+ auto& catalog = cas->catalog;
+
+ if (auto latestTxnNumAndCoordinator = catalog.getLatestOnSession(opCtx, lsid)) {
auto latestCoordinator = latestTxnNumAndCoordinator->second;
if (txnNumber == latestTxnNumAndCoordinator->first) {
return;
@@ -78,7 +82,7 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx,
auto coordinator =
std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(), lsid, txnNumber);
- _coordinatorCatalog->insert(opCtx, lsid, txnNumber, coordinator);
+ catalog.insert(opCtx, lsid, txnNumber, coordinator);
// Schedule a task in the future to cancel the commit coordination on the coordinator, so that
// the coordinator does not remain in memory forever (in case the participant list is never
@@ -103,7 +107,10 @@ boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::coor
LogicalSessionId lsid,
TxnNumber txnNumber,
const std::set<ShardId>& participantList) {
- auto coordinator = _coordinatorCatalog->get(opCtx, lsid, txnNumber);
+ auto cas = _getCatalogAndScheduler(opCtx);
+ auto& catalog = cas->catalog;
+
+ auto coordinator = catalog.get(opCtx, lsid, txnNumber);
if (!coordinator) {
return boost::none;
}
@@ -117,49 +124,48 @@ boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::coor
// TODO (SERVER-37364): Re-enable the coordinator returning the decision as soon as the decision
// is made durable. Currently the coordinator waits to hear acks because participants in prepare
// reject requests with a higher transaction number, causing tests to fail.
- // return coordinator.get()->runCommit(participants);
+ // return coordinator->runCommit(participants);
}
boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::recoverCommit(
OperationContext* opCtx, LogicalSessionId lsid, TxnNumber txnNumber) {
- auto coordinator = _coordinatorCatalog->get(opCtx, lsid, txnNumber);
+ auto cas = _getCatalogAndScheduler(opCtx);
+ auto& catalog = cas->catalog;
+
+ auto coordinator = catalog.get(opCtx, lsid, txnNumber);
if (!coordinator) {
return boost::none;
}
- return coordinator.get()->onCompletion().then(
- [coordinator] { return coordinator.get()->getDecision().get(); });
+ return coordinator->onCompletion().then(
+ [coordinator] { return coordinator->getDecision().get(); });
+
// TODO (SERVER-37364): Re-enable the coordinator returning the decision as soon as the decision
// is made durable. Currently the coordinator waits to hear acks because participants in prepare
// reject requests with a higher transaction number, causing tests to fail.
- // return coordinator.get()->getDecision();
+ // return coordinator->getDecision();
}
-void TransactionCoordinatorService::onStepUp(OperationContext* opCtx) {
- // Blocks until the stepup task from the last term completes, then marks a new stepup task as
- // having begun and blocks until all active coordinators complete (are removed from the
- // catalog).
- // Note: No other threads can read the catalog while the catalog is marked as having an active
- // stepup task.
- _coordinatorCatalog->enterStepUp(opCtx);
+void TransactionCoordinatorService::onStepUp(OperationContext* opCtx,
+ Milliseconds recoveryDelayForTesting) {
+ _joinPreviousRound();
- const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
- auto status =
- executor
- ->scheduleWork([ this, service = opCtx->getServiceContext() ](
- const executor::TaskExecutor::CallbackArgs&) {
- try {
- // The opCtx destructor handles unsetting itself from the Client
- ThreadClient threadClient("TransactionCoordinator-StepUp", service);
- auto opCtxPtr = Client::getCurrent()->makeOperationContext();
- auto opCtx = opCtxPtr.get();
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ invariant(!_catalogAndScheduler);
+ _catalogAndScheduler = std::make_shared<CatalogAndScheduler>(opCtx->getServiceContext());
+ auto future =
+ _catalogAndScheduler->scheduler
+ .scheduleWorkIn(
+ recoveryDelayForTesting,
+ [catalogAndScheduler = _catalogAndScheduler](OperationContext * opCtx) {
auto& replClientInfo = repl::ReplClientInfo::forClient(opCtx->getClient());
replClientInfo.setLastOpToSystemLastOpTime(opCtx);
const auto lastOpTime = replClientInfo.getLastOp();
- LOG(3) << "Going to wait for client's last OpTime " << lastOpTime
+ LOG(3) << "Waiting for OpTime " << lastOpTime
<< " to become majority committed";
+
WriteConcernResult unusedWCResult;
uassertStatusOK(waitForWriteConcern(
opCtx,
@@ -175,38 +181,82 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx) {
LOG(0) << "Need to resume coordinating commit for " << coordinatorDocs.size()
<< " transactions";
+ auto& catalog = catalogAndScheduler->catalog;
+
for (const auto& doc : coordinatorDocs) {
LOG(3) << "Going to resume coordinating commit for " << doc.toBSON();
+
const auto lsid = *doc.getId().getSessionId();
const auto txnNumber = *doc.getId().getTxnNumber();
auto coordinator = std::make_shared<TransactionCoordinator>(
opCtx->getServiceContext(), lsid, txnNumber);
- _coordinatorCatalog->insert(
- opCtx, lsid, txnNumber, coordinator, true /* forStepUp */);
+ catalog.insert(opCtx, lsid, txnNumber, coordinator, true /* forStepUp */);
coordinator->continueCommit(doc);
}
+ })
+ .tapAll([catalogAndScheduler = _catalogAndScheduler](Status status) {
+ // TODO (SERVER-38320): Reschedule the step-up task if the interruption was not due
+ // to stepdown.
- _coordinatorCatalog->exitStepUp();
-
- LOG(3) << "Incoming coordinateCommit requests now accepted";
- } catch (const DBException& e) {
- LOG(3) << "Failed while executing thread to resume coordinating commit for "
- "pending "
- "transactions "
- << causedBy(e.toStatus());
- _coordinatorCatalog->exitStepUp();
- }
- })
- .getStatus();
-
- // TODO (SERVER-38320): Reschedule the stepup task if the interruption was not due to stepdown.
- if (status == ErrorCodes::ShutdownInProgress || ErrorCodes::isInterruption(status.code())) {
- return;
+ auto& catalog = catalogAndScheduler->catalog;
+ catalog.exitStepUp(status);
+ });
+
+ _catalogAndScheduler->recoveryTaskCompleted.emplace(std::move(future));
+}
+
+void TransactionCoordinatorService::onStepDown() {
+ {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ if (!_catalogAndScheduler)
+ return;
+
+ _catalogAndSchedulerToCleanup = std::move(_catalogAndScheduler);
}
- invariant(status);
+
+ _catalogAndSchedulerToCleanup->scheduler.shutdown(
+ {ErrorCodes::InterruptedDueToStepDown, "Transaction coordinator service stepping down"});
+}
+
+void TransactionCoordinatorService::onShardingInitialization(OperationContext* opCtx,
+ bool isPrimary) {
+ if (!isPrimary)
+ return;
+
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+
+ invariant(!_catalogAndScheduler);
+ _catalogAndScheduler = std::make_shared<CatalogAndScheduler>(opCtx->getServiceContext());
+
+ _catalogAndScheduler->catalog.exitStepUp(Status::OK());
+ _catalogAndScheduler->recoveryTaskCompleted.emplace(Future<void>::makeReady());
}
-void TransactionCoordinatorService::onStepDown() {}
+std::shared_ptr<TransactionCoordinatorService::CatalogAndScheduler>
+TransactionCoordinatorService::_getCatalogAndScheduler(OperationContext* opCtx) {
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+ uassert(
+ ErrorCodes::NotMaster, "Transaction coordinator is not a primary", _catalogAndScheduler);
+
+ return _catalogAndScheduler;
+}
+
+void TransactionCoordinatorService::_joinPreviousRound() {
+ // onStepDown must have been called
+ invariant(!_catalogAndScheduler);
+
+ if (!_catalogAndSchedulerToCleanup)
+ return;
+
+ LOG(0) << "Waiting for coordinator tasks from previous term to complete";
+
+ // Block until all coordinators scheduled the previous time the service was primary to have
+ // drained. Because the scheduler was interrupted, it should be extremely rare for there to be
+ // any coordinators left, so if this actually causes blocking, it would most likely be a bug.
+ _catalogAndSchedulerToCleanup->recoveryTaskCompleted->wait();
+ _catalogAndSchedulerToCleanup->catalog.join();
+ _catalogAndSchedulerToCleanup.reset();
+}
} // namespace mongo
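
Taken together, the service now manages one catalog-plus-scheduler "round" per primary term: onStepUp joins whatever the previous term left behind before creating a fresh round, onStepDown only interrupts the current round and stashes it for that later join, and callers access the round through a shared_ptr so it cannot be destroyed while an operation is using it. A compressed, single-threaded sketch of that lifecycle in plain C++ (illustrative names, no real scheduler or catalog):

    #include <cassert>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <stdexcept>

    // Illustrative stand-in for CatalogAndScheduler: one "round" per primary term.
    struct Round {
        void shutdown() { interrupted = true; }  // step-down interrupts outstanding work
        void join() { /* wait for the catalog to drain; trivial in this sketch */ }
        bool interrupted = false;
    };

    class CoordinatorService {
    public:
        void onStepUp() {
            joinPreviousRound();  // drain whatever the previous term left behind
            std::lock_guard<std::mutex> lg(_mutex);
            assert(!_current);
            _current = std::make_shared<Round>();
        }

        void onStepDown() {
            std::lock_guard<std::mutex> lg(_mutex);
            if (!_current)
                return;
            _toCleanup = std::move(_current);  // stash it; the next onStepUp joins it
            _toCleanup->shutdown();
        }

        // Throws if step-up has not run yet; callers keep the shared_ptr alive for
        // the duration of their operation, so cleanup cannot free it under them.
        std::shared_ptr<Round> getCurrentRound() {
            std::lock_guard<std::mutex> lg(_mutex);
            if (!_current)
                throw std::runtime_error("NotMaster: transaction coordinator is not primary");
            return _current;
        }

    private:
        void joinPreviousRound() {
            // Step transitions are invoked sequentially, so no lock is needed here.
            if (!_toCleanup)
                return;
            _toCleanup->join();
            _toCleanup.reset();
        }

        std::mutex _mutex;
        std::shared_ptr<Round> _current;    // set on step-up, cleared on step-down
        std::shared_ptr<Round> _toCleanup;  // previous term's round, joined lazily
    };

    int main() {
        CoordinatorService service;
        try {
            service.getCurrentRound();
        } catch (const std::exception& ex) {
            std::cout << ex.what() << std::endl;  // rejected before the first step-up
        }
        service.onStepUp();
        service.getCurrentRound();  // succeeds while "primary"
        service.onStepDown();
        service.onStepUp();         // joins the interrupted round, then starts a new one
        service.onStepDown();
        return 0;
    }

Handing the round out by shared_ptr is the same design choice as _getCatalogAndScheduler(): an in-flight coordinateCommit keeps its term's catalog alive even while a later _joinPreviousRound() releases the service's own reference.
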
diff --git a/src/mongo/db/transaction_coordinator_service.h b/src/mongo/db/transaction_coordinator_service.h
index 115c7604bcc..a569dfdd617 100644
--- a/src/mongo/db/transaction_coordinator_service.h
+++ b/src/mongo/db/transaction_coordinator_service.h
@@ -90,12 +90,52 @@ public:
* 2. Read all pending commit tasks from the config.transactionCoordinators collection.
* 3. Create TransactionCoordinator objects in memory for each pending commit and launch an
* async task to continue coordinating its commit.
+ *
+ * The 'recoveryDelayForTesting' argument is only used for testing in order to simulate recovery
+ * taking a long time.
*/
- void onStepUp(OperationContext* opCtx);
+ void onStepUp(OperationContext* opCtx, Milliseconds recoveryDelayForTesting = Milliseconds(0));
void onStepDown();
+ /**
+ * Called when an already established replica set is added as a shard to a cluster. Ensures that
+ * the TransactionCoordinator service is started up if the replica set is currently primary.
+ */
+ void onShardingInitialization(OperationContext* opCtx, bool isPrimary);
+
private:
- std::shared_ptr<TransactionCoordinatorCatalog> _coordinatorCatalog;
+ struct CatalogAndScheduler {
+ CatalogAndScheduler(ServiceContext* service) : scheduler(service) {}
+
+ txn::AsyncWorkScheduler scheduler;
+ TransactionCoordinatorCatalog catalog;
+
+ boost::optional<Future<void>> recoveryTaskCompleted;
+ };
+
+ /**
+ * Returns the current catalog + scheduler if stepUp has started, otherwise throws a NotMaster
+ * exception.
+ */
+ std::shared_ptr<CatalogAndScheduler> _getCatalogAndScheduler(OperationContext* opCtx);
+
+ /**
+ * Blocking call which waits for the previous stepUp/stepDown round to join and ensures all
+ * tasks scheduled by that round have completed.
+ */
+ void _joinPreviousRound();
+
+ // Contains the catalog + scheduler, which was active at the last step-down attempt (if any).
+ // Set at onStepDown and destroyed at onStepUp, which are always invoked sequentially by the
+ // replication machinery, so there is no need to explicitly synchronize it
+ std::shared_ptr<CatalogAndScheduler> _catalogAndSchedulerToCleanup;
+
+ // Protects the state below
+ mutable stdx::mutex _mutex;
+
+ // The catalog + scheduler instantiated at the last step-up attempt. When nullptr, it means
+ // onStepUp has not been called yet after the last stepDown (or construction).
+ std::shared_ptr<CatalogAndScheduler> _catalogAndScheduler;
};
} // namespace mongo
diff --git a/src/mongo/db/transaction_coordinator_service_test.cpp b/src/mongo/db/transaction_coordinator_service_test.cpp
index 7b8020f8622..115fb760b60 100644
--- a/src/mongo/db/transaction_coordinator_service_test.cpp
+++ b/src/mongo/db/transaction_coordinator_service_test.cpp
@@ -34,10 +34,12 @@
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/transaction_coordinator_service.h"
#include "mongo/db/transaction_coordinator_test_fixture.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
namespace {
@@ -62,7 +64,7 @@ const StatusWith<BSONObj> kPrepareOkButWriteConcernError =
BSON("ok" << 1 << "prepareTimestamp" << Timestamp(1, 1) << "writeConcernError"
<< kDummyWriteConcernError);
-class TransactionCoordinatorServiceTest : public TransactionCoordinatorTestFixture {
+class TransactionCoordinatorServiceTestFixture : public TransactionCoordinatorTestFixture {
public:
// Prepare responses
@@ -193,6 +195,107 @@ public:
commitDecisionFuture.get();
}
+ auto* service() const {
+ return TransactionCoordinatorService::get(operationContext());
+ }
+};
+
+
+using TransactionCoordinatorServiceStepUpStepDownTest = TransactionCoordinatorServiceTestFixture;
+
+TEST_F(TransactionCoordinatorServiceStepUpStepDownTest, OperationsFailBeforeStepUpStarts) {
+ ASSERT_THROWS_CODE(service()->createCoordinator(
+ operationContext(), makeLogicalSessionIdForTest(), 0, kCommitDeadline),
+ AssertionException,
+ ErrorCodes::NotMaster);
+
+ ASSERT_THROWS_CODE(service()->coordinateCommit(
+ operationContext(), makeLogicalSessionIdForTest(), 0, kTwoShardIdSet),
+ AssertionException,
+ ErrorCodes::NotMaster);
+
+ ASSERT_THROWS_CODE(
+ service()->recoverCommit(operationContext(), makeLogicalSessionIdForTest(), 0),
+ AssertionException,
+ ErrorCodes::NotMaster);
+}
+
+TEST_F(TransactionCoordinatorServiceStepUpStepDownTest, OperationsBlockBeforeStepUpCompletes) {
+ service()->onStepUp(operationContext(), Milliseconds(1));
+ auto stepDownGuard = makeGuard([&] { service()->onStepDown(); });
+
+ ASSERT_THROWS_CODE(operationContext()->runWithDeadline(
+ Date_t::now() + Milliseconds{5},
+ ErrorCodes::NetworkInterfaceExceededTimeLimit,
+ [&] {
+ return service()->coordinateCommit(operationContext(),
+ makeLogicalSessionIdForTest(),
+ 0,
+ kTwoShardIdSet);
+ }),
+ AssertionException,
+ ErrorCodes::NetworkInterfaceExceededTimeLimit);
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+ network()->advanceTime(network()->now() + Milliseconds(1));
+ }
+
+ ASSERT(service()->coordinateCommit(
+ operationContext(), makeLogicalSessionIdForTest(), 0, kTwoShardIdSet) ==
+ boost::none);
+}
+
+TEST_F(TransactionCoordinatorServiceStepUpStepDownTest, StepUpFailsDueToBadCoordinatorDocument) {
+ DBDirectClient client(operationContext());
+ client.insert(NamespaceString::kTransactionCoordinatorsNamespace.ns(), BSON("IllegalKey" << 1));
+ ASSERT_EQ("", client.getLastError());
+
+ service()->onStepUp(operationContext());
+ auto stepDownGuard = makeGuard([&] { service()->onStepDown(); });
+
+ ASSERT_THROWS_CODE(service()->coordinateCommit(
+ operationContext(), makeLogicalSessionIdForTest(), 0, kTwoShardIdSet),
+ AssertionException,
+ ErrorCodes::TypeMismatch);
+
+ ASSERT_THROWS_CODE(
+ service()->recoverCommit(operationContext(), makeLogicalSessionIdForTest(), 0),
+ AssertionException,
+ ErrorCodes::TypeMismatch);
+}
+
+TEST_F(TransactionCoordinatorServiceStepUpStepDownTest, StepDownBeforeStepUpTaskCompleted) {
+ // Call step-up with 1ms delay (meaning it will not actually execute until time is manually
+ // advanced on the underlying executor)
+ service()->onStepUp(operationContext(), Milliseconds(1));
+
+ // Should cancel all outstanding tasks (including the recovery task started by onStepUp above,
+ // which has not yet run)
+ service()->onStepDown();
+
+ // Do another onStepUp to ensure it runs successfully
+ service()->onStepUp(operationContext());
+
+ // Step-down the service so that the destructor does not complain
+ service()->onStepDown();
+}
+
+
+class TransactionCoordinatorServiceTest : public TransactionCoordinatorServiceTestFixture {
+protected:
+ void setUp() override {
+ TransactionCoordinatorServiceTestFixture::setUp();
+
+ service()->onStepUp(operationContext());
+ }
+
+ void tearDown() override {
+ service()->onStepDown();
+
+ TransactionCoordinatorServiceTestFixture::tearDown();
+ }
+
LogicalSessionId _lsid{makeLogicalSessionIdForTest()};
TxnNumber _txnNumber{1};
};