summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author: Kaloian Manassiev <kaloian.manassiev@mongodb.com> 2019-03-18 18:25:31 -0400
committer: Kaloian Manassiev <kaloian.manassiev@mongodb.com> 2019-03-21 09:20:17 -0400
commit: c3d1212f1a4a258833b792a43d2d2d8a144f1adb (patch)
tree: 02afa07ae3dc723a56abdc172ab9973217966a0d /src
parent: cc2b4b907aaf788f356ec23e1b315ea5d7b2cf82 (diff)
download: mongo-c3d1212f1a4a258833b792a43d2d2d8a144f1adb.tar.gz
SERVER-40223 Ensure that the TransactionCoordinator will always complete if its scheduler is shut down
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/base/error_codes.err 2
-rw-r--r-- src/mongo/db/s/transaction_coordinator.cpp 13
-rw-r--r-- src/mongo/db/s/transaction_coordinator.h 15
-rw-r--r-- src/mongo/db/s/transaction_coordinator_catalog.cpp 28
-rw-r--r-- src/mongo/db/s/transaction_coordinator_catalog.h 22
-rw-r--r-- src/mongo/db/s/transaction_coordinator_catalog_test.cpp 49
6 files changed, 80 insertions, 49 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index 6601adf40e6..4fa6ca8929b 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -285,6 +285,8 @@ error_code("WouldChangeOwningShard", 283, extra="WouldChangeOwningShardInfo")
error_code("ForTestingErrorExtraInfoWithExtraInfoInNamespace", 284, extra="nested::twice::NestedErrorExtraInfoExample")
error_code("IndexBuildAlreadyInProgress", 285)
error_code("ChangeStreamHistoryLost", 286)
+# The code below is for internal use only and must never be returned in a network response
+error_code("TransactionCoordinatorDeadlineTaskCanceled", 287)
# Error codes 4000-8999 are reserved.
diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp
index 0a9eb4f7bb6..6e7d4d377ed 100644
--- a/src/mongo/db/s/transaction_coordinator.cpp
+++ b/src/mongo/db/s/transaction_coordinator.cpp
@@ -83,9 +83,12 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
if (coordinateCommitDeadline) {
_deadlineScheduler = _scheduler->makeChildScheduler();
_deadlineScheduler
- ->scheduleWorkAt(*coordinateCommitDeadline,
- [this](OperationContext* opCtx) { cancelIfCommitNotYetStarted(); })
- .getAsync([](const Status&) {});
+ ->scheduleWorkAt(*coordinateCommitDeadline, [](OperationContext* opCtx) {})
+ .getAsync([this](const Status& s) {
+ if (s == ErrorCodes::TransactionCoordinatorDeadlineTaskCanceled)
+ return;
+ cancelIfCommitNotYetStarted();
+ });
}
}
@@ -183,8 +186,8 @@ void TransactionCoordinator::cancelIfCommitNotYetStarted() {
void TransactionCoordinator::_cancelTimeoutWaitForCommitTask() {
if (_deadlineScheduler) {
- _deadlineScheduler->shutdown(
- {ErrorCodes::CallbackCanceled, "Interrupting the commit received deadline task"});
+ _deadlineScheduler->shutdown({ErrorCodes::TransactionCoordinatorDeadlineTaskCanceled,
+ "Interrupting the commit received deadline task"});
}
}
diff --git a/src/mongo/db/s/transaction_coordinator.h b/src/mongo/db/s/transaction_coordinator.h
index 6397cf21c65..d47a39b9a88 100644
--- a/src/mongo/db/s/transaction_coordinator.h
+++ b/src/mongo/db/s/transaction_coordinator.h
@@ -37,7 +37,12 @@
namespace mongo {
/**
- * Class responsible for coordinating two-phase commit across shards.
+ * State machine, which implements the two-phase commit protocol for a specific transaction,
+ * identified by lsid + txnNumber.
+ *
+ * The lifetime of a coordinator starts with a construction and ends with the `onCompletion()`
+ * future getting signaled. It is illegal to destroy a coordinator without waiting for
+ * `onCompletion()`.
*/
class TransactionCoordinator {
MONGO_DISALLOW_COPYING(TransactionCoordinator);
@@ -125,11 +130,9 @@ public:
SharedSemiFuture<txn::CommitDecision> getDecision();
/**
- * Returns a future that will be signaled when the transaction has completely finished
- * committing or aborting (i.e. when commit/abort acknowledgements have been received from all
- * participants, or the coordinator commit process is aborted locally for some reason).
- *
- * Unlike runCommit, this will not kick off the commit process if it has not already begun.
+ * Returns a future which can be listened on for when all the asynchronous activity spawned by
+ * this coordinator has completed. It will always eventually be set and once set it is safe to
+ * dispose of the TransactionCoordinator object.
*/
Future<void> onCompletion();
diff --git a/src/mongo/db/s/transaction_coordinator_catalog.cpp b/src/mongo/db/s/transaction_coordinator_catalog.cpp
index 1d66b095602..8d5dc379d0e 100644
--- a/src/mongo/db/s/transaction_coordinator_catalog.cpp
+++ b/src/mongo/db/s/transaction_coordinator_catalog.cpp
@@ -78,8 +78,7 @@ void TransactionCoordinatorCatalog::onStepDown() {
coordinator->cancelIfCommitNotYetStarted();
}
- ul.lock();
- _cleanupCompletedCoordinators(ul);
+ _cleanupCompletedCoordinators();
}
void TransactionCoordinatorCatalog::insert(OperationContext* opCtx,
@@ -90,8 +89,9 @@ void TransactionCoordinatorCatalog::insert(OperationContext* opCtx,
LOG(3) << "Inserting coordinator " << lsid.getId() << ':' << txnNumber
<< " into in-memory catalog";
+ auto cleanupCoordinatorsGuard = makeGuard([&] { _cleanupCompletedCoordinators(); });
+
stdx::unique_lock<stdx::mutex> ul(_mutex);
- auto cleanupCoordinatorsGuard = makeGuard([&] { _cleanupCompletedCoordinators(ul); });
if (!forStepUp) {
_waitForStepUpToComplete(ul, opCtx);
}
@@ -105,17 +105,23 @@ void TransactionCoordinatorCatalog::insert(OperationContext* opCtx,
"Cannot insert a TransactionCoordinator into the TransactionCoordinatorCatalog with "
"the same session ID and transaction number as a previous coordinator");
- // Schedule callback to remove coordinator from catalog when it either commits or aborts.
+ coordinatorsBySession[txnNumber] = coordinator;
+
+ // Schedule callback to remove the coordinator from the catalog when all its activities have
+ // completed. This needs to be done outside of the mutex, in case the coordinator completed
+ // early because of stepdown. Otherwise the continuation could execute on the same thread and
+ // recursively acquire the mutex.
+ ul.unlock();
+
coordinator->onCompletion().getAsync(
[this, lsid, txnNumber](Status) { _remove(lsid, txnNumber); });
-
- coordinatorsBySession[txnNumber] = std::move(coordinator);
}
std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::get(
OperationContext* opCtx, const LogicalSessionId& lsid, TxnNumber txnNumber) {
+ _cleanupCompletedCoordinators();
+
stdx::unique_lock<stdx::mutex> ul(_mutex);
- auto cleanupCoordinatorsGuard = makeGuard([&] { _cleanupCompletedCoordinators(ul); });
_waitForStepUpToComplete(ul, opCtx);
std::shared_ptr<TransactionCoordinator> coordinatorToReturn;
@@ -149,8 +155,9 @@ std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::get(
boost::optional<std::pair<TxnNumber, std::shared_ptr<TransactionCoordinator>>>
TransactionCoordinatorCatalog::getLatestOnSession(OperationContext* opCtx,
const LogicalSessionId& lsid) {
+ _cleanupCompletedCoordinators();
+
stdx::unique_lock<stdx::mutex> ul(_mutex);
- auto cleanupCoordinatorsGuard = makeGuard([&] { _cleanupCompletedCoordinators(ul); });
_waitForStepUpToComplete(ul, opCtx);
const auto& coordinatorsForSessionIter = _coordinatorsBySession.find(lsid);
@@ -242,9 +249,8 @@ void TransactionCoordinatorCatalog::_waitForStepUpToComplete(stdx::unique_lock<s
uassertStatusOK(*_stepUpCompletionStatus);
}
-void TransactionCoordinatorCatalog::_cleanupCompletedCoordinators(
- stdx::unique_lock<stdx::mutex>& ul) {
- invariant(ul.owns_lock());
+void TransactionCoordinatorCatalog::_cleanupCompletedCoordinators() {
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
auto coordinatorsToCleanup = std::move(_coordinatorsToCleanup);
// Ensure the destructors run outside of the lock in order to minimize the time this methods
diff --git a/src/mongo/db/s/transaction_coordinator_catalog.h b/src/mongo/db/s/transaction_coordinator_catalog.h
index 67f6daf77db..1605dbe8869 100644
--- a/src/mongo/db/s/transaction_coordinator_catalog.h
+++ b/src/mongo/db/s/transaction_coordinator_catalog.h
@@ -40,8 +40,8 @@
namespace mongo {
/**
- * A container for TransactionCoordinator objects, indexed by logical session id and transaction
- * number. It allows holding several coordinator objects per session.
+ * This class is a registry for all the active TransactionCoordinator objects, indexed by lsid and
+ * txnNumber. It supports holding several coordinator objects per session.
*/
class TransactionCoordinatorCatalog {
MONGO_DISALLOW_COPYING(TransactionCoordinatorCatalog);
@@ -61,11 +61,15 @@ public:
void onStepDown();
/**
- * Inserts a coordinator into the catalog.
+ * Inserts a coordinator to be tracked by the catalog.
*
- * Note: Inserting a duplicate coordinator for the given session id and transaction number
- * is not allowed and will lead to an invariant failure. Users of the catalog must ensure this
- * does not take place.
+ * Duplicate lsid + txnNumber are not legal and will lead to invariant. The consumer of this
+ * class (TransactionCoordinatorService) guarantees this will not happen through the following
+ * means:
+ * - At step-up recovery time - the catalog starts empty and the coordinators inserted are read
+ * from the `config.coordinators` collection, which only contains unique entries
+ * - At regular run time - the session check-out mechanism guarantees that calls to get,
+ * followed by insert are atomic for the same lsid + txnNumber
*/
void insert(OperationContext* opCtx,
const LogicalSessionId& lsid,
@@ -122,11 +126,9 @@ private:
void _remove(const LogicalSessionId& lsid, TxnNumber txnNumber);
/**
- * Goes through the '_coordinatorsToCleanup' list and deletes entries from it. As a side-effect
- * also unlocks the passed-in lock, so no other synchronized members of the class should be
- * accessed after it is called.
+ * Goes through the '_coordinatorsToCleanup' list and deletes entries from it
*/
- void _cleanupCompletedCoordinators(stdx::unique_lock<stdx::mutex>& ul);
+ void _cleanupCompletedCoordinators();
/**
* Constructs a string representation of all the coordinators registered on the catalog.
diff --git a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp
index b73955b5c78..db11b3aff10 100644
--- a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp
+++ b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp
@@ -30,34 +30,27 @@
#include "mongo/platform/basic.h"
#include "mongo/db/s/transaction_coordinator_catalog.h"
-#include "mongo/s/shard_server_test_fixture.h"
+#include "mongo/db/s/transaction_coordinator_test_fixture.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
namespace {
-class TransactionCoordinatorCatalogTest : public ShardServerTestFixture {
+class TransactionCoordinatorCatalogTest : public TransactionCoordinatorTestFixture {
protected:
void setUp() override {
- ShardServerTestFixture::setUp();
+ TransactionCoordinatorTestFixture::setUp();
_coordinatorCatalog.emplace();
_coordinatorCatalog->exitStepUp(Status::OK());
}
void tearDown() override {
- // Make sure all of the coordinators are in a committed/aborted state before they are
- // destroyed. Otherwise, the coordinator's destructor will invariant because it will still
- // have outstanding futures that have not been completed (the one to remove itself from the
- // catalog). This has the added benefit of testing whether it's okay to destroy
- // the catalog while there are outstanding coordinators.
- for (auto& coordinator : _coordinatorsForTest) {
- coordinator->cancelIfCommitNotYetStarted();
- }
- _coordinatorsForTest.clear();
-
- ShardServerTestFixture::tearDown();
+ _coordinatorCatalog->onStepDown();
+ _coordinatorCatalog.reset();
+
+ TransactionCoordinatorTestFixture::tearDown();
}
void createCoordinatorInCatalog(OperationContext* opCtx,
@@ -71,12 +64,9 @@ protected:
boost::none);
_coordinatorCatalog->insert(opCtx, lsid, txnNumber, newCoordinator);
- _coordinatorsForTest.push_back(newCoordinator);
}
boost::optional<TransactionCoordinatorCatalog> _coordinatorCatalog;
-
- std::vector<std::shared_ptr<TransactionCoordinator>> _coordinatorsForTest;
};
TEST_F(TransactionCoordinatorCatalogTest, GetOnSessionThatDoesNotExistReturnsNone) {
@@ -170,5 +160,30 @@ TEST_F(TransactionCoordinatorCatalogTest,
ASSERT_EQ(latestTxnNumAndCoordinator->first, txnNumber2);
}
+TEST_F(TransactionCoordinatorCatalogTest, StepDownBeforeCoordinatorInsertedIntoCatalog) {
+ LogicalSessionId lsid = makeLogicalSessionIdForTest();
+ TxnNumber txnNumber = 1;
+
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ TransactionCoordinatorCatalog catalog;
+ catalog.exitStepUp(Status::OK());
+
+ auto coordinator = std::make_shared<TransactionCoordinator>(getServiceContext(),
+ lsid,
+ txnNumber,
+ aws.makeChildScheduler(),
+ network()->now() + Seconds{5});
+
+ aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Test step down"});
+ catalog.onStepDown();
+
+ advanceClockAndExecuteScheduledTasks();
+
+ catalog.insert(operationContext(), lsid, txnNumber, coordinator);
+ catalog.join();
+
+ coordinator->onCompletion().wait();
+}
+
} // namespace
} // namespace mongo