diff options
author | Tommaso Tocci <tommaso.tocci@mongodb.com> | 2021-06-14 15:28:55 +0200 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-06-17 13:21:17 +0000 |
commit | 7a59105d2b769d3a86c956cc08af01d6bd59fd82 (patch) | |
tree | 7520117d9345548b4838022acef5b21bdca62538 /src | |
parent | d4b437d3977295affd00ed88141c0c2abb180397 (diff) | |
download | mongo-7a59105d2b769d3a86c956cc08af01d6bd59fd82.tar.gz |
SERVER-57649 DDL coordinator dist lock acquisition must be resilient to CSRS stepdowns
(cherry picked from commit fc1ea44740c05ab73ce0bcf8a1897aa322b1d057)
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/rename_collection_coordinator.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/s/rename_collection_coordinator.h | 3 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator.cpp | 68 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator.h | 8 |
4 files changed, 53 insertions, 38 deletions
diff --git a/src/mongo/db/s/rename_collection_coordinator.cpp b/src/mongo/db/s/rename_collection_coordinator.cpp index 08168994c1e..d8db3483980 100644 --- a/src/mongo/db/s/rename_collection_coordinator.cpp +++ b/src/mongo/db/s/rename_collection_coordinator.cpp @@ -85,17 +85,9 @@ void RenameCollectionCoordinator::checkIfOptionsConflict(const BSONObj& doc) con SimpleBSONObjComparator::kInstance.evaluate(selfReq == otherReq)); } -std::vector<DistLockManager::ScopedDistLock> RenameCollectionCoordinator::_acquireAdditionalLocks( +std::vector<StringData> RenameCollectionCoordinator::_acquireAdditionalLocks( OperationContext* opCtx) { - const auto coorName = DDLCoordinatorType_serializer(_coorMetadata.getId().getOperationType()); - - auto distLockManager = DistLockManager::get(opCtx); - auto targetCollDistLock = uassertStatusOK(distLockManager->lock( - opCtx, _doc.getTo().ns(), coorName, DistLockManager::kDefaultLockTimeout)); - - std::vector<DistLockManager::ScopedDistLock> vec; - vec.push_back(targetCollDistLock.moveToAnotherThread()); - return vec; + return {_doc.getTo().ns()}; } boost::optional<BSONObj> RenameCollectionCoordinator::reportForCurrentOp( diff --git a/src/mongo/db/s/rename_collection_coordinator.h b/src/mongo/db/s/rename_collection_coordinator.h index 1621c323604..c3779b6c289 100644 --- a/src/mongo/db/s/rename_collection_coordinator.h +++ b/src/mongo/db/s/rename_collection_coordinator.h @@ -62,8 +62,7 @@ private: ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) noexcept override; - std::vector<DistLockManager::ScopedDistLock> _acquireAdditionalLocks( - OperationContext* opCtx) override; + std::vector<StringData> _acquireAdditionalLocks(OperationContext* opCtx) override; template <typename Func> auto _executePhase(const Phase& newPhase, Func&& func) { diff --git a/src/mongo/db/s/sharding_ddl_coordinator.cpp b/src/mongo/db/s/sharding_ddl_coordinator.cpp index df75c4526ca..a1040e17344 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator.cpp +++ b/src/mongo/db/s/sharding_ddl_coordinator.cpp @@ -63,6 +63,7 @@ ShardingDDLCoordinator::ShardingDDLCoordinator(ShardingDDLCoordinatorService* se const BSONObj& coorDoc) : _service(service), _coorMetadata(extractShardingDDLCoordinatorMetadata(coorDoc)), + _coorName(DDLCoordinatorType_serializer(_coorMetadata.getId().getOperationType())), _recoveredFromDisk(_coorMetadata.getRecoveredFromDisk()) {} ShardingDDLCoordinator::~ShardingDDLCoordinator() { @@ -105,6 +106,25 @@ bool ShardingDDLCoordinator::_removeDocument(OperationContext* opCtx) { return batchedResponse.getN() > 0; } + +ExecutorFuture<void> ShardingDDLCoordinator::_acquireLockAsync( + std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& token, + StringData resource) { + return AsyncTry([this, resource = resource.toString()] { + auto opCtxHolder = cc().makeOperationContext(); + auto* opCtx = opCtxHolder.get(); + auto distLockManager = DistLockManager::get(opCtx); + + auto dbDistLock = uassertStatusOK(distLockManager->lock( + opCtx, resource, _coorName, DistLockManager::kDefaultLockTimeout)); + _scopedLocks.emplace(dbDistLock.moveToAnotherThread()); + }) + .until([this](Status status) { return (!_recoveredFromDisk) || status.isOK(); }) + .withBackoffBetweenIterations(kExponentialBackoff) + .on(**executor, token); +} + void ShardingDDLCoordinator::interrupt(Status status) { LOGV2_DEBUG(5390535, 1, @@ -127,17 +147,12 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas return ExecutorFuture<void>(**executor) .then([this, executor, token, anchor = shared_from_this()] { - auto opCtxHolder = cc().makeOperationContext(); - auto* opCtx = opCtxHolder.get(); - const auto coorName = - DDLCoordinatorType_serializer(_coorMetadata.getId().getOperationType()); - - auto distLockManager = DistLockManager::get(opCtx); - auto dbDistLock = uassertStatusOK(distLockManager->lock( - opCtx, nss().db(), coorName, DistLockManager::kDefaultLockTimeout)); - _scopedLocks.emplace(dbDistLock.moveToAnotherThread()); - - if (!nss().isConfigDB() && !_coorMetadata.getRecoveredFromDisk()) { + return _acquireLockAsync(executor, token, nss().db()); + }) + .then([this, executor, token, anchor = shared_from_this()] { + if (!nss().isConfigDB() && !_recoveredFromDisk) { + auto opCtxHolder = cc().makeOperationContext(); + auto* opCtx = opCtxHolder.get(); invariant(_coorMetadata.getDatabaseVersion()); OperationShardingState::get(opCtx).initializeClientRoutingVersions( @@ -145,22 +160,27 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas // Check under the dbLock if this is still the primary shard for the database DatabaseShardingState::checkIsPrimaryShardForDb(opCtx, nss().db()); }; - + }) + .then([this, executor, token, anchor = shared_from_this()] { if (!nss().coll().empty()) { - auto collDistLock = uassertStatusOK(distLockManager->lock( - opCtx, nss().ns(), coorName, DistLockManager::kDefaultLockTimeout)); - _scopedLocks.emplace(collDistLock.moveToAnotherThread()); + return _acquireLockAsync(executor, token, nss().ns()); } - - for (auto& lock : _acquireAdditionalLocks(opCtx)) { - _scopedLocks.emplace(lock.moveToAnotherThread()); + return ExecutorFuture<void>(**executor); + }) + .then([this, executor, token, anchor = shared_from_this()] { + auto opCtxHolder = cc().makeOperationContext(); + auto* opCtx = opCtxHolder.get(); + auto additionalLocks = _acquireAdditionalLocks(opCtx); + if (!additionalLocks.empty()) { + invariant(additionalLocks.size() == 1); + return _acquireLockAsync(executor, token, additionalLocks.front()); } - - { - stdx::lock_guard<Latch> lg(_mutex); - if (!_constructionCompletionPromise.getFuture().isReady()) { - _constructionCompletionPromise.emplaceValue(); - } + return ExecutorFuture<void>(**executor); + }) + .then([this, anchor = shared_from_this()] { + stdx::lock_guard<Latch> lg(_mutex); + if (!_constructionCompletionPromise.getFuture().isReady()) { + _constructionCompletionPromise.emplaceValue(); } hangBeforeRunningCoordinatorInstance.pauseWhileSet(); diff --git a/src/mongo/db/s/sharding_ddl_coordinator.h b/src/mongo/db/s/sharding_ddl_coordinator.h index c5f9d0f6ec5..07397e7c4dc 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator.h +++ b/src/mongo/db/s/sharding_ddl_coordinator.h @@ -90,13 +90,14 @@ public: } protected: - virtual std::vector<DistLockManager::ScopedDistLock> _acquireAdditionalLocks( - OperationContext* opCtx) { + virtual std::vector<StringData> _acquireAdditionalLocks(OperationContext* opCtx) { return {}; }; ShardingDDLCoordinatorService* _service; ShardingDDLCoordinatorMetadata _coorMetadata; + StringData _coorName; + bool _recoveredFromDisk; bool _completeOnError{false}; @@ -110,6 +111,9 @@ private: void interrupt(Status status) override final; bool _removeDocument(OperationContext* opCtx); + ExecutorFuture<void> _acquireLockAsync(std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& token, + StringData resource); Mutex _mutex = MONGO_MAKE_LATCH("ShardingDDLCoordinator::_mutex"); SharedPromise<void> _constructionCompletionPromise; |