summary refs log tree commit diff
diff options
context:
space:
mode:
author: Tommaso Tocci <tommaso.tocci@mongodb.com> 2021-06-14 15:28:55 +0200
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-06-16 16:53:57 +0000
commit: fc1ea44740c05ab73ce0bcf8a1897aa322b1d057 (patch)
tree: 270c0691a802f5e486731d56e8226ecc63d71138
parent: 5c00024e3cf4a27039117e000e475c6ee797c700 (diff)
download: mongo-fc1ea44740c05ab73ce0bcf8a1897aa322b1d057.tar.gz
SERVER-57649 DDL coordinator dist lock acquisition must be resilient to CSRS stepdowns
-rw-r--r-- src/mongo/db/s/rename_collection_coordinator.cpp 12
-rw-r--r-- src/mongo/db/s/rename_collection_coordinator.h 3
-rw-r--r-- src/mongo/db/s/sharding_ddl_coordinator.cpp 68
-rw-r--r-- src/mongo/db/s/sharding_ddl_coordinator.h 8
4 files changed, 53 insertions, 38 deletions
diff --git a/src/mongo/db/s/rename_collection_coordinator.cpp b/src/mongo/db/s/rename_collection_coordinator.cpp
index 08168994c1e..d8db3483980 100644
--- a/src/mongo/db/s/rename_collection_coordinator.cpp
+++ b/src/mongo/db/s/rename_collection_coordinator.cpp
@@ -85,17 +85,9 @@ void RenameCollectionCoordinator::checkIfOptionsConflict(const BSONObj& doc) con
SimpleBSONObjComparator::kInstance.evaluate(selfReq == otherReq));
}
-std::vector<DistLockManager::ScopedDistLock> RenameCollectionCoordinator::_acquireAdditionalLocks(
+std::vector<StringData> RenameCollectionCoordinator::_acquireAdditionalLocks(
OperationContext* opCtx) {
- const auto coorName = DDLCoordinatorType_serializer(_coorMetadata.getId().getOperationType());
-
- auto distLockManager = DistLockManager::get(opCtx);
- auto targetCollDistLock = uassertStatusOK(distLockManager->lock(
- opCtx, _doc.getTo().ns(), coorName, DistLockManager::kDefaultLockTimeout));
-
- std::vector<DistLockManager::ScopedDistLock> vec;
- vec.push_back(targetCollDistLock.moveToAnotherThread());
- return vec;
+ return {_doc.getTo().ns()};
}
boost::optional<BSONObj> RenameCollectionCoordinator::reportForCurrentOp(
diff --git a/src/mongo/db/s/rename_collection_coordinator.h b/src/mongo/db/s/rename_collection_coordinator.h
index 1621c323604..c3779b6c289 100644
--- a/src/mongo/db/s/rename_collection_coordinator.h
+++ b/src/mongo/db/s/rename_collection_coordinator.h
@@ -62,8 +62,7 @@ private:
ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept override;
- std::vector<DistLockManager::ScopedDistLock> _acquireAdditionalLocks(
- OperationContext* opCtx) override;
+ std::vector<StringData> _acquireAdditionalLocks(OperationContext* opCtx) override;
template <typename Func>
auto _executePhase(const Phase& newPhase, Func&& func) {
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.cpp b/src/mongo/db/s/sharding_ddl_coordinator.cpp
index df75c4526ca..a1040e17344 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.cpp
+++ b/src/mongo/db/s/sharding_ddl_coordinator.cpp
@@ -63,6 +63,7 @@ ShardingDDLCoordinator::ShardingDDLCoordinator(ShardingDDLCoordinatorService* se
const BSONObj& coorDoc)
: _service(service),
_coorMetadata(extractShardingDDLCoordinatorMetadata(coorDoc)),
+ _coorName(DDLCoordinatorType_serializer(_coorMetadata.getId().getOperationType())),
_recoveredFromDisk(_coorMetadata.getRecoveredFromDisk()) {}
ShardingDDLCoordinator::~ShardingDDLCoordinator() {
@@ -105,6 +106,25 @@ bool ShardingDDLCoordinator::_removeDocument(OperationContext* opCtx) {
return batchedResponse.getN() > 0;
}
+
+ExecutorFuture<void> ShardingDDLCoordinator::_acquireLockAsync(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& token,
+ StringData resource) {
+ return AsyncTry([this, resource = resource.toString()] {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ auto distLockManager = DistLockManager::get(opCtx);
+
+ auto dbDistLock = uassertStatusOK(distLockManager->lock(
+ opCtx, resource, _coorName, DistLockManager::kDefaultLockTimeout));
+ _scopedLocks.emplace(dbDistLock.moveToAnotherThread());
+ })
+ .until([this](Status status) { return (!_recoveredFromDisk) || status.isOK(); })
+ .withBackoffBetweenIterations(kExponentialBackoff)
+ .on(**executor, token);
+}
+
void ShardingDDLCoordinator::interrupt(Status status) {
LOGV2_DEBUG(5390535,
1,
@@ -127,17 +147,12 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas
return ExecutorFuture<void>(**executor)
.then([this, executor, token, anchor = shared_from_this()] {
- auto opCtxHolder = cc().makeOperationContext();
- auto* opCtx = opCtxHolder.get();
- const auto coorName =
- DDLCoordinatorType_serializer(_coorMetadata.getId().getOperationType());
-
- auto distLockManager = DistLockManager::get(opCtx);
- auto dbDistLock = uassertStatusOK(distLockManager->lock(
- opCtx, nss().db(), coorName, DistLockManager::kDefaultLockTimeout));
- _scopedLocks.emplace(dbDistLock.moveToAnotherThread());
-
- if (!nss().isConfigDB() && !_coorMetadata.getRecoveredFromDisk()) {
+ return _acquireLockAsync(executor, token, nss().db());
+ })
+ .then([this, executor, token, anchor = shared_from_this()] {
+ if (!nss().isConfigDB() && !_recoveredFromDisk) {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
invariant(_coorMetadata.getDatabaseVersion());
OperationShardingState::get(opCtx).initializeClientRoutingVersions(
@@ -145,22 +160,27 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas
// Check under the dbLock if this is still the primary shard for the database
DatabaseShardingState::checkIsPrimaryShardForDb(opCtx, nss().db());
};
-
+ })
+ .then([this, executor, token, anchor = shared_from_this()] {
if (!nss().coll().empty()) {
- auto collDistLock = uassertStatusOK(distLockManager->lock(
- opCtx, nss().ns(), coorName, DistLockManager::kDefaultLockTimeout));
- _scopedLocks.emplace(collDistLock.moveToAnotherThread());
+ return _acquireLockAsync(executor, token, nss().ns());
}
-
- for (auto& lock : _acquireAdditionalLocks(opCtx)) {
- _scopedLocks.emplace(lock.moveToAnotherThread());
+ return ExecutorFuture<void>(**executor);
+ })
+ .then([this, executor, token, anchor = shared_from_this()] {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ auto additionalLocks = _acquireAdditionalLocks(opCtx);
+ if (!additionalLocks.empty()) {
+ invariant(additionalLocks.size() == 1);
+ return _acquireLockAsync(executor, token, additionalLocks.front());
}
-
- {
- stdx::lock_guard<Latch> lg(_mutex);
- if (!_constructionCompletionPromise.getFuture().isReady()) {
- _constructionCompletionPromise.emplaceValue();
- }
+ return ExecutorFuture<void>(**executor);
+ })
+ .then([this, anchor = shared_from_this()] {
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (!_constructionCompletionPromise.getFuture().isReady()) {
+ _constructionCompletionPromise.emplaceValue();
}
hangBeforeRunningCoordinatorInstance.pauseWhileSet();
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.h b/src/mongo/db/s/sharding_ddl_coordinator.h
index c5f9d0f6ec5..07397e7c4dc 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.h
+++ b/src/mongo/db/s/sharding_ddl_coordinator.h
@@ -90,13 +90,14 @@ public:
}
protected:
- virtual std::vector<DistLockManager::ScopedDistLock> _acquireAdditionalLocks(
- OperationContext* opCtx) {
+ virtual std::vector<StringData> _acquireAdditionalLocks(OperationContext* opCtx) {
return {};
};
ShardingDDLCoordinatorService* _service;
ShardingDDLCoordinatorMetadata _coorMetadata;
+ StringData _coorName;
+
bool _recoveredFromDisk;
bool _completeOnError{false};
@@ -110,6 +111,9 @@ private:
void interrupt(Status status) override final;
bool _removeDocument(OperationContext* opCtx);
+ ExecutorFuture<void> _acquireLockAsync(std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& token,
+ StringData resource);
Mutex _mutex = MONGO_MAKE_LATCH("ShardingDDLCoordinator::_mutex");
SharedPromise<void> _constructionCompletionPromise;