summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTommaso Tocci <tommaso.tocci@mongodb.com>2021-02-15 13:17:06 +0100
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-02-24 21:09:28 +0000
commit5f06484704079c88b81d7ec6ba9a223c651b1a2e (patch)
tree20979701f24ac994f4257647484a9ff8d6c5ad6e
parent0949e167fe3b54f06117c9797e760d4cd2cb6b8b (diff)
downloadmongo-5f06484704079c88b81d7ec6ba9a223c651b1a2e.tar.gz
SERVER-53905 Implement PrimaryOnlyService for DDL coordinators
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/mongod_main.cpp2
-rw-r--r--src/mongo/db/namespace_string.cpp5
-rw-r--r--src/mongo/db/namespace_string.h3
-rw-r--r--src/mongo/db/repl/primary_only_service.cpp8
-rw-r--r--src/mongo/db/repl/primary_only_service.h18
-rw-r--r--src/mongo/db/s/SConscript2
-rw-r--r--src/mongo/db/s/database_sharding_state.cpp24
-rw-r--r--src/mongo/db/s/database_sharding_state.h10
-rw-r--r--src/mongo/db/s/dist_lock_manager.cpp5
-rw-r--r--src/mongo/db/s/dist_lock_manager.h1
-rw-r--r--src/mongo/db/s/forwardable_operation_metadata.cpp26
-rw-r--r--src/mongo/db/s/forwardable_operation_metadata.h6
-rw-r--r--src/mongo/db/s/forwardable_operation_metadata.idl1
-rw-r--r--src/mongo/db/s/sharding_ddl_coordinator.cpp88
-rw-r--r--src/mongo/db/s/sharding_ddl_coordinator.h56
-rw-r--r--src/mongo/db/s/sharding_ddl_coordinator.idl92
-rw-r--r--src/mongo/db/s/sharding_ddl_coordinator_service.cpp113
-rw-r--r--src/mongo/db/s/sharding_ddl_coordinator_service.h65
19 files changed, 490 insertions, 36 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index da599c0ce21..c274ff9df4d 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -2192,6 +2192,7 @@ env.Library(
'repl/topology_coordinator',
'repl/wait_for_majority_service',
's/sessions_collection_config_server',
+ 's/sharding_commands_d',
's/sharding_runtime_d',
'serverinit',
'service_context_d',
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index 76c491e3016..2da17484e11 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -143,6 +143,7 @@
#include "mongo/db/s/resharding/resharding_op_observer.h"
#include "mongo/db/s/resharding/resharding_recipient_service.h"
#include "mongo/db/s/shard_server_op_observer.h"
+#include "mongo/db/s/sharding_ddl_coordinator_service.h"
#include "mongo/db/s/sharding_initialization_mongod.h"
#include "mongo/db/s/sharding_state_recovery.h"
#include "mongo/db/s/transaction_coordinator_service.h"
@@ -317,6 +318,7 @@ void registerPrimaryOnlyServices(ServiceContext* serviceContext) {
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
services.push_back(std::make_unique<ReshardingCoordinatorService>(serviceContext));
} else if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
+ services.push_back(std::make_unique<ShardingDDLCoordinatorService>(serviceContext));
services.push_back(std::make_unique<ReshardingDonorService>(serviceContext));
services.push_back(std::make_unique<ReshardingRecipientService>(serviceContext));
}
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 0d59ea139ec..4e6e598622f 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -102,6 +102,9 @@ const NamespaceString NamespaceString::kDonorReshardingOperationsNamespace(
const NamespaceString NamespaceString::kRecipientReshardingOperationsNamespace(
NamespaceString::kConfigDb, "localReshardingOperations.recipient");
+const NamespaceString NamespaceString::kShardingDDLCoordinatorsNamespace(
+ NamespaceString::kConfigDb, "system.sharding_ddl_coordinators");
+
const NamespaceString NamespaceString::kConfigSettingsNamespace(NamespaceString::kConfigDb,
"settings");
const NamespaceString NamespaceString::kVectorClockNamespace(NamespaceString::kConfigDb,
@@ -140,6 +143,8 @@ bool NamespaceString::isLegalClientSystemNS() const {
return true;
if (coll().find(".system.resharding.") != std::string::npos)
return true;
+ if (coll() == kShardingDDLCoordinatorsNamespace.coll())
+ return true;
} else if (db() == kLocalDb) {
if (coll() == kSystemReplSetNamespace.coll())
return true;
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index 6e4c050948c..02265c9641a 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -150,6 +150,9 @@ public:
// Namespace for the recipient shard's local resharding operation state.
static const NamespaceString kRecipientReshardingOperationsNamespace;
+ // Namespace for persisting sharding DDL coordinators state documents
+ static const NamespaceString kShardingDDLCoordinatorsNamespace;
+
// Namespace for balancer settings and default read and write concerns.
static const NamespaceString kConfigSettingsNamespace;
diff --git a/src/mongo/db/repl/primary_only_service.cpp b/src/mongo/db/repl/primary_only_service.cpp
index f7cb5a53119..ddfab769e8c 100644
--- a/src/mongo/db/repl/primary_only_service.cpp
+++ b/src/mongo/db/repl/primary_only_service.cpp
@@ -482,8 +482,8 @@ void PrimaryOnlyService::shutdown() {
savedInstances.clear();
}
-std::shared_ptr<PrimaryOnlyService::Instance> PrimaryOnlyService::getOrCreateInstance(
- OperationContext* opCtx, BSONObj initialState) {
+std::pair<std::shared_ptr<PrimaryOnlyService::Instance>, bool>
+PrimaryOnlyService::getOrCreateInstance(OperationContext* opCtx, BSONObj initialState) {
const auto idElem = initialState["_id"];
uassert(4908702,
str::stream() << "Missing _id element when adding new instance of PrimaryOnlyService \""
@@ -505,7 +505,7 @@ std::shared_ptr<PrimaryOnlyService::Instance> PrimaryOnlyService::getOrCreateIns
auto it = _instances.find(instanceID);
if (it != _instances.end()) {
- return it->second;
+ return {it->second, false};
}
auto [it2, inserted] =
_instances.emplace(instanceID, constructInstance(std::move(initialState)));
@@ -514,7 +514,7 @@ std::shared_ptr<PrimaryOnlyService::Instance> PrimaryOnlyService::getOrCreateIns
// Kick off async work to run the instance
_scheduleRun(lk, it2->second, instanceID);
- return it2->second;
+ return {it2->second, true};
}
boost::optional<std::shared_ptr<PrimaryOnlyService::Instance>> PrimaryOnlyService::lookupInstance(
diff --git a/src/mongo/db/repl/primary_only_service.h b/src/mongo/db/repl/primary_only_service.h
index 802fc9050f4..db8c4a51f45 100644
--- a/src/mongo/db/repl/primary_only_service.h
+++ b/src/mongo/db/repl/primary_only_service.h
@@ -188,7 +188,7 @@ public:
static std::shared_ptr<InstanceType> getOrCreate(OperationContext* opCtx,
PrimaryOnlyService* service,
BSONObj initialState) {
- auto instance = service->getOrCreateInstance(opCtx, std::move(initialState));
+ auto [instance, _] = service->getOrCreateInstance(opCtx, std::move(initialState));
return checked_pointer_cast<InstanceType>(instance);
}
};
@@ -326,13 +326,19 @@ protected:
/**
* Extracts an InstanceID from the _id field of the given 'initialState' object. If an Instance
- * with the extracted InstanceID already exists in _instances, returns it. If not, constructs a
- * new Instance (by calling constructInstance()), registers it in _instances, and returns it.
- * It is illegal to call this more than once with 'initialState' documents that have the same
- * _id but are otherwise not completely identical.
+ * with the extracted InstanceID already exists in _instances, returns it with the boolean set
+ * to false. If not, constructs a new Instance (by calling constructInstance()), registers it in
+ * _instances, and returns it with the boolean set to true. It is illegal to call this more
+ * than once with 'initialState' documents that have the same _id but are otherwise not
+ * completely identical.
+ *
+ * Returns a pair with an Instance and a boolean; the boolean indicates whether the Instance has
+ * been created in this invocation (true) or already existed (false).
+ *
* Throws NotWritablePrimary if the node is not currently primary.
*/
- std::shared_ptr<Instance> getOrCreateInstance(OperationContext* opCtx, BSONObj initialState);
+ std::pair<std::shared_ptr<Instance>, bool> getOrCreateInstance(OperationContext* opCtx,
+ BSONObj initialState);
/**
* Since, scoped task executor shuts down on stepdown, we might need to run some instance work,
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 9ea9e0f53d2..00f970bb141 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -349,6 +349,8 @@ env.Library(
'reshard_collection_coordinator.cpp',
'set_shard_version_command.cpp',
'sharding_ddl_coordinator.cpp',
+ 'sharding_ddl_coordinator.idl',
+ 'sharding_ddl_coordinator_service.cpp',
'sharding_server_status.cpp',
'sharding_state_command.cpp',
'shardsvr_create_collection_command.cpp',
diff --git a/src/mongo/db/s/database_sharding_state.cpp b/src/mongo/db/s/database_sharding_state.cpp
index fd27fa0f6e5..227a561711d 100644
--- a/src/mongo/db/s/database_sharding_state.cpp
+++ b/src/mongo/db/s/database_sharding_state.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/sharding_state.h"
#include "mongo/logv2/log.h"
#include "mongo/s/database_version.h"
#include "mongo/s/stale_exception.h"
@@ -90,6 +91,29 @@ DatabaseShardingState* DatabaseShardingState::get(OperationContext* opCtx,
return databasesMap.getOrCreate(dbName).get();
}
+// Verifies that this shard is the primary shard for 'dbName'.
+//
+// Must not be called for the config database. Fails with IllegalOperation when the operation
+// carries no database version, or when the primary shard recorded for the database differs from
+// this shard's id.
+void DatabaseShardingState::checkIsPrimaryShardForDb(OperationContext* opCtx, StringData dbName) {
+    invariant(dbName != NamespaceString::kConfigDb);
+
+    if (!OperationShardingState::get(opCtx).hasDbVersion()) {
+        uasserted(ErrorCodes::IllegalOperation,
+                  "Request sent without attaching database version");
+    }
+
+    // Both locks are scoped to the lambda, so they are dropped as soon as the primary shard id has
+    // been read.
+    const auto primaryShardForDb = [&]() {
+        Lock::DBLock dbLockIS(opCtx, dbName, MODE_IS);
+        auto shardingState = DatabaseShardingState::get(opCtx, dbName);
+        auto sharedDssLock = DatabaseShardingState::DSSLock::lockShared(opCtx, shardingState);
+        // The following call will also ensure that the database version matches
+        return shardingState->getDatabaseInfo(opCtx, sharedDssLock).getPrimary();
+    }();
+
+    const auto localShardId = ShardingState::get(opCtx)->shardId();
+
+    if (!(primaryShardForDb == localShardId)) {
+        uasserted(ErrorCodes::IllegalOperation,
+                  str::stream() << "This is not the primary shard for db " << dbName
+                                << " expected: " << primaryShardForDb
+                                << " shardId: " << localShardId);
+    }
+}
+
std::shared_ptr<DatabaseShardingState> DatabaseShardingState::getSharedForLockFreeReads(
OperationContext* opCtx, const StringData dbName) {
auto& databasesMap = DatabaseShardingStateMap::get(opCtx->getServiceContext());
diff --git a/src/mongo/db/s/database_sharding_state.h b/src/mongo/db/s/database_sharding_state.h
index 836570e67ef..b1624bbd9b8 100644
--- a/src/mongo/db/s/database_sharding_state.h
+++ b/src/mongo/db/s/database_sharding_state.h
@@ -73,6 +73,16 @@ public:
OperationContext* opCtx, const StringData dbName);
/**
+ * Checks if this shard is the primary shard for the given DB.
+ *
+ * Throws an IllegalOperation exception otherwise.
+ *
+ * Assumes the operation context has a DB version attached to it for the given @dbName.
+ */
+ static void checkIsPrimaryShardForDb(OperationContext* opCtx, StringData dbName);
+
+
+ /**
* Methods to control the databases's critical section. Must be called with the database X lock
* held.
*/
diff --git a/src/mongo/db/s/dist_lock_manager.cpp b/src/mongo/db/s/dist_lock_manager.cpp
index 39c08cc1dd7..ecbc04eb9ef 100644
--- a/src/mongo/db/s/dist_lock_manager.cpp
+++ b/src/mongo/db/s/dist_lock_manager.cpp
@@ -76,6 +76,11 @@ DistLockManager::ScopedDistLock DistLockManager::ScopedDistLock::moveToAnotherTh
return unownedScopedDistLock;
}
+// Attaches 'opCtx' to this ScopedDistLock. The invariant requires that no operation context is
+// currently attached (i.e. _opCtx is null) when this is called.
+void DistLockManager::ScopedDistLock::assignNewOpCtx(OperationContext* opCtx) {
+ invariant(!_opCtx);
+ _opCtx = opCtx;
+}
+
DistLockManager::DistLockManager(OID lockSessionID) : _lockSessionID(std::move(lockSessionID)) {}
DistLockManager* DistLockManager::get(ServiceContext* service) {
diff --git a/src/mongo/db/s/dist_lock_manager.h b/src/mongo/db/s/dist_lock_manager.h
index 953cc76d9b9..46f7cf58c11 100644
--- a/src/mongo/db/s/dist_lock_manager.h
+++ b/src/mongo/db/s/dist_lock_manager.h
@@ -102,6 +102,7 @@ public:
ScopedDistLock(ScopedDistLock&& other);
ScopedDistLock moveToAnotherThread();
+ void assignNewOpCtx(OperationContext* opCtx);
private:
OperationContext* _opCtx;
diff --git a/src/mongo/db/s/forwardable_operation_metadata.cpp b/src/mongo/db/s/forwardable_operation_metadata.cpp
index 066b1426388..39ed1fbf62c 100644
--- a/src/mongo/db/s/forwardable_operation_metadata.cpp
+++ b/src/mongo/db/s/forwardable_operation_metadata.cpp
@@ -35,26 +35,34 @@
namespace mongo {
+// Builds the metadata from its BSON representation 'obj'.
+//
+// The fields are parsed directly into this object via the generated parseProtected(). The static
+// ForwardableOperationMetadataBase::parse() must not be used here: it returns a separate parsed
+// object, which would be discarded and leave this instance default-initialized.
+ForwardableOperationMetadata::ForwardableOperationMetadata(const BSONObj& obj) {
+    ForwardableOperationMetadataBase::parseProtected(
+        IDLParserErrorContext("ForwardableOperationMetadataBase"), obj);
+}
+
ForwardableOperationMetadata::ForwardableOperationMetadata(OperationContext* opCtx) {
if (auto optComment = opCtx->getComment()) {
setComment(optComment->wrap());
}
auto authzSession = AuthorizationSession::get(opCtx->getClient());
- setImpersonatedUserMetadata({userNameIteratorToContainer<std::vector<UserName>>(
- authzSession->getImpersonatedUserNames()),
- roleNameIteratorToContainer<std::vector<RoleName>>(
- authzSession->getImpersonatedRoleNames())});
+ setImpersonatedUserMetadata({{userNameIteratorToContainer<std::vector<UserName>>(
+ authzSession->getImpersonatedUserNames()),
+ roleNameIteratorToContainer<std::vector<RoleName>>(
+ authzSession->getImpersonatedRoleNames())}});
}
-void ForwardableOperationMetadata::setOn(OperationContext* opCtx) {
+void ForwardableOperationMetadata::setOn(OperationContext* opCtx) const {
if (const auto& comment = getComment()) {
opCtx->setComment(comment.get());
}
- const auto& authMetadata = getImpersonatedUserMetadata();
- if (!authMetadata.getUsers().empty() || !authMetadata.getRoles().empty())
- AuthorizationSession::get(opCtx->getClient())
- ->setImpersonatedUserData(authMetadata.getUsers(), authMetadata.getRoles());
+ if (const auto& optAuthMetadata = getImpersonatedUserMetadata()) {
+ const auto& authMetadata = optAuthMetadata.get();
+ if (!authMetadata.getUsers().empty() || !authMetadata.getRoles().empty()) {
+ AuthorizationSession::get(opCtx->getClient())
+ ->setImpersonatedUserData(authMetadata.getUsers(), authMetadata.getRoles());
+ }
+ }
}
} // namespace mongo
diff --git a/src/mongo/db/s/forwardable_operation_metadata.h b/src/mongo/db/s/forwardable_operation_metadata.h
index 884233f7ce6..8cae3a2a636 100644
--- a/src/mongo/db/s/forwardable_operation_metadata.h
+++ b/src/mongo/db/s/forwardable_operation_metadata.h
@@ -46,8 +46,12 @@ namespace mongo {
*/
class ForwardableOperationMetadata : public ForwardableOperationMetadataBase {
public:
+ ForwardableOperationMetadata() = default;
+ ForwardableOperationMetadata(const BSONObj& obj);
ForwardableOperationMetadata(OperationContext* opCtx);
- void setOn(OperationContext* opCtx);
+
+
+ void setOn(OperationContext* opCtx) const;
};
} // namespace mongo
diff --git a/src/mongo/db/s/forwardable_operation_metadata.idl b/src/mongo/db/s/forwardable_operation_metadata.idl
index 3ed3d310f8e..28b5afbb970 100644
--- a/src/mongo/db/s/forwardable_operation_metadata.idl
+++ b/src/mongo/db/s/forwardable_operation_metadata.idl
@@ -45,3 +45,4 @@ structs:
impersonatedUserMetadata:
type: ImpersonatedUserMetadata
description: "A struct representing the impersonated users from a mongos"
+ optional: true
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.cpp b/src/mongo/db/s/sharding_ddl_coordinator.cpp
index 50fab719358..9fbf3eceffe 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.cpp
+++ b/src/mongo/db/s/sharding_ddl_coordinator.cpp
@@ -35,14 +35,83 @@
#include "mongo/db/s/database_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
-#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/sharding_ddl_coordinator_gen.h"
+#include "mongo/logv2/log.h"
#include "mongo/s/grid.h"
namespace mongo {
+// Parses the metadata common to all sharding DDL coordinators out of the coordinator document
+// 'coorDoc'.
+ShardingDDLCoordinatorMetadata extractShardingDDLCoordinatorMetadata(const BSONObj& coorDoc) {
+    const IDLParserErrorContext parserCtx("ShardingDDLCoordinatorMetadata");
+    return ShardingDDLCoordinatorMetadata::parse(parserCtx, coorDoc);
+}
+
+// Initializes the coordinator's metadata by parsing it out of the coordinator document 'coorDoc'.
+ShardingDDLCoordinator::ShardingDDLCoordinator(const BSONObj& coorDoc)
+ : _coorMetadata(extractShardingDDLCoordinatorMetadata(coorDoc)) {}
+
+// The construction-completion promise must already be resolved (run() either emplaces a value or
+// sets an error on it) by the time the coordinator is destroyed.
+ShardingDDLCoordinator::~ShardingDDLCoordinator() {
+ invariant(_constructionCompletionPromise.getFuture().isReady());
+}
+
+// Entry point of the coordinator. Builds a future chain on 'executor' that:
+//   1. acquires the dist locks and validates the request ("construction" phase),
+//   2. on construction failure, logs and propagates the error through the construction promise,
+//   3. runs the coordinator-specific logic (_runImpl),
+//   4. finally pops every dist lock stored in _scopedLocks.
+// Every continuation captures 'anchor' (shared_from_this()) so that 'this' stays alive while the
+// chain is pending.
+SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancelationToken& token) noexcept {
+ return ExecutorFuture<void>(**executor)
+ .then([this, executor, token, anchor = shared_from_this()] {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ // The serialized operation type is passed to the dist lock manager alongside the name of
+ // the resource being locked.
+ const auto coorName =
+ DDLCoordinatorType_serializer(_coorMetadata.getId().getOperationType());
+
+ // Take the dist lock on the database first; uassertStatusOK throws if it cannot be acquired.
+ auto distLockManager = DistLockManager::get(opCtx);
+ auto dbDistLock = uassertStatusOK(distLockManager->lock(
+ opCtx, nss().db(), coorName, DistLockManager::kDefaultLockTimeout));
+ // The lock is stored via moveToAnotherThread() because it outlives this continuation and
+ // its operation context.
+ _scopedLocks.emplace(dbDistLock.moveToAnotherThread());
+
+ // The primary-shard check is skipped for the config database and for coordinators whose
+ // metadata is flagged as recovered from disk.
+ if (!nss().isConfigDB() && !_coorMetadata.getRecoveredFromDisk()) {
+ invariant(_coorMetadata.getDatabaseVersion());
+
+ OperationShardingState::get(opCtx).initializeClientRoutingVersions(
+ nss(), boost::none /* ChunkVersion */, _coorMetadata.getDatabaseVersion());
+ // Check under the dbLock if this is still the primary shard for the database
+ DatabaseShardingState::checkIsPrimaryShardForDb(opCtx, nss().db());
+ };
+
+ // Take the dist lock on the full namespace as well, unless the namespace string is empty.
+ if (!nss().ns().empty()) {
+ auto collDistLock = uassertStatusOK(distLockManager->lock(
+ opCtx, nss().ns(), coorName, DistLockManager::kDefaultLockTimeout));
+ _scopedLocks.emplace(collDistLock.moveToAnotherThread());
+ }
+
+ // Signal waiters on getConstructionCompletionFuture() that construction succeeded.
+ _constructionCompletionPromise.emplaceValue();
+ })
+ .onError([this, anchor = shared_from_this()](const Status& status) {
+ // Construction failed: log, hand the error to the construction promise, and return the
+ // status so the rest of the chain observes the failure.
+ LOGV2_ERROR(5390530,
+ "Failed to complete construction of sharding DDL coordinator",
+ "coordinatorId"_attr = _coorMetadata.getId(),
+ "reason"_attr = redact(status));
+ _constructionCompletionPromise.setError(
+ status.withContext("Failed to complete construction of sharding DDL coordinator"));
+ return status;
+ })
+ .then([this, executor, token, anchor = shared_from_this()] {
+ return _runImpl(executor, token);
+ })
+ .onCompletion([this, anchor = shared_from_this()](const Status& status) {
+ // Runs whether the chain succeeded or failed. A fresh operation context is attached to
+ // each stored lock before it is popped (presumably so the lock can be released when the
+ // ScopedDistLock is destroyed -- confirm against ScopedDistLock's destructor).
+ auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+
+ while (!_scopedLocks.empty()) {
+ _scopedLocks.top().assignNewOpCtx(opCtx);
+ _scopedLocks.pop();
+ }
+ return status;
+ })
+ .semi();
+}
+
ShardingDDLCoordinator_NORESILIENT::ShardingDDLCoordinator_NORESILIENT(OperationContext* opCtx,
const NamespaceString& ns)
- : _nss(ns), _forwardableOpMetadata(opCtx){};
+ : _nss(ns), _forwardableOpMetadata(opCtx) {}
SemiFuture<void> ShardingDDLCoordinator_NORESILIENT::run(OperationContext* opCtx) {
if (!_nss.isConfigDB()) {
@@ -53,20 +122,7 @@ SemiFuture<void> ShardingDDLCoordinator_NORESILIENT::run(OperationContext* opCtx
clientDbVersion);
// Checks that this is the primary shard for the namespace's db
- const auto dbPrimaryShardId = [&]() {
- Lock::DBLock dbWriteLock(opCtx, _nss.db(), MODE_IS);
- auto dss = DatabaseShardingState::get(opCtx, _nss.db());
- auto dssLock = DatabaseShardingState::DSSLock::lockShared(opCtx, dss);
- // The following call will also ensure that the database version matches
- return dss->getDatabaseInfo(opCtx, dssLock).getPrimary();
- }();
-
- const auto thisShardId = ShardingState::get(opCtx)->shardId();
-
- uassert(ErrorCodes::IllegalOperation,
- str::stream() << "This is not the primary shard for db " << _nss.db()
- << " expected: " << dbPrimaryShardId << " shardId: " << thisShardId,
- dbPrimaryShardId == thisShardId);
+ DatabaseShardingState::checkIsPrimaryShardForDb(opCtx, _nss.db());
}
return runImpl(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor());
}
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.h b/src/mongo/db/s/sharding_ddl_coordinator.h
index 4fd60804dd2..cb0117dd9f8 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.h
+++ b/src/mongo/db/s/sharding_ddl_coordinator.h
@@ -31,12 +31,68 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/primary_only_service.h"
+#include "mongo/db/s/dist_lock_manager.h"
#include "mongo/db/s/forwardable_operation_metadata.h"
+#include "mongo/db/s/sharding_ddl_coordinator_gen.h"
#include "mongo/executor/task_executor.h"
#include "mongo/util/future.h"
namespace mongo {
+ShardingDDLCoordinatorMetadata extractShardingDDLCoordinatorMetadata(const BSONObj& coorDoc);
+
+/*
+ * Base class for sharding DDL coordinators that run as PrimaryOnlyService instances.
+ *
+ * run() is final: it performs the common construction steps and then delegates to the
+ * coordinator-specific _runImpl().
+ */
+class ShardingDDLCoordinator
+ : public repl::PrimaryOnlyService::TypedInstance<ShardingDDLCoordinator> {
+public:
+ explicit ShardingDDLCoordinator(const BSONObj& coorDoc);
+
+ ~ShardingDDLCoordinator();
+
+ /*
+ * Check if the given coordinator document has the same options as this.
+ *
+ * This is used to decide if we can join a previously created coordinator.
+ * In the case the given coordinator document has incompatible options with this,
+ * this function must throw a ConflictingOperationInProgress exception with an adequate message.
+ */
+ virtual void checkIfOptionsConflict(const BSONObj& coorDoc) const = 0;
+
+
+ /*
+ * Returns a future that will be completed when the construction of this coordinator instance
+ * is completed.
+ *
+ * In particular the returned future will be ready only after this coordinator successfully
+ * acquires the required locks.
+ */
+ SharedSemiFuture<void> getConstructionCompletionFuture() {
+ return _constructionCompletionPromise.getFuture();
+ }
+
+ /*
+ * Returns the target namespace stored in this coordinator's id.
+ */
+ const NamespaceString& nss() const {
+ return _coorMetadata.getId().getNss();
+ }
+
+ /*
+ * Returns the forwardable operation metadata stored in the coordinator metadata. The metadata
+ * field is optional; calling this when it is not set trips the invariant.
+ */
+ const ForwardableOperationMetadata& getForwardableOpMetadata() const {
+ invariant(_coorMetadata.getForwardableOpMetadata());
+ return _coorMetadata.getForwardableOpMetadata().get();
+ }
+
+protected:
+ // Metadata parsed from the coordinator document passed to the constructor.
+ ShardingDDLCoordinatorMetadata _coorMetadata;
+ // Dist locks acquired by run(); they are popped in reverse acquisition order on completion.
+ std::stack<DistLockManager::ScopedDistLock> _scopedLocks;
+
+private:
+ SemiFuture<void> run(std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancelationToken& token) noexcept override final;
+
+ // Coordinator-specific logic, invoked by run() after the construction phase has succeeded.
+ virtual ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancelationToken& token) noexcept = 0;
+
+ // Resolved by run(): with a value once construction succeeds, with an error if it fails.
+ SharedPromise<void> _constructionCompletionPromise;
+};
+
class ShardingDDLCoordinator_NORESILIENT {
public:
ShardingDDLCoordinator_NORESILIENT(OperationContext* opCtx, const NamespaceString& nss);
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.idl b/src/mongo/db/s/sharding_ddl_coordinator.idl
new file mode 100644
index 00000000000..6dd43c89303
--- /dev/null
+++ b/src/mongo/db/s/sharding_ddl_coordinator.idl
@@ -0,0 +1,92 @@
+# Copyright (C) 2021-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+# This file defines the format of documents stored in config.system.sharding_ddl_coordinators.
+
+global:
+ cpp_namespace: "mongo"
+ cpp_includes:
+ - "mongo/db/s/forwardable_operation_metadata.h"
+ - "mongo/s/database_version.h"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+enums:
+ DDLCoordinatorType:
+ description: "Type of the sharding DDL Operation."
+ type: string
+ values:
+ kDropCollection: "dropCollection"
+
+types:
+ ForwardableOperationMetadata:
+ description: "Forwardable operation metadata assiociated with this coordinator."
+ cpp_type: ForwardableOperationMetadata
+ bson_serialization_type: object
+ serializer: "mongo::ForwardableOperationMetadata::toBSON"
+ deserializer: "mongo::ForwardableOperationMetadata"
+ DatabaseVersion:
+ description: ""
+ cpp_type: DatabaseVersion
+ bson_serialization_type: object
+ serializer: "mongo::DatabaseVersion::toBSON"
+ deserializer: "mongo::DatabaseVersion"
+
+structs:
+ ShardingDDLCoordinatorId:
+ description: "Identifier for a specific sharding DDL Coordinator."
+ generate_comparison_operators: false
+ strict: false
+ fields:
+ namespace:
+ description: "The target namespace of the DDL operation."
+ cpp_name: nss
+ type: namespacestring
+ operationType:
+ description: "Type of the sharding DDL coordinator."
+ type: DDLCoordinatorType
+
+ ShardingDDLCoordinatorMetadata:
+ description: "Commong metadata for all sharding DDL coordinator."
+ generate_comparison_operators: false
+ strict: false
+ fields:
+ _id:
+ type: ShardingDDLCoordinatorId
+ cpp_name: id
+ recoveredFromDisk:
+ type: bool
+ default: false
+ forwardableOpMetadata:
+ type: ForwardableOperationMetadata
+ optional: true
+ databaseVersion:
+ type: DatabaseVersion
+ optional: true
+
diff --git a/src/mongo/db/s/sharding_ddl_coordinator_service.cpp b/src/mongo/db/s/sharding_ddl_coordinator_service.cpp
new file mode 100644
index 00000000000..f94793638e5
--- /dev/null
+++ b/src/mongo/db/s/sharding_ddl_coordinator_service.cpp
@@ -0,0 +1,113 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/sharding_ddl_coordinator_service.h"
+
+#include "mongo/base/checked_cast.h"
+#include "mongo/db/s/database_sharding_state.h"
+#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/sharding_ddl_coordinator.h"
+#include "mongo/logv2/log.h"
+
+namespace mongo {
+
+// Looks up this service by name in the PrimaryOnlyServiceRegistry of the operation's service
+// context and returns it downcast to its concrete type.
+ShardingDDLCoordinatorService* ShardingDDLCoordinatorService::getService(OperationContext* opCtx) {
+    auto* const serviceRegistry =
+        repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext());
+    auto* primaryOnlyService = serviceRegistry->lookupServiceByName(kServiceName);
+    return checked_cast<ShardingDDLCoordinatorService*>(std::move(primaryOnlyService));
+}
+
+// Builds the coordinator instance matching the operation type found in 'initialState'.
+//
+// Throws BadValue for an operation type that has no case in the switch below.
+// NOTE(review): the kDropCollection case is a placeholder that returns an empty (null) instance
+// pointer -- confirm that no caller can reach it before a real coordinator is wired in.
+std::shared_ptr<ShardingDDLCoordinatorService::Instance>
+ShardingDDLCoordinatorService::constructInstance(BSONObj initialState) const {
+ const auto op = extractShardingDDLCoordinatorMetadata(initialState);
+ LOGV2_DEBUG(5390510,
+ 2,
+ "Constructing new sharding DDL coordinator",
+ "coordinatorDoc"_attr = op.toBSON());
+ switch (op.getId().getOperationType()) {
+ case DDLCoordinatorTypeEnum::kDropCollection:
+ // A switch with only a default case is not supported by the Windows compiler
+ return {};
+ break;
+ default:
+ uasserted(ErrorCodes::BadValue,
+ str::stream()
+ << "Encountered unknown Sharding DDL operation type: "
+ << DDLCoordinatorType_serializer(op.getId().getOperationType()));
+ }
+}
+
+// Returns the coordinator described by 'coorDoc', creating it if it does not exist yet.
+//
+// For namespaces outside the config database the operation must carry a database version and this
+// shard must be the primary shard for the database; the version is recorded in the coordinator
+// document. The forwardable metadata of 'opCtx' is recorded as well. When an instance with the
+// same id already exists, its options are checked against 'coorDoc'. The call blocks until the
+// coordinator's construction has completed.
+std::shared_ptr<ShardingDDLCoordinatorService::Instance>
+ShardingDDLCoordinatorService::getOrCreateInstance(OperationContext* opCtx, BSONObj coorDoc) {
+    auto metadata = extractShardingDDLCoordinatorMetadata(coorDoc);
+    const auto& targetNss = metadata.getId().getNss();
+
+    if (!targetNss.isConfigDB()) {
+        // Check that the operation context has a database version for this namespace
+        const auto dbVersionFromClient =
+            OperationShardingState::get(opCtx).getDbVersion(targetNss.db());
+        uassert(ErrorCodes::IllegalOperation,
+                "Request sent without attaching database version",
+                dbVersionFromClient);
+        DatabaseShardingState::checkIsPrimaryShardForDb(opCtx, targetNss.db());
+        metadata.setDatabaseVersion(dbVersionFromClient);
+    }
+
+    metadata.setForwardableOpMetadata({{opCtx}});
+    const auto patchedCoorDoc = coorDoc.addFields(metadata.toBSON());
+
+    std::shared_ptr<ShardingDDLCoordinator> coordinator;
+    bool created = false;
+    try {
+        auto [instance, isNew] = PrimaryOnlyService::getOrCreateInstance(opCtx, patchedCoorDoc);
+        coordinator = checked_pointer_cast<ShardingDDLCoordinator>(std::move(instance));
+        created = isNew;
+    } catch (const DBException& ex) {
+        LOGV2_ERROR(5390512,
+                    "Failed to create instance of sharding DDL coordinator",
+                    "coordinatorId"_attr = metadata.getId(),
+                    "reason"_attr = redact(ex));
+        throw;
+    }
+
+    // If the existing instance doesn't have conflicting options just return that one
+    if (!created) {
+        coordinator->checkIfOptionsConflict(coorDoc);
+    }
+
+    coordinator->getConstructionCompletionFuture().get(opCtx);
+    return coordinator;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/sharding_ddl_coordinator_service.h b/src/mongo/db/s/sharding_ddl_coordinator_service.h
new file mode 100644
index 00000000000..c514ca659c9
--- /dev/null
+++ b/src/mongo/db/s/sharding_ddl_coordinator_service.h
@@ -0,0 +1,65 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/primary_only_service.h"
+
+namespace mongo {
+
+// PrimaryOnlyService that hosts the sharding DDL coordinator instances. Its state documents live
+// in NamespaceString::kShardingDDLCoordinatorsNamespace.
+class ShardingDDLCoordinatorService final : public repl::PrimaryOnlyService {
+public:
+ // Name under which the service is looked up in the PrimaryOnlyServiceRegistry (see getService).
+ static constexpr StringData kServiceName = "ShardingDDLCoordinator"_sd;
+
+ explicit ShardingDDLCoordinatorService(ServiceContext* serviceContext)
+ : PrimaryOnlyService(serviceContext) {}
+
+ ~ShardingDDLCoordinatorService() = default;
+
+ // Returns the service registered for the service context of 'opCtx'.
+ static ShardingDDLCoordinatorService* getService(OperationContext* opCtx);
+
+ StringData getServiceName() const override {
+ return kServiceName;
+ }
+
+ NamespaceString getStateDocumentsNS() const override {
+ return NamespaceString::kShardingDDLCoordinatorsNamespace;
+ }
+
+ // Uses default-constructed thread pool limits.
+ ThreadPool::Limits getThreadPoolLimits() const override {
+ return ThreadPool::Limits();
+ }
+
+ // Builds the coordinator matching the operation type stored in 'initialState'.
+ std::shared_ptr<Instance> constructInstance(BSONObj initialState) const override;
+
+ // Returns the coordinator described by 'initialState', creating it if needed. Note that this
+ // has a different return type than the protected PrimaryOnlyService::getOrCreateInstance().
+ std::shared_ptr<Instance> getOrCreateInstance(OperationContext* opCtx, BSONObj initialState);
+};
+
+} // namespace mongo