diff options
author | Tommaso Tocci <tommaso.tocci@mongodb.com> | 2021-02-15 13:17:06 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-24 21:09:28 +0000 |
commit | 5f06484704079c88b81d7ec6ba9a223c651b1a2e (patch) | |
tree | 20979701f24ac994f4257647484a9ff8d6c5ad6e | |
parent | 0949e167fe3b54f06117c9797e760d4cd2cb6b8b (diff) | |
download | mongo-5f06484704079c88b81d7ec6ba9a223c651b1a2e.tar.gz |
SERVER-53905 Implement PrimaryOnlyService for DDL coordinators
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/mongod_main.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/primary_only_service.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/primary_only_service.h | 18 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/s/database_sharding_state.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/s/database_sharding_state.h | 10 | ||||
-rw-r--r-- | src/mongo/db/s/dist_lock_manager.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/s/dist_lock_manager.h | 1 | ||||
-rw-r--r-- | src/mongo/db/s/forwardable_operation_metadata.cpp | 26 | ||||
-rw-r--r-- | src/mongo/db/s/forwardable_operation_metadata.h | 6 | ||||
-rw-r--r-- | src/mongo/db/s/forwardable_operation_metadata.idl | 1 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator.cpp | 88 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator.h | 56 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator.idl | 92 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator_service.cpp | 113 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator_service.h | 65 |
19 files changed, 490 insertions, 36 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index da599c0ce21..c274ff9df4d 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -2192,6 +2192,7 @@ env.Library( 'repl/topology_coordinator', 'repl/wait_for_majority_service', 's/sessions_collection_config_server', + 's/sharding_commands_d', 's/sharding_runtime_d', 'serverinit', 'service_context_d', diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index 76c491e3016..2da17484e11 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -143,6 +143,7 @@ #include "mongo/db/s/resharding/resharding_op_observer.h" #include "mongo/db/s/resharding/resharding_recipient_service.h" #include "mongo/db/s/shard_server_op_observer.h" +#include "mongo/db/s/sharding_ddl_coordinator_service.h" #include "mongo/db/s/sharding_initialization_mongod.h" #include "mongo/db/s/sharding_state_recovery.h" #include "mongo/db/s/transaction_coordinator_service.h" @@ -317,6 +318,7 @@ void registerPrimaryOnlyServices(ServiceContext* serviceContext) { if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { services.push_back(std::make_unique<ReshardingCoordinatorService>(serviceContext)); } else if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { + services.push_back(std::make_unique<ShardingDDLCoordinatorService>(serviceContext)); services.push_back(std::make_unique<ReshardingDonorService>(serviceContext)); services.push_back(std::make_unique<ReshardingRecipientService>(serviceContext)); } diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index 0d59ea139ec..4e6e598622f 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -102,6 +102,9 @@ const NamespaceString NamespaceString::kDonorReshardingOperationsNamespace( const NamespaceString NamespaceString::kRecipientReshardingOperationsNamespace( NamespaceString::kConfigDb, "localReshardingOperations.recipient"); +const NamespaceString NamespaceString::kShardingDDLCoordinatorsNamespace( + NamespaceString::kConfigDb, "system.sharding_ddl_coordinators"); + const NamespaceString NamespaceString::kConfigSettingsNamespace(NamespaceString::kConfigDb, "settings"); const NamespaceString NamespaceString::kVectorClockNamespace(NamespaceString::kConfigDb, @@ -140,6 +143,8 @@ bool NamespaceString::isLegalClientSystemNS() const { return true; if (coll().find(".system.resharding.") != std::string::npos) return true; + if (coll() == kShardingDDLCoordinatorsNamespace.coll()) + return true; } else if (db() == kLocalDb) { if (coll() == kSystemReplSetNamespace.coll()) return true; diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index 6e4c050948c..02265c9641a 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -150,6 +150,9 @@ public: // Namespace for the recipient shard's local resharding operation state. static const NamespaceString kRecipientReshardingOperationsNamespace; + // Namespace for persisting sharding DDL coordinators state documents + static const NamespaceString kShardingDDLCoordinatorsNamespace; + // Namespace for balancer settings and default read and write concerns. static const NamespaceString kConfigSettingsNamespace; diff --git a/src/mongo/db/repl/primary_only_service.cpp b/src/mongo/db/repl/primary_only_service.cpp index f7cb5a53119..ddfab769e8c 100644 --- a/src/mongo/db/repl/primary_only_service.cpp +++ b/src/mongo/db/repl/primary_only_service.cpp @@ -482,8 +482,8 @@ void PrimaryOnlyService::shutdown() { savedInstances.clear(); } -std::shared_ptr<PrimaryOnlyService::Instance> PrimaryOnlyService::getOrCreateInstance( - OperationContext* opCtx, BSONObj initialState) { +std::pair<std::shared_ptr<PrimaryOnlyService::Instance>, bool> +PrimaryOnlyService::getOrCreateInstance(OperationContext* opCtx, BSONObj initialState) { const auto idElem = initialState["_id"]; uassert(4908702, str::stream() << "Missing _id element when adding new instance of PrimaryOnlyService \"" @@ -505,7 +505,7 @@ std::shared_ptr<PrimaryOnlyService::Instance> PrimaryOnlyService::getOrCreateIns auto it = _instances.find(instanceID); if (it != _instances.end()) { - return it->second; + return {it->second, false}; } auto [it2, inserted] = _instances.emplace(instanceID, constructInstance(std::move(initialState))); @@ -514,7 +514,7 @@ std::shared_ptr<PrimaryOnlyService::Instance> PrimaryOnlyService::getOrCreateIns // Kick off async work to run the instance _scheduleRun(lk, it2->second, instanceID); - return it2->second; + return {it2->second, true}; } boost::optional<std::shared_ptr<PrimaryOnlyService::Instance>> PrimaryOnlyService::lookupInstance( diff --git a/src/mongo/db/repl/primary_only_service.h b/src/mongo/db/repl/primary_only_service.h index 802fc9050f4..db8c4a51f45 100644 --- a/src/mongo/db/repl/primary_only_service.h +++ b/src/mongo/db/repl/primary_only_service.h @@ -188,7 +188,7 @@ public: static std::shared_ptr<InstanceType> getOrCreate(OperationContext* opCtx, PrimaryOnlyService* service, BSONObj initialState) { - auto instance = service->getOrCreateInstance(opCtx, std::move(initialState)); + auto [instance, _] = service->getOrCreateInstance(opCtx, std::move(initialState)); return checked_pointer_cast<InstanceType>(instance); } }; @@ -326,13 +326,19 @@ protected: /** * Extracts an InstanceID from the _id field of the given 'initialState' object. If an Instance - * with the extracted InstanceID already exists in _instances, returns it. If not, constructs a - * new Instance (by calling constructInstance()), registers it in _instances, and returns it. - * It is illegal to call this more than once with 'initialState' documents that have the same - * _id but are otherwise not completely identical. + * with the extracted InstanceID already exists in _instances, returns true and the instance + * itself. If not, constructs a new Instance (by calling constructInstance()), registers it in + * _instances, and returns it with the boolean set to false. It is illegal to call this more + * than once with 'initialState' documents that have the same _id but are otherwise not + * completely identical. + * + * Returns a pair with an Instance and a boolean, the boolean indicates if the Instance have + * been created in this invocation (true) or already existed (false). + * * Throws NotWritablePrimary if the node is not currently primary. */ - std::shared_ptr<Instance> getOrCreateInstance(OperationContext* opCtx, BSONObj initialState); + std::pair<std::shared_ptr<Instance>, bool> getOrCreateInstance(OperationContext* opCtx, + BSONObj initialState); /** * Since, scoped task executor shuts down on stepdown, we might need to run some instance work, diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 9ea9e0f53d2..00f970bb141 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -349,6 +349,8 @@ env.Library( 'reshard_collection_coordinator.cpp', 'set_shard_version_command.cpp', 'sharding_ddl_coordinator.cpp', + 'sharding_ddl_coordinator.idl', + 'sharding_ddl_coordinator_service.cpp', 'sharding_server_status.cpp', 'sharding_state_command.cpp', 'shardsvr_create_collection_command.cpp', diff --git a/src/mongo/db/s/database_sharding_state.cpp b/src/mongo/db/s/database_sharding_state.cpp index fd27fa0f6e5..227a561711d 100644 --- a/src/mongo/db/s/database_sharding_state.cpp +++ b/src/mongo/db/s/database_sharding_state.cpp @@ -35,6 +35,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/logv2/log.h" #include "mongo/s/database_version.h" #include "mongo/s/stale_exception.h" @@ -90,6 +91,29 @@ DatabaseShardingState* DatabaseShardingState::get(OperationContext* opCtx, return databasesMap.getOrCreate(dbName).get(); } +void DatabaseShardingState::checkIsPrimaryShardForDb(OperationContext* opCtx, StringData dbName) { + invariant(dbName != NamespaceString::kConfigDb); + + uassert(ErrorCodes::IllegalOperation, + "Request sent without attaching database version", + OperationShardingState::get(opCtx).hasDbVersion()); + + const auto dbPrimaryShardId = [&]() { + Lock::DBLock dbWriteLock(opCtx, dbName, MODE_IS); + auto dss = DatabaseShardingState::get(opCtx, dbName); + auto dssLock = DatabaseShardingState::DSSLock::lockShared(opCtx, dss); + // The following call will also ensure that the database version matches + return dss->getDatabaseInfo(opCtx, dssLock).getPrimary(); + }(); + + const auto thisShardId = ShardingState::get(opCtx)->shardId(); + + uassert(ErrorCodes::IllegalOperation, + str::stream() << "This is not the primary shard for db " << dbName + << " expected: " << dbPrimaryShardId << " shardId: " << thisShardId, + dbPrimaryShardId == thisShardId); +} + std::shared_ptr<DatabaseShardingState> DatabaseShardingState::getSharedForLockFreeReads( OperationContext* opCtx, const StringData dbName) { auto& databasesMap = DatabaseShardingStateMap::get(opCtx->getServiceContext()); diff --git a/src/mongo/db/s/database_sharding_state.h b/src/mongo/db/s/database_sharding_state.h index 836570e67ef..b1624bbd9b8 100644 --- a/src/mongo/db/s/database_sharding_state.h +++ b/src/mongo/db/s/database_sharding_state.h @@ -73,6 +73,16 @@ public: OperationContext* opCtx, const StringData dbName); /** + * Checks if this shard is the primary shard for the given DB. + * + * Throws an IllegalOperation exception otherwise. + * + * Assumes the operation context has a DB version attached to it for the given @dbName. + */ + static void checkIsPrimaryShardForDb(OperationContext* opCtx, StringData dbName); + + + /** * Methods to control the databases's critical section. Must be called with the database X lock * held. */ diff --git a/src/mongo/db/s/dist_lock_manager.cpp b/src/mongo/db/s/dist_lock_manager.cpp index 39c08cc1dd7..ecbc04eb9ef 100644 --- a/src/mongo/db/s/dist_lock_manager.cpp +++ b/src/mongo/db/s/dist_lock_manager.cpp @@ -76,6 +76,11 @@ DistLockManager::ScopedDistLock DistLockManager::ScopedDistLock::moveToAnotherTh return unownedScopedDistLock; } +void DistLockManager::ScopedDistLock::assignNewOpCtx(OperationContext* opCtx) { + invariant(!_opCtx); + _opCtx = opCtx; +} + DistLockManager::DistLockManager(OID lockSessionID) : _lockSessionID(std::move(lockSessionID)) {} DistLockManager* DistLockManager::get(ServiceContext* service) { diff --git a/src/mongo/db/s/dist_lock_manager.h b/src/mongo/db/s/dist_lock_manager.h index 953cc76d9b9..46f7cf58c11 100644 --- a/src/mongo/db/s/dist_lock_manager.h +++ b/src/mongo/db/s/dist_lock_manager.h @@ -102,6 +102,7 @@ public: ScopedDistLock(ScopedDistLock&& other); ScopedDistLock moveToAnotherThread(); + void assignNewOpCtx(OperationContext* opCtx); private: OperationContext* _opCtx; diff --git a/src/mongo/db/s/forwardable_operation_metadata.cpp b/src/mongo/db/s/forwardable_operation_metadata.cpp index 066b1426388..39ed1fbf62c 100644 --- a/src/mongo/db/s/forwardable_operation_metadata.cpp +++ b/src/mongo/db/s/forwardable_operation_metadata.cpp @@ -35,26 +35,34 @@ namespace mongo { +ForwardableOperationMetadata::ForwardableOperationMetadata(const BSONObj& obj) { + ForwardableOperationMetadataBase::parse( + IDLParserErrorContext("ForwardableOperationMetadataBase"), obj); +} + ForwardableOperationMetadata::ForwardableOperationMetadata(OperationContext* opCtx) { if (auto optComment = opCtx->getComment()) { setComment(optComment->wrap()); } auto authzSession = AuthorizationSession::get(opCtx->getClient()); - setImpersonatedUserMetadata({userNameIteratorToContainer<std::vector<UserName>>( - authzSession->getImpersonatedUserNames()), - roleNameIteratorToContainer<std::vector<RoleName>>( - authzSession->getImpersonatedRoleNames())}); + setImpersonatedUserMetadata({{userNameIteratorToContainer<std::vector<UserName>>( + authzSession->getImpersonatedUserNames()), + roleNameIteratorToContainer<std::vector<RoleName>>( + authzSession->getImpersonatedRoleNames())}}); } -void ForwardableOperationMetadata::setOn(OperationContext* opCtx) { +void ForwardableOperationMetadata::setOn(OperationContext* opCtx) const { if (const auto& comment = getComment()) { opCtx->setComment(comment.get()); } - const auto& authMetadata = getImpersonatedUserMetadata(); - if (!authMetadata.getUsers().empty() || !authMetadata.getRoles().empty()) - AuthorizationSession::get(opCtx->getClient()) - ->setImpersonatedUserData(authMetadata.getUsers(), authMetadata.getRoles()); + if (const auto& optAuthMetadata = getImpersonatedUserMetadata()) { + const auto& authMetadata = optAuthMetadata.get(); + if (!authMetadata.getUsers().empty() || !authMetadata.getRoles().empty()) { + AuthorizationSession::get(opCtx->getClient()) + ->setImpersonatedUserData(authMetadata.getUsers(), authMetadata.getRoles()); + } + } } } // namespace mongo diff --git a/src/mongo/db/s/forwardable_operation_metadata.h b/src/mongo/db/s/forwardable_operation_metadata.h index 884233f7ce6..8cae3a2a636 100644 --- a/src/mongo/db/s/forwardable_operation_metadata.h +++ b/src/mongo/db/s/forwardable_operation_metadata.h @@ -46,8 +46,12 @@ namespace mongo { */ class ForwardableOperationMetadata : public ForwardableOperationMetadataBase { public: + ForwardableOperationMetadata() = default; + ForwardableOperationMetadata(const BSONObj& obj); ForwardableOperationMetadata(OperationContext* opCtx); - void setOn(OperationContext* opCtx); + + + void setOn(OperationContext* opCtx) const; }; } // namespace mongo diff --git a/src/mongo/db/s/forwardable_operation_metadata.idl b/src/mongo/db/s/forwardable_operation_metadata.idl index 3ed3d310f8e..28b5afbb970 100644 --- a/src/mongo/db/s/forwardable_operation_metadata.idl +++ b/src/mongo/db/s/forwardable_operation_metadata.idl @@ -45,3 +45,4 @@ structs: impersonatedUserMetadata: type: ImpersonatedUserMetadata description: "A struct representing the impersonated users from a mongos" + optional: true diff --git a/src/mongo/db/s/sharding_ddl_coordinator.cpp b/src/mongo/db/s/sharding_ddl_coordinator.cpp index 50fab719358..9fbf3eceffe 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator.cpp +++ b/src/mongo/db/s/sharding_ddl_coordinator.cpp @@ -35,14 +35,83 @@ #include "mongo/db/s/database_sharding_state.h" #include "mongo/db/s/operation_sharding_state.h" -#include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/sharding_ddl_coordinator_gen.h" +#include "mongo/logv2/log.h" #include "mongo/s/grid.h" namespace mongo { +ShardingDDLCoordinatorMetadata extractShardingDDLCoordinatorMetadata(const BSONObj& coorDoc) { + return ShardingDDLCoordinatorMetadata::parse( + IDLParserErrorContext("ShardingDDLCoordinatorMetadata"), coorDoc); +} + +ShardingDDLCoordinator::ShardingDDLCoordinator(const BSONObj& coorDoc) + : _coorMetadata(extractShardingDDLCoordinatorMetadata(coorDoc)) {} + +ShardingDDLCoordinator::~ShardingDDLCoordinator() { + invariant(_constructionCompletionPromise.getFuture().isReady()); +} + +SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancelationToken& token) noexcept { + return ExecutorFuture<void>(**executor) + .then([this, executor, token, anchor = shared_from_this()] { + auto opCtxHolder = cc().makeOperationContext(); + auto* opCtx = opCtxHolder.get(); + const auto coorName = + DDLCoordinatorType_serializer(_coorMetadata.getId().getOperationType()); + + auto distLockManager = DistLockManager::get(opCtx); + auto dbDistLock = uassertStatusOK(distLockManager->lock( + opCtx, nss().db(), coorName, DistLockManager::kDefaultLockTimeout)); + _scopedLocks.emplace(dbDistLock.moveToAnotherThread()); + + if (!nss().isConfigDB() && !_coorMetadata.getRecoveredFromDisk()) { + invariant(_coorMetadata.getDatabaseVersion()); + + OperationShardingState::get(opCtx).initializeClientRoutingVersions( + nss(), boost::none /* ChunkVersion */, _coorMetadata.getDatabaseVersion()); + // Check under the dbLock if this is still the primary shard for the database + DatabaseShardingState::checkIsPrimaryShardForDb(opCtx, nss().db()); + }; + + if (!nss().ns().empty()) { + auto collDistLock = uassertStatusOK(distLockManager->lock( + opCtx, nss().ns(), coorName, DistLockManager::kDefaultLockTimeout)); + _scopedLocks.emplace(collDistLock.moveToAnotherThread()); + } + + _constructionCompletionPromise.emplaceValue(); + }) + .onError([this, anchor = shared_from_this()](const Status& status) { + LOGV2_ERROR(5390530, + "Failed to complete construction of sharding DDL coordinator", + "coordinatorId"_attr = _coorMetadata.getId(), + "reason"_attr = redact(status)); + _constructionCompletionPromise.setError( + status.withContext("Failed to complete construction of sharding DDL coordinator")); + return status; + }) + .then([this, executor, token, anchor = shared_from_this()] { + return _runImpl(executor, token); + }) + .onCompletion([this, anchor = shared_from_this()](const Status& status) { + auto opCtxHolder = cc().makeOperationContext(); + auto* opCtx = opCtxHolder.get(); + + while (!_scopedLocks.empty()) { + _scopedLocks.top().assignNewOpCtx(opCtx); + _scopedLocks.pop(); + } + return status; + }) + .semi(); +} + ShardingDDLCoordinator_NORESILIENT::ShardingDDLCoordinator_NORESILIENT(OperationContext* opCtx, const NamespaceString& ns) - : _nss(ns), _forwardableOpMetadata(opCtx){}; + : _nss(ns), _forwardableOpMetadata(opCtx) {} SemiFuture<void> ShardingDDLCoordinator_NORESILIENT::run(OperationContext* opCtx) { if (!_nss.isConfigDB()) { @@ -53,20 +122,7 @@ SemiFuture<void> ShardingDDLCoordinator_NORESILIENT::run(OperationContext* opCtx clientDbVersion); // Checks that this is the primary shard for the namespace's db - const auto dbPrimaryShardId = [&]() { - Lock::DBLock dbWriteLock(opCtx, _nss.db(), MODE_IS); - auto dss = DatabaseShardingState::get(opCtx, _nss.db()); - auto dssLock = DatabaseShardingState::DSSLock::lockShared(opCtx, dss); - // The following call will also ensure that the database version matches - return dss->getDatabaseInfo(opCtx, dssLock).getPrimary(); - }(); - - const auto thisShardId = ShardingState::get(opCtx)->shardId(); - - uassert(ErrorCodes::IllegalOperation, - str::stream() << "This is not the primary shard for db " << _nss.db() - << " expected: " << dbPrimaryShardId << " shardId: " << thisShardId, - dbPrimaryShardId == thisShardId); + DatabaseShardingState::checkIsPrimaryShardForDb(opCtx, _nss.db()); } return runImpl(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()); } diff --git a/src/mongo/db/s/sharding_ddl_coordinator.h b/src/mongo/db/s/sharding_ddl_coordinator.h index 4fd60804dd2..cb0117dd9f8 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator.h +++ b/src/mongo/db/s/sharding_ddl_coordinator.h @@ -31,12 +31,68 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" +#include "mongo/db/repl/primary_only_service.h" +#include "mongo/db/s/dist_lock_manager.h" #include "mongo/db/s/forwardable_operation_metadata.h" +#include "mongo/db/s/sharding_ddl_coordinator_gen.h" #include "mongo/executor/task_executor.h" #include "mongo/util/future.h" namespace mongo { +ShardingDDLCoordinatorMetadata extractShardingDDLCoordinatorMetadata(const BSONObj& coorDoc); + +class ShardingDDLCoordinator + : public repl::PrimaryOnlyService::TypedInstance<ShardingDDLCoordinator> { +public: + explicit ShardingDDLCoordinator(const BSONObj& coorDoc); + + ~ShardingDDLCoordinator(); + + /* + * Check if the given coordinator document has the same options as this. + * + * This is used to decide if we can join a previously created coordinator. + * In the case the given coordinator document has incompatible options with this, + * this function must throw a ConflictingOprationInProgress exception with an adequate message. + */ + virtual void checkIfOptionsConflict(const BSONObj& coorDoc) const = 0; + + + /* + * Returns a future that will be completed when the construction of this coordinator instance + * is completed. + * + * In particular the returned future will be ready only after this coordinator succesfully + * aquires the required locks. + */ + SharedSemiFuture<void> getConstructionCompletionFuture() { + return _constructionCompletionPromise.getFuture(); + } + + const NamespaceString& nss() const { + return _coorMetadata.getId().getNss(); + } + + const ForwardableOperationMetadata& getForwardableOpMetadata() const { + invariant(_coorMetadata.getForwardableOpMetadata()); + return _coorMetadata.getForwardableOpMetadata().get(); + } + +protected: + ShardingDDLCoordinatorMetadata _coorMetadata; + std::stack<DistLockManager::ScopedDistLock> _scopedLocks; + +private: + SemiFuture<void> run(std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancelationToken& token) noexcept override final; + + virtual ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancelationToken& token) noexcept = 0; + + SharedPromise<void> _constructionCompletionPromise; +}; + class ShardingDDLCoordinator_NORESILIENT { public: ShardingDDLCoordinator_NORESILIENT(OperationContext* opCtx, const NamespaceString& nss); diff --git a/src/mongo/db/s/sharding_ddl_coordinator.idl b/src/mongo/db/s/sharding_ddl_coordinator.idl new file mode 100644 index 00000000000..6dd43c89303 --- /dev/null +++ b/src/mongo/db/s/sharding_ddl_coordinator.idl @@ -0,0 +1,92 @@ +# Copyright (C) 2021-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +# This file defines the format of documents stored in config.DDL.Operations. + +global: + cpp_namespace: "mongo" + cpp_includes: + - "mongo/db/s/forwardable_operation_metadata.h" + - "mongo/s/database_version.h" + +imports: + - "mongo/idl/basic_types.idl" + +enums: + DDLCoordinatorType: + description: "Type of the sharding DDL Operation." + type: string + values: + kDropCollection: "dropCollection" + +types: + ForwardableOperationMetadata: + description: "Forwardable operation metadata assiociated with this coordinator." + cpp_type: ForwardableOperationMetadata + bson_serialization_type: object + serializer: "mongo::ForwardableOperationMetadata::toBSON" + deserializer: "mongo::ForwardableOperationMetadata" + DatabaseVersion: + description: "" + cpp_type: DatabaseVersion + bson_serialization_type: object + serializer: "mongo::DatabaseVersion::toBSON" + deserializer: "mongo::DatabaseVersion" + +structs: + ShardingDDLCoordinatorId: + description: "Identifier for a specific sharding DDL Coordinator." + generate_comparison_operators: false + strict: false + fields: + namespace: + description: "The target namespace of the DDL operation." + cpp_name: nss + type: namespacestring + operationType: + description: "Type of the sharding DDL coordinator." + type: DDLCoordinatorType + + ShardingDDLCoordinatorMetadata: + description: "Commong metadata for all sharding DDL coordinator." + generate_comparison_operators: false + strict: false + fields: + _id: + type: ShardingDDLCoordinatorId + cpp_name: id + recoveredFromDisk: + type: bool + default: false + forwardableOpMetadata: + type: ForwardableOperationMetadata + optional: true + databaseVersion: + type: DatabaseVersion + optional: true + diff --git a/src/mongo/db/s/sharding_ddl_coordinator_service.cpp b/src/mongo/db/s/sharding_ddl_coordinator_service.cpp new file mode 100644 index 00000000000..f94793638e5 --- /dev/null +++ b/src/mongo/db/s/sharding_ddl_coordinator_service.cpp @@ -0,0 +1,113 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/sharding_ddl_coordinator_service.h" + +#include "mongo/base/checked_cast.h" +#include "mongo/db/s/database_sharding_state.h" +#include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/sharding_ddl_coordinator.h" +#include "mongo/logv2/log.h" + +namespace mongo { + +ShardingDDLCoordinatorService* ShardingDDLCoordinatorService::getService(OperationContext* opCtx) { + auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()); + auto service = registry->lookupServiceByName(kServiceName); + return checked_cast<ShardingDDLCoordinatorService*>(std::move(service)); +} + +std::shared_ptr<ShardingDDLCoordinatorService::Instance> +ShardingDDLCoordinatorService::constructInstance(BSONObj initialState) const { + const auto op = extractShardingDDLCoordinatorMetadata(initialState); + LOGV2_DEBUG(5390510, + 2, + "Constructing new sharding DDL coordinator", + "coordinatorDoc"_attr = op.toBSON()); + switch (op.getId().getOperationType()) { + case DDLCoordinatorTypeEnum::kDropCollection: + // Switch with only default case are not supported on windows compiler + return {}; + break; + default: + uasserted(ErrorCodes::BadValue, + str::stream() + << "Encountered unknown Sharding DDL operation type: " + << DDLCoordinatorType_serializer(op.getId().getOperationType())); + } +} + +std::shared_ptr<ShardingDDLCoordinatorService::Instance> +ShardingDDLCoordinatorService::getOrCreateInstance(OperationContext* opCtx, BSONObj coorDoc) { + auto coorMetadata = extractShardingDDLCoordinatorMetadata(coorDoc); + const auto& nss = coorMetadata.getId().getNss(); + + if (!nss.isConfigDB()) { + // Check that the operation context has a database version for this namespace + const auto clientDbVersion = OperationShardingState::get(opCtx).getDbVersion(nss.db()); + uassert(ErrorCodes::IllegalOperation, + "Request sent without attaching database version", + clientDbVersion); + DatabaseShardingState::checkIsPrimaryShardForDb(opCtx, nss.db()); + coorMetadata.setDatabaseVersion(clientDbVersion); + } + + coorMetadata.setForwardableOpMetadata({{opCtx}}); + const auto patchedCoorDoc = coorDoc.addFields(coorMetadata.toBSON()); + + auto [coordinator, created] = [&] { + try { + auto [coordinator, created] = + PrimaryOnlyService::getOrCreateInstance(opCtx, patchedCoorDoc); + return std::make_pair( + checked_pointer_cast<ShardingDDLCoordinator>(std::move(coordinator)), + std::move(created)); + } catch (const DBException& ex) { + LOGV2_ERROR(5390512, + "Failed to create instance of sharding DDL coordinator", + "coordinatorId"_attr = coorMetadata.getId(), + "reason"_attr = redact(ex)); + throw; + } + }(); + + // If the existing instance doesn't have conflicting options just return that one + if (!created) { + coordinator->checkIfOptionsConflict(coorDoc); + } + + coordinator->getConstructionCompletionFuture().get(opCtx); + return coordinator; +} + +} // namespace mongo diff --git a/src/mongo/db/s/sharding_ddl_coordinator_service.h b/src/mongo/db/s/sharding_ddl_coordinator_service.h new file mode 100644 index 00000000000..c514ca659c9 --- /dev/null +++ b/src/mongo/db/s/sharding_ddl_coordinator_service.h @@ -0,0 +1,65 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/namespace_string.h" +#include "mongo/db/repl/primary_only_service.h" + +namespace mongo { + +class ShardingDDLCoordinatorService final : public repl::PrimaryOnlyService { +public: + static constexpr StringData kServiceName = "ShardingDDLCoordinator"_sd; + + explicit ShardingDDLCoordinatorService(ServiceContext* serviceContext) + : PrimaryOnlyService(serviceContext) {} + + ~ShardingDDLCoordinatorService() = default; + + static ShardingDDLCoordinatorService* getService(OperationContext* opCtx); + + StringData getServiceName() const override { + return kServiceName; + } + + NamespaceString getStateDocumentsNS() const override { + return NamespaceString::kShardingDDLCoordinatorsNamespace; + } + + ThreadPool::Limits getThreadPoolLimits() const override { + return ThreadPool::Limits(); + } + + std::shared_ptr<Instance> constructInstance(BSONObj initialState) const override; + + std::shared_ptr<Instance> getOrCreateInstance(OperationContext* opCtx, BSONObj initialState); +}; + +} // namespace mongo |