Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--  src/mongo/db/s/SConscript                                  |   1
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.cpp           | 449
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.h             |  60
-rw-r--r--  src/mongo/db/s/create_collection_coordinator_document.idl  |  97
-rw-r--r--  src/mongo/db/s/shard_key_util.cpp                          |  26
-rw-r--r--  src/mongo/db/s/shard_key_util.h                            |  15
-rw-r--r--  src/mongo/db/s/sharding_ddl_coordinator.cpp                |   3
-rw-r--r--  src/mongo/db/s/sharding_ddl_coordinator.h                  |   1
-rw-r--r--  src/mongo/db/s/sharding_ddl_coordinator.idl                |   1
-rw-r--r--  src/mongo/db/s/sharding_ddl_coordinator_service.cpp        |   3
-rw-r--r--  src/mongo/db/s/shardsvr_create_collection_command.cpp      |  27
11 files changed, 542 insertions, 141 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 1595ea721e2..6667851b1d0 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -334,6 +334,7 @@ env.Library(
'config/configsvr_split_chunk_command.cpp',
'config/configsvr_update_zone_key_range_command.cpp',
'create_collection_coordinator.cpp',
+ 'create_collection_coordinator_document.idl',
'drop_collection_coordinator.cpp',
'drop_collection_coordinator_document.idl',
'drop_database_coordinator.cpp',
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp
index a98d17eca81..16629312351 100644
--- a/src/mongo/db/s/create_collection_coordinator.cpp
+++ b/src/mongo/db/s/create_collection_coordinator.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/commands/feature_compatibility_version.h"
+#include "mongo/db/persistent_task_store.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/create_collection_coordinator.h"
@@ -245,6 +246,25 @@ BSONObj getCollation(OperationContext* opCtx,
return actualCollatorBSON;
}
+void removeChunks(OperationContext* opCtx, const UUID& uuid) {
+ BatchWriteExecStats stats;
+ BatchedCommandResponse response;
+ BatchedCommandRequest deleteRequest([&]() {
+ write_ops::Delete deleteOp(ChunkType::ConfigNS);
+ deleteOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase writeCommandBase;
+ writeCommandBase.setOrdered(false);
+ return writeCommandBase;
+ }());
+ deleteOp.setDeletes(
+ std::vector{write_ops::DeleteOpEntry(BSON(ChunkType::collectionUUID << uuid), false)});
+ return deleteOp;
+ }());
+ deleteRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
+ cluster::write(opCtx, deleteRequest, &stats, &response);
+ uassertStatusOK(response.toStatus());
+}
+
void upsertChunks(OperationContext* opCtx, std::vector<ChunkType>& chunks) {
BatchWriteExecStats stats;
BatchedCommandResponse response;
@@ -303,103 +323,264 @@ void updateCatalogEntry(OperationContext* opCtx, const NamespaceString& nss, Col
}
}
+void removeShardIndex(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& keyPattern) {
+ DBDirectClient localClient(opCtx);
+
+ localClient.dropIndex(nss.ns(), keyPattern, WriteConcernOptions::Majority);
+}
+
+void broadcastDropCollection(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::shared_ptr<executor::TaskExecutor>& executor) {
+ const auto primaryShardId = ShardingState::get(opCtx)->shardId();
+ const ShardsvrDropCollectionParticipant dropCollectionParticipant(nss);
+
+ auto participants = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx);
+ // Remove primary shard from participants
+ participants.erase(std::remove(participants.begin(), participants.end(), primaryShardId),
+ participants.end());
+
+ sharding_ddl_util::sendAuthenticatedCommandToShards(
+ opCtx,
+ nss.db(),
+ CommandHelpers::appendMajorityWriteConcern(dropCollectionParticipant.toBSON({})),
+ participants,
+ executor);
+}
+
} // namespace
-CreateCollectionCoordinator::CreateCollectionCoordinator(
- OperationContext* opCtx, const ShardsvrCreateCollection& createCollParams)
- : ShardingDDLCoordinator_NORESILIENT(opCtx, createCollParams.getNamespace()),
- _serviceContext(opCtx->getServiceContext()),
- _request(createCollParams),
- _nss(_request.getNamespace()) {
- invariant(createCollParams.getShardKey());
- _shardKeyPattern = ShardKeyPattern(createCollParams.getShardKey()->getOwned());
+CreateCollectionCoordinator::CreateCollectionCoordinator(const BSONObj& initialState)
+ : ShardingDDLCoordinator(initialState),
+ _doc(CreateCollectionCoordinatorDocument::parse(
+ IDLParserErrorContext("CreateCollectionCoordinatorDocument"), initialState)) {}
+
+boost::optional<BSONObj> CreateCollectionCoordinator::reportForCurrentOp(
+ MongoProcessInterface::CurrentOpConnectionsMode connMode,
+ MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept {
+ // TODO (SERVER-55485): Add request parameters.
+ BSONObjBuilder cmdBob;
+ if (const auto& optComment = getForwardableOpMetadata().getComment()) {
+ cmdBob.append(optComment.get().firstElement());
+ }
+ BSONObjBuilder bob;
+ bob.append("type", "op");
+ bob.append("desc", "CreateCollectionCoordinator");
+ bob.append("op", "command");
+ bob.append("ns", nss().toString());
+ bob.append("command", cmdBob.obj());
+ bob.append("currentPhase", _doc.getPhase());
+ bob.append("active", true);
+ return bob.obj();
}
-SemiFuture<void> CreateCollectionCoordinator::runImpl(
- std::shared_ptr<executor::TaskExecutor> executor) {
- return ExecutorFuture<void>(executor, Status::OK())
- .then([this, anchor = shared_from_this()]() {
- ThreadClient tc("CreateCollectionCoordinator", _serviceContext);
- auto opCtxHolder = tc->makeOperationContext();
- auto* opCtx = opCtxHolder.get();
- _forwardableOpMetadata.setOn(opCtx);
-
- _checkCommandArguments(opCtx);
- if (_result) {
- // Early return before holding the critical section, the collection was already
- // created.
- return;
- }
- {
- // From this point on all writes are blocked on the collection.
- ScopedShardVersionCriticalSection critSec(opCtx, _nss);
- _createCollectionAndIndexes(opCtx);
- if (_result) {
- // Early return, the collection was already created.
+void CreateCollectionCoordinator::checkIfOptionsConflict(const BSONObj& doc) const {
+ const auto errorMessage = "Another operation with different arguments is already running";
+ auto opUnique = _doc.getUnique().is_initialized();
+ auto opCollation = _doc.getCollation().is_initialized();
+ auto opNumInitialChunks = _doc.getNumInitialChunks().is_initialized();
+ auto opPresplit = _doc.getPresplitHashedZones().is_initialized();
+ auto opSplitPoints = _doc.getInitialSplitPoints().is_initialized();
+
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ errorMessage,
+ opUnique == doc.hasField(CoordDoc::kUniqueFieldName) &&
+ opCollation == doc.hasField(CoordDoc::kCollationFieldName) &&
+ opNumInitialChunks == doc.hasField(CoordDoc::kNumInitialChunksFieldName) &&
+ opPresplit == doc.hasField(CoordDoc::kPresplitHashedZonesFieldName) &&
+ opSplitPoints == doc.hasField(CoordDoc::kInitialSplitPointsFieldName));
+
+ bool sameArguments;
+ const auto& shardKey = doc.getObjectField(CoordDoc::kShardKeyFieldName);
+ sameArguments = SimpleBSONObjComparator::kInstance.evaluate(shardKey == *_doc.getShardKey());
+
+ if (sameArguments && opUnique) {
+ sameArguments = doc.getBoolField(CoordDoc::kUniqueFieldName) == *_doc.getUnique();
+ }
+
+ if (sameArguments && opPresplit) {
+ sameArguments = doc.getBoolField(CoordDoc::kPresplitHashedZonesFieldName) ==
+ *_doc.getPresplitHashedZones();
+ }
+
+ if (sameArguments && opNumInitialChunks) {
+ sameArguments =
+ doc.getIntField(CoordDoc::kNumInitialChunksFieldName) == *_doc.getNumInitialChunks();
+ }
+
+ if (sameArguments && opCollation) {
+ const auto& collation = doc.getObjectField(CoordDoc::kCollationFieldName);
+ sameArguments =
+ SimpleBSONObjComparator::kInstance.evaluate(collation == *_doc.getCollation());
+ }
+
+ if (sameArguments && opSplitPoints) {
+ const auto& initialSplitPoints = doc.getObjectField(CoordDoc::kInitialSplitPointsFieldName);
+ BSONArrayBuilder builder;
+ builder.append(*_doc.getInitialSplitPoints());
+ const auto& currentSplitPoints = builder.obj();
+ sameArguments =
+ SimpleBSONObjComparator::kInstance.evaluate(initialSplitPoints == currentSplitPoints);
+ }
+
+ // If two shardCollection requests target the same namespace, their arguments must match.
+ uassert(ErrorCodes::ConflictingOperationInProgress, errorMessage, sameArguments);
+}
+
+ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& token) noexcept {
+ return ExecutorFuture<void>(**executor)
+ .then(_executePhase(Phase::kCheck,
+ [this, anchor = shared_from_this()] {
+ _shardKeyPattern = ShardKeyPattern(*_doc.getShardKey());
+ auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ getForwardableOpMetadata().setOn(opCtx);
+
+ _checkCommandArguments(opCtx);
+
+ // Check whether the shard key index already exists and can be used
+ // to shard the collection. If it does, a later rollback must not
+ // drop it, because the index was part of the collection before this
+ // command was invoked.
+ _doc.setShardKeyAlreadyCreated(
+ shardkeyutil::validShardKeyIndexExists(
+ opCtx,
+ nss(),
+ *_shardKeyPattern,
+ _collation,
+ _doc.getUnique().get_value_or(false),
+ shardkeyutil::ValidationBehaviorsShardCollection(opCtx)));
+ }))
+ .then(_executePhase(
+ Phase::kCommit,
+ [this, executor = executor, anchor = shared_from_this()] {
+ if (!_shardKeyPattern) {
+ _shardKeyPattern = ShardKeyPattern(*_doc.getShardKey());
+ }
+ auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ getForwardableOpMetadata().setOn(opCtx);
+
+ if (auto createCollectionResponseOpt =
+ sharding_ddl_util::checkIfCollectionAlreadySharded(
+ opCtx,
+ nss(),
+ _shardKeyPattern->getKeyPattern().toBSON(),
+ getCollation(opCtx, nss(), _doc.getCollation()),
+ _doc.getUnique().value_or(false))) {
+ _result = createCollectionResponseOpt;
+ // Early return before acquiring the critical section: the
+ // collection was already created and committed, but a stepdown
+ // happened before the coordinator document was removed.
return;
}
- _createChunks(opCtx);
- if (_splitPolicy->isOptimized()) {
- // Block reads/writes from here on if we need to create the collection on other
- // shards, this way we prevent reads/writes that should be redirected to another
- // shard.
- critSec.enterCommitPhase();
- _createCollectionOnNonPrimaryShards(opCtx);
+ {
+ // Entering the critical section. From this point on, the writes
+ // are blocked.
+ ScopedShardVersionCriticalSection critSec(opCtx, nss());
+
+ if (_recoveredFromDisk) {
+ LOGV2_DEBUG(5458704,
+ 1,
+ "Removing partial changes from previous run",
+ "namespace"_attr = nss(),
+ "dropShardKey"_attr = _doc.getShardKeyAlreadyCreated());
+ if (!_doc.getShardKeyAlreadyCreated()) {
+ // TODO SERVER-55551: remove this to prevent continuous failover on huge
+ // collections.
+ removeShardIndex(opCtx, nss(), _shardKeyPattern->toBSON());
+ }
+ removeChunks(opCtx, *getUUIDFromPrimaryShard(opCtx, nss()));
+ broadcastDropCollection(opCtx, nss(), **executor);
+ }
+
+ _createCollectionAndIndexes(opCtx);
+
+ _createPolicyAndChunks(opCtx);
+
+ if (_splitPolicy->isOptimized()) {
+ // Block reads/writes from here on if we need to create
+ // the collection on other shards, this way we prevent
+ // reads/writes that should be redirected to another
+ // shard.
+ critSec.enterCommitPhase();
+ _createCollectionOnNonPrimaryShards(opCtx);
+
+ _commit(opCtx);
+ }
+ }
+ if (!_splitPolicy->isOptimized()) {
_commit(opCtx);
}
- }
- if (!_splitPolicy->isOptimized()) {
- _commit(opCtx);
+ _finalize(opCtx);
+ }))
+ .onCompletion([this, anchor = shared_from_this()](const Status& status) {
+ try {
+ _removeCoordinatorDocument();
+ uassertStatusOK(status);
+ LOGV2(5458702, "Collection created", "namespace"_attr = nss());
+ } catch (const ExceptionForCat<ErrorCategory::NotPrimaryError>& ex) {
+ LOGV2_ERROR(5458701,
+ "Create collection interrupted with a NotPrimaryError exception "
+ "category, it will continue on the next primary",
+ "namespace"_attr = nss(),
+ "error"_attr = redact(ex));
+ } catch (const ExceptionForCat<ErrorCategory::ShutdownError>& ex) {
+ LOGV2_ERROR(5458707,
+ "Create collection interrupted with a ShutdownError exception "
+ "category, it will continue on the next primary",
+ "namespace"_attr = nss(),
+ "error"_attr = redact(ex));
+ } catch (const DBException& ex) {
+ LOGV2_ERROR(5458703,
+ "Error running create collection",
+ "namespace"_attr = nss(),
+ "error"_attr = redact(ex));
+ // TODO SERVER-55396: retry operation until it succeeds.
+ throw;
}
-
- _cleanup(opCtx);
- })
- .onError([this, anchor = shared_from_this()](const Status& status) {
- LOGV2_ERROR(5277908,
- "Error running create collection",
- "namespace"_attr = _nss,
- "error"_attr = redact(status));
- return status;
- })
- .semi();
+ });
}
void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx) {
- LOGV2_DEBUG(5277902, 2, "Create collection _checkCommandArguments", "namespace"_attr = _nss);
+ LOGV2_DEBUG(5277902, 2, "Create collection _checkCommandArguments", "namespace"_attr = nss());
- const auto dbInfo =
- uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabaseWithRefresh(opCtx, _nss.db()));
+ const auto dbInfo = uassertStatusOK(
+ Grid::get(opCtx)->catalogCache()->getDatabaseWithRefresh(opCtx, nss().db()));
uassert(ErrorCodes::IllegalOperation,
- str::stream() << "sharding not enabled for db " << _nss.db(),
+ str::stream() << "sharding not enabled for db " << nss().db(),
dbInfo.shardingEnabled());
- if (_nss.db() == NamespaceString::kConfigDb) {
+ if (nss().db() == NamespaceString::kConfigDb) {
// Only whitelisted collections in config may be sharded (unless we are in test mode)
uassert(ErrorCodes::IllegalOperation,
"only special collections in the config db may be sharded",
- _nss == NamespaceString::kLogicalSessionsNamespace);
+ nss() == NamespaceString::kLogicalSessionsNamespace);
}
// Ensure that hashed and unique are not both set.
uassert(ErrorCodes::InvalidOptions,
"Hashed shard keys cannot be declared unique. It's possible to ensure uniqueness on "
"the hashed field by declaring an additional (non-hashed) unique index on the field.",
- !_shardKeyPattern.value().isHashedPattern() ||
- !(_request.getUnique() && _request.getUnique().value()));
+ !_shardKeyPattern->isHashedPattern() || !_doc.getUnique().value_or(false));
// Ensure the namespace is valid.
uassert(ErrorCodes::IllegalOperation,
"can't shard system namespaces",
- !_nss.isSystem() || _nss == NamespaceString::kLogicalSessionsNamespace ||
- _nss.isTemporaryReshardingCollection() || _nss.isTimeseriesBucketsCollection());
+ !nss().isSystem() || nss() == NamespaceString::kLogicalSessionsNamespace ||
+ nss().isTemporaryReshardingCollection() || nss().isTimeseriesBucketsCollection());
- if (_request.getNumInitialChunks()) {
+ if (_doc.getNumInitialChunks()) {
// Ensure numInitialChunks is within valid bounds.
// Cannot have more than 8192 initial chunks per shard. Setting a maximum of 1,000,000
// chunks in total to limit the amount of memory this command consumes so there is less
@@ -408,7 +589,7 @@ void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx
const int maxNumInitialChunksForShards =
Grid::get(opCtx)->shardRegistry()->getNumShardsNoReload() * 8192;
const int maxNumInitialChunksTotal = 1000 * 1000; // Arbitrary limit to memory consumption
- int numChunks = _request.getNumInitialChunks().value();
+ int numChunks = _doc.getNumInitialChunks().value();
uassert(ErrorCodes::InvalidOptions,
str::stream() << "numInitialChunks cannot be more than either: "
<< maxNumInitialChunksForShards << ", 8192 * number of shards; or "
@@ -417,14 +598,14 @@ void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx
numChunks <= maxNumInitialChunksTotal);
}
- if (_nss.db() == NamespaceString::kConfigDb) {
+ if (nss().db() == NamespaceString::kConfigDb) {
auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
auto findReponse = uassertStatusOK(
configShard->exhaustiveFindOnConfig(opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
repl::ReadConcernLevel::kMajorityReadConcern,
- _nss,
+ nss(),
BSONObj(),
BSONObj(),
1));
@@ -436,60 +617,55 @@ void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx
"collections in the config db must be empty to be sharded",
numDocs == 0);
}
-
- auto unique = _request.getUnique() ? *_request.getUnique() : false;
- if (auto createCollectionResponseOpt =
- mongo::sharding_ddl_util::checkIfCollectionAlreadySharded(
- opCtx,
- _nss,
- _shardKeyPattern->getKeyPattern().toBSON(),
- getCollation(opCtx, _nss, _request.getCollation()),
- unique)) {
- _result = createCollectionResponseOpt;
- }
}
void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext* opCtx) {
LOGV2_DEBUG(
- 5277903, 2, "Create collection _createCollectionAndIndexes", "namespace"_attr = _nss);
+ 5277903, 2, "Create collection _createCollectionAndIndexes", "namespace"_attr = nss());
- auto unique = _request.getUnique() ? *_request.getUnique() : false;
- _collation = getCollation(opCtx, _nss, _request.getCollation());
+ auto unique = _doc.getUnique().value_or(false);
+ _collation = getCollation(opCtx, nss(), _doc.getCollation());
if (auto createCollectionResponseOpt =
mongo::sharding_ddl_util::checkIfCollectionAlreadySharded(
- opCtx, _nss, _shardKeyPattern->getKeyPattern().toBSON(), *_collation, unique)) {
+ opCtx, nss(), _shardKeyPattern->getKeyPattern().toBSON(), *_collation, unique)) {
_result = createCollectionResponseOpt;
return;
}
- // Internally creates the collection if it doesn't exist.
shardkeyutil::validateShardKeyIndexExistsOrCreateIfPossible(
opCtx,
- _nss,
+ nss(),
*_shardKeyPattern,
_collation,
- _request.getUnique() ? *_request.getUnique() : false,
+ _doc.getUnique().value_or(false),
shardkeyutil::ValidationBehaviorsShardCollection(opCtx));
- _collectionUUID = *getUUIDFromPrimaryShard(opCtx, _nss);
+ // Wait until the index is majority committed, to prevent the collection from being committed to
+ // the config server while the index creation is rolled back on a stepdown.
+ WriteConcernResult ignoreResult;
+ auto latestOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ uassertStatusOK(waitForWriteConcern(
+ opCtx, latestOpTime, ShardingCatalogClient::kMajorityWriteConcern, &ignoreResult));
+
+ _collectionUUID = *getUUIDFromPrimaryShard(opCtx, nss());
}
-void CreateCollectionCoordinator::_createChunks(OperationContext* opCtx) {
- LOGV2_DEBUG(5277904, 2, "Create collection _createChunks", "namespace"_attr = _nss);
+void CreateCollectionCoordinator::_createPolicyAndChunks(OperationContext* opCtx) {
+ LOGV2_DEBUG(5277904, 2, "Create collection _createPolicyAndChunks", "namespace"_attr = nss());
_splitPolicy = InitialSplitPolicy::calculateOptimizationStrategy(
opCtx,
*_shardKeyPattern,
- _request.getNumInitialChunks() ? *_request.getNumInitialChunks() : 0,
- _request.getPresplitHashedZones() ? *_request.getPresplitHashedZones() : false,
- _request.getInitialSplitPoints(),
- getTagsAndValidate(opCtx, _nss, _shardKeyPattern->toBSON(), *_shardKeyPattern),
+ _doc.getNumInitialChunks() ? *_doc.getNumInitialChunks() : 0,
+ _doc.getPresplitHashedZones() ? *_doc.getPresplitHashedZones() : false,
+ _doc.getInitialSplitPoints(),
+ getTagsAndValidate(opCtx, nss(), _shardKeyPattern->toBSON(), *_shardKeyPattern),
getNumShards(opCtx),
- checkIfCollectionIsEmpty(opCtx, _nss));
+ checkIfCollectionIsEmpty(opCtx, nss()));
_initialChunks = _splitPolicy->createFirstChunks(
- opCtx, *_shardKeyPattern, {_nss, *_collectionUUID, ShardingState::get(opCtx)->shardId()});
+ opCtx, *_shardKeyPattern, {nss(), *_collectionUUID, ShardingState::get(opCtx)->shardId()});
// There must be at least one chunk.
invariant(!_initialChunks.chunks.empty());
@@ -499,13 +675,13 @@ void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(OperationC
LOGV2_DEBUG(5277905,
2,
"Create collection _createCollectionOnNonPrimaryShards",
- "namespace"_attr = _nss);
+ "namespace"_attr = nss());
std::vector<AsyncRequestsSender::Request> requests;
std::set<ShardId> initializedShards;
auto dbPrimaryShardId = ShardingState::get(opCtx)->shardId();
- NamespaceStringOrUUID nssOrUUID{_nss.db().toString(), *_collectionUUID};
+ NamespaceStringOrUUID nssOrUUID{nss().db().toString(), *_collectionUUID};
auto [collOptions, indexes, idIndex] = getCollectionOptionsAndIndexes(opCtx, nssOrUUID);
for (const auto& chunk : _initialChunks.chunks) {
@@ -515,7 +691,7 @@ void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(OperationC
continue;
}
- ShardsvrCreateCollectionParticipant createCollectionParticipantRequest(_nss);
+ ShardsvrCreateCollectionParticipant createCollectionParticipantRequest(nss());
createCollectionParticipantRequest.setCollectionUUID(*_collectionUUID);
createCollectionParticipantRequest.setOptions(collOptions);
@@ -532,7 +708,7 @@ void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(OperationC
if (!requests.empty()) {
auto responses = gatherResponses(opCtx,
- _nss.db(),
+ nss().db(),
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
Shard::RetryPolicy::kIdempotent,
requests);
@@ -542,57 +718,57 @@ void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(OperationC
for (const auto& response : responses) {
auto shardResponse = uassertStatusOKWithContext(
std::move(response.swResponse),
- str::stream() << "Unable to create collection " << _nss.ns() << " on "
+ str::stream() << "Unable to create collection " << nss().ns() << " on "
<< response.shardId);
auto status = getStatusFromCommandResult(shardResponse.data);
uassertStatusOK(status.withContext(str::stream()
- << "Unable to create collection " << _nss.ns()
+ << "Unable to create collection " << nss().ns()
<< " on " << response.shardId));
auto wcStatus = getWriteConcernStatusFromCommandResult(shardResponse.data);
uassertStatusOK(wcStatus.withContext(str::stream()
- << "Unable to create collection " << _nss.ns()
+ << "Unable to create collection " << nss().ns()
<< " on " << response.shardId));
}
}
}
void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
- LOGV2_DEBUG(5277906, 2, "Create collection _commit", "namespace"_attr = _nss);
+ LOGV2_DEBUG(5277906, 2, "Create collection _commit", "namespace"_attr = nss());
// Upsert Chunks.
upsertChunks(opCtx, _initialChunks.chunks);
- CollectionType coll(_nss,
+ CollectionType coll(nss(),
_initialChunks.collVersion().epoch(),
_initialChunks.creationTime,
Date_t::now(),
*_collectionUUID);
- coll.setKeyPattern(_shardKeyPattern->toBSON());
+ coll.setKeyPattern(_shardKeyPattern->getKeyPattern());
if (_collation) {
coll.setDefaultCollation(_collation.value());
}
- if (_request.getUnique()) {
- coll.setUnique(*_request.getUnique());
+ if (_doc.getUnique()) {
+ coll.setUnique(*_doc.getUnique());
}
- updateCatalogEntry(opCtx, _nss, coll);
+ updateCatalogEntry(opCtx, nss(), coll);
}
-void CreateCollectionCoordinator::_cleanup(OperationContext* opCtx) {
- LOGV2_DEBUG(5277907, 2, "Create collection _cleanup", "namespace"_attr = _nss);
+void CreateCollectionCoordinator::_finalize(OperationContext* opCtx) noexcept {
+ LOGV2_DEBUG(5277907, 2, "Create collection _finalize", "namespace"_attr = nss());
try {
- forceShardFilteringMetadataRefresh(opCtx, _nss);
+ forceShardFilteringMetadataRefresh(opCtx, nss());
} catch (const DBException&) {
// If the refresh fails, then set the shard version to UNKNOWN and let a future operation to
// refresh the metadata.
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- AutoGetCollection autoColl(opCtx, _nss, MODE_IX);
- CollectionShardingRuntime::get(opCtx, _nss)->clearFilteringMetadata(opCtx);
+ AutoGetCollection autoColl(opCtx, nss(), MODE_IX);
+ CollectionShardingRuntime::get(opCtx, nss())->clearFilteringMetadata(opCtx);
}
// Is it really necessary to refresh all shards? or can I assume that the shard version will be
@@ -614,7 +790,7 @@ void CreateCollectionCoordinator::_cleanup(OperationContext* opCtx) {
opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
"admin",
- BSON("_flushRoutingTableCacheUpdates" << _nss.ns()),
+ BSON("_flushRoutingTableCacheUpdates" << nss().ns()),
Seconds{30},
Shard::RetryPolicy::kIdempotent));
@@ -630,7 +806,7 @@ void CreateCollectionCoordinator::_cleanup(OperationContext* opCtx) {
LOGV2(5277901,
"Created initial chunk(s)",
- "namespace"_attr = _nss,
+ "namespace"_attr = nss(),
"numInitialChunks"_attr = _initialChunks.chunks.size(),
"initialCollectionVersion"_attr = _initialChunks.collVersion());
@@ -638,7 +814,7 @@ void CreateCollectionCoordinator::_cleanup(OperationContext* opCtx) {
ShardingLogging::get(opCtx)->logChange(
opCtx,
"shardCollection.end",
- _nss.ns(),
+ nss().ns(),
BSON("version" << _initialChunks.collVersion().toString() << "numChunks"
<< static_cast<int>(_initialChunks.chunks.size())),
ShardingCatalogClient::kMajorityWriteConcern);
@@ -649,4 +825,53 @@ void CreateCollectionCoordinator::_cleanup(OperationContext* opCtx) {
_result = result;
}
+// Phase change and document handling API.
+void CreateCollectionCoordinator::_insertCoordinatorDocument(CoordDoc&& doc) {
+ auto docBSON = _doc.toBSON();
+ auto coorMetadata = doc.getShardingDDLCoordinatorMetadata();
+ coorMetadata.setRecoveredFromDisk(true);
+ doc.setShardingDDLCoordinatorMetadata(coorMetadata);
+
+ auto opCtx = cc().makeOperationContext();
+ PersistentTaskStore<CoordDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
+ store.add(opCtx.get(), doc, WriteConcerns::kMajorityWriteConcern);
+ _doc = std::move(doc);
+}
+
+void CreateCollectionCoordinator::_updateCoordinatorDocument(CoordDoc&& newDoc) {
+ auto opCtx = cc().makeOperationContext();
+ PersistentTaskStore<CoordDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
+ store.update(opCtx.get(),
+ BSON(CoordDoc::kIdFieldName << _doc.getId().toBSON()),
+ newDoc.toBSON(),
+ WriteConcerns::kMajorityWriteConcern);
+
+ _doc = std::move(newDoc);
+}
+
+void CreateCollectionCoordinator::_removeCoordinatorDocument() {
+ auto opCtx = cc().makeOperationContext();
+ PersistentTaskStore<CoordDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
+ LOGV2_DEBUG(5458705,
+ 1,
+ "Removing state document for create collection coordinator",
+ "namespace"_attr = nss());
+ store.remove(opCtx.get(),
+ BSON(CoordDoc::kIdFieldName << _doc.getId().toBSON()),
+ WriteConcerns::kMajorityWriteConcern);
+
+ _doc = {};
+}
+
+void CreateCollectionCoordinator::_enterPhase(Phase newPhase) {
+ CoordDoc newDoc(_doc);
+ newDoc.setPhase(newPhase);
+
+ if (_doc.getPhase() == Phase::kUnset) {
+ _insertCoordinatorDocument(std::move(newDoc));
+ return;
+ }
+ _updateCoordinatorDocument(std::move(newDoc));
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/create_collection_coordinator.h b/src/mongo/db/s/create_collection_coordinator.h
index a9177f276ee..d69b43252fb 100644
--- a/src/mongo/db/s/create_collection_coordinator.h
+++ b/src/mongo/db/s/create_collection_coordinator.h
@@ -31,6 +31,7 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/s/config/initial_split_policy.h"
+#include "mongo/db/s/create_collection_coordinator_document_gen.h"
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
#include "mongo/db/s/sharding_ddl_coordinator.h"
#include "mongo/s/request_types/sharded_ddl_commands_gen.h"
@@ -38,22 +39,56 @@
namespace mongo {
-class CreateCollectionCoordinator final
- : public ShardingDDLCoordinator_NORESILIENT,
- public std::enable_shared_from_this<CreateCollectionCoordinator> {
+class CreateCollectionCoordinator final : public ShardingDDLCoordinator {
public:
- CreateCollectionCoordinator(OperationContext* opCtx, const ShardsvrCreateCollection& request);
+ using CoordDoc = CreateCollectionCoordinatorDocument;
+ using Phase = CreateCollectionCoordinatorPhaseEnum;
+
+ CreateCollectionCoordinator(const BSONObj& initialState);
+ ~CreateCollectionCoordinator() = default;
+
+
+ void checkIfOptionsConflict(const BSONObj& coorDoc) const override;
+
+ boost::optional<BSONObj> reportForCurrentOp(
+ MongoProcessInterface::CurrentOpConnectionsMode connMode,
+ MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept override;
/**
- * Returns the information of the newly created collection, or the already existing one. It must
- * be called after a successfull execution of run.
+ * Waits for the termination of the parent DDLCoordinator (so all the resources are released)
+ * and then returns the collection creation result.
*/
- const CreateCollectionResponse& getResultOnSuccess() {
+ CreateCollectionResponse getResult(OperationContext* opCtx) {
+ getCompletionFuture().get(opCtx);
+ invariant(_result.is_initialized());
return *_result;
}
private:
- SemiFuture<void> runImpl(std::shared_ptr<executor::TaskExecutor> executor) override;
+ ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& token) noexcept override;
+
+ template <typename Func>
+ auto _executePhase(const Phase& newPhase, Func&& func) {
+ return [=] {
+ const auto& currPhase = _doc.getPhase();
+
+ if (currPhase > newPhase) {
+ // Do not execute this phase if we already reached a subsequent one.
+ return;
+ }
+ if (currPhase < newPhase) {
+ // Persist the new phase if this is the first time we are executing it.
+ _enterPhase(newPhase);
+ }
+ return func();
+ };
+ };
+
+ void _insertCoordinatorDocument(CoordDoc&& doc);
+ void _updateCoordinatorDocument(CoordDoc&& newStateDoc);
+ void _removeCoordinatorDocument();
+ void _enterPhase(Phase newState);
/**
* Performs all required checks before holding the critical sections.
@@ -68,7 +103,7 @@ private:
/**
* Given the appropriate split policy, create the initial chunks.
*/
- void _createChunks(OperationContext* opCtx);
+ void _createPolicyAndChunks(OperationContext* opCtx);
/**
* If the optimized path can be taken, ensure the collection is already created in all the
@@ -86,12 +121,11 @@ private:
/**
* Refresh all participant shards and log creation.
*/
- void _cleanup(OperationContext* opCtx);
+ void _finalize(OperationContext* opCtx) noexcept;
- ServiceContext* _serviceContext;
- const ShardsvrCreateCollection _request;
- const NamespaceString& _nss;
+ CreateCollectionCoordinatorDocument _doc;
+ // Objects generated on each execution.
boost::optional<ShardKeyPattern> _shardKeyPattern;
boost::optional<BSONObj> _collation;
boost::optional<UUID> _collectionUUID;
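
The _executePhase() helper above is the core of the new resumability: every phase is durably recorded before it runs, already-completed phases are skipped on a retry, and the interrupted phase may execute again. The following is a minimal, self-contained sketch of that idiom (plain C++, not MongoDB code; the real coordinator persists the document with majority write concern via PersistentTaskStore):

    // Toy model of the phase-gated execution pattern. The persisted state here
    // is a plain variable; in the server it is the coordinator document stored
    // in the sharding DDL coordinators collection.
    #include <functional>
    #include <iostream>

    enum class Phase { kUnset = 0, kCheck = 1, kCommit = 2 };

    struct CoordinatorDoc {
        Phase phase = Phase::kUnset;
    };

    static CoordinatorDoc persistedDoc;  // stand-in for the durable state document

    void enterPhase(CoordinatorDoc& doc, Phase newPhase) {
        doc.phase = newPhase;
        persistedDoc = doc;  // stands in for the "majority write" of the updated document
    }

    // Mirrors the control flow of CreateCollectionCoordinator::_executePhase().
    void executePhase(CoordinatorDoc& doc, Phase newPhase, const std::function<void()>& body) {
        if (doc.phase > newPhase)
            return;  // already past this phase on a previous attempt: skip it
        if (doc.phase < newPhase)
            enterPhase(doc, newPhase);  // first execution of this phase: record it durably
        body();  // may run again after a stepdown, so the body must be retry-safe
    }

    int main() {
        CoordinatorDoc doc = persistedDoc;  // simulate recovery from disk
        executePhase(doc, Phase::kCheck, [] { std::cout << "check phase\n"; });
        executePhase(doc, Phase::kCommit, [] { std::cout << "commit phase\n"; });
        return 0;
    }
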
diff --git a/src/mongo/db/s/create_collection_coordinator_document.idl b/src/mongo/db/s/create_collection_coordinator_document.idl
new file mode 100644
index 00000000000..c8c9aa05762
--- /dev/null
+++ b/src/mongo/db/s/create_collection_coordinator_document.idl
@@ -0,0 +1,97 @@
+# Copyright (C) 2021-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+# This file defines the format of the create collection coordinator document, persisted in the
+# sharding DDL coordinators collection to guarantee resilience in the event of stepdowns while
+# creating collections.
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+ - "mongo/db/s/sharding_ddl_coordinator.idl"
+ - "mongo/s/request_types/sharded_ddl_commands.idl"
+
+enums:
+ CreateCollectionCoordinatorPhase:
+ description: "Current create collection coordinator's operation state."
+ type: string
+ values:
+ kUnset: "unset"
+ kCheck: "check"
+ kCommit: "commit"
+
+structs:
+ CreateCollectionCoordinatorDocument:
+ description: "Object with neccessary fields to create a collection"
+ generate_comparison_operators: false
+ strict: true
+ chained_structs:
+ ShardingDDLCoordinatorMetadata: ShardingDDLCoordinatorMetadata
+ fields:
+ phase:
+ type: CreateCollectionCoordinatorPhase
+ description: "Coordinator phase."
+ default: kUnset
+ shardKeyAlreadyCreated:
+ type: bool
+ description: >-
+ If set, the collection already had the shard key index when the command was
+ first invoked, so the shard key index will not be dropped if a stepdown forces
+ the operation to be retried.
+ default: false
+ shardKey:
+ type: object_owned
+ description: "The index specification document to use as the shard key."
+ optional: true
+ unique:
+ type: bool
+ description: "Whether the shard key index should enforce a unique constraint."
+ optional: true
+ numInitialChunks:
+ type: safeInt64
+ description: >-
+ The number of chunks to create initially when sharding an empty collection with
+ a hashed shard key.
+ optional: true
+ presplitHashedZones:
+ type: bool
+ description: >-
+ True if the chunks should be pre-split based on the existing zones when
+ sharding a collection with hashed shard key.
+ optional: true
+ initialSplitPoints:
+ type: array<object_owned>
+ description: >-
+ A specific set of points to create initial splits at, currently used only by
+ mapReduce.
+ optional: true
+ collation:
+ type: object_owned
+ description: "The collation to use for the shard key index."
+ optional: true
\ No newline at end of file
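
For orientation, a persisted coordinator document might look roughly like the sketch below. The fields contributed by the chained ShardingDDLCoordinatorMetadata (such as _id and recoveredFromDisk) are defined in sharding_ddl_coordinator.idl and their serialized names are only approximated here; the namespace, shard key and values are illustrative:

    // Hypothetical example of a persisted CreateCollectionCoordinatorDocument.
    {
        _id: { namespace: "db.coll", operationType: "createCollection" },   // chained metadata (approximate field names)
        recoveredFromDisk: false,                                           // chained metadata
        phase: "commit",
        shardKeyAlreadyCreated: false,
        shardKey: { x: 1 },
        unique: false,
        numInitialChunks: 4,
        presplitHashedZones: false,
        collation: { locale: "simple" }
    }
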
diff --git a/src/mongo/db/s/shard_key_util.cpp b/src/mongo/db/s/shard_key_util.cpp
index cb11f76831a..018164565ba 100644
--- a/src/mongo/db/s/shard_key_util.cpp
+++ b/src/mongo/db/s/shard_key_util.cpp
@@ -100,12 +100,12 @@ BSONObj makeCreateIndexesCmd(const NamespaceString& nss,
} // namespace
-void validateShardKeyIndexExistsOrCreateIfPossible(OperationContext* opCtx,
- const NamespaceString& nss,
- const ShardKeyPattern& shardKeyPattern,
- const boost::optional<BSONObj>& defaultCollation,
- bool unique,
- const ShardKeyValidationBehaviors& behaviors) {
+bool validShardKeyIndexExists(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ShardKeyPattern& shardKeyPattern,
+ const boost::optional<BSONObj>& defaultCollation,
+ bool unique,
+ const ShardKeyValidationBehaviors& behaviors) {
auto indexes = behaviors.loadIndexes(nss);
// 1. Verify consistency with existing unique indexes
@@ -174,9 +174,23 @@ void validateShardKeyIndexExistsOrCreateIfPossible(OperationContext* opCtx,
if (hasUsefulIndexForKey) {
// Check 2.iii Make sure that there is a useful, non-multikey index available.
behaviors.verifyUsefulNonMultiKeyIndex(nss, shardKeyPattern.toBSON());
+ }
+
+ return hasUsefulIndexForKey;
+}
+
+void validateShardKeyIndexExistsOrCreateIfPossible(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ShardKeyPattern& shardKeyPattern,
+ const boost::optional<BSONObj>& defaultCollation,
+ bool unique,
+ const ShardKeyValidationBehaviors& behaviors) {
+ if (validShardKeyIndexExists(
+ opCtx, nss, shardKeyPattern, defaultCollation, unique, behaviors)) {
return;
}
+
// 4. If no useful index, verify we can create one.
behaviors.verifyCanCreateShardKeyIndex(nss);
diff --git a/src/mongo/db/s/shard_key_util.h b/src/mongo/db/s/shard_key_util.h
index 619ced60915..6d7a41db58e 100644
--- a/src/mongo/db/s/shard_key_util.h
+++ b/src/mongo/db/s/shard_key_util.h
@@ -149,6 +149,19 @@ void validateShardKeyIndexExistsOrCreateIfPossible(OperationContext* opCtx,
const boost::optional<BSONObj>& defaultCollation,
bool unique,
const ShardKeyValidationBehaviors& behaviors);
-
+/**
+ * Compares the proposed shard key with the collection's existing indexes to ensure they are a legal
+ * combination.
+ *
+ * Returns true if a valid shard key index already exists. This performs steps 1, 2 and 3 of
+ * validateShardKeyIndexExistsOrCreateIfPossible() above.
+ */
+bool validShardKeyIndexExists(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ShardKeyPattern& shardKeyPattern,
+ const boost::optional<BSONObj>& defaultCollation,
+ bool unique,
+ const ShardKeyValidationBehaviors& behaviors);
} // namespace shardkeyutil
} // namespace mongo
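
A hypothetical caller, not part of this patch, showing how the two helpers are meant to divide the work: the new check-only function answers whether a usable shard key index already exists (what the create collection coordinator records during its check phase), while the original function still creates the index when it is missing. Function names and parameters are taken from the declarations above; the wrapper function itself is made up for illustration:

    #include "mongo/db/s/shard_key_util.h"

    namespace mongo {

    void exampleShardKeyIndexHandling(OperationContext* opCtx,
                                      const NamespaceString& nss,
                                      const ShardKeyPattern& shardKeyPattern,
                                      const boost::optional<BSONObj>& defaultCollation,
                                      bool unique,
                                      const shardkeyutil::ShardKeyValidationBehaviors& behaviors) {
        // Check-only: true if a usable shard key index is already present (steps 1-3).
        // A later rollback should then leave that index alone.
        const bool indexPreExisted = shardkeyutil::validShardKeyIndexExists(
            opCtx, nss, shardKeyPattern, defaultCollation, unique, behaviors);

        if (!indexPreExisted) {
            // Check-and-create: additionally verifies the index can be built and creates it.
            shardkeyutil::validateShardKeyIndexExistsOrCreateIfPossible(
                opCtx, nss, shardKeyPattern, defaultCollation, unique, behaviors);
        }
    }

    }  // namespace mongo
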
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.cpp b/src/mongo/db/s/sharding_ddl_coordinator.cpp
index 02d006aea47..4d18a4c836f 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.cpp
+++ b/src/mongo/db/s/sharding_ddl_coordinator.cpp
@@ -47,7 +47,8 @@ ShardingDDLCoordinatorMetadata extractShardingDDLCoordinatorMetadata(const BSONO
}
ShardingDDLCoordinator::ShardingDDLCoordinator(const BSONObj& coorDoc)
- : _coorMetadata(extractShardingDDLCoordinatorMetadata(coorDoc)) {}
+ : _coorMetadata(extractShardingDDLCoordinatorMetadata(coorDoc)),
+ _recoveredFromDisk(_coorMetadata.getRecoveredFromDisk()) {}
ShardingDDLCoordinator::~ShardingDDLCoordinator() {
invariant(_constructionCompletionPromise.getFuture().isReady());
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.h b/src/mongo/db/s/sharding_ddl_coordinator.h
index d6e040ab33c..3ec6e42aab0 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.h
+++ b/src/mongo/db/s/sharding_ddl_coordinator.h
@@ -96,6 +96,7 @@ protected:
};
ShardingDDLCoordinatorMetadata _coorMetadata;
+ bool _recoveredFromDisk;
private:
SemiFuture<void> run(std::shared_ptr<executor::ScopedTaskExecutor> executor,
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.idl b/src/mongo/db/s/sharding_ddl_coordinator.idl
index e8c602ed370..3a28d9f2e67 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.idl
+++ b/src/mongo/db/s/sharding_ddl_coordinator.idl
@@ -45,6 +45,7 @@ enums:
kDropDatabase: "dropDatabase"
kDropCollection: "dropCollection"
kRenameCollection: "renameCollection"
+ kCreateCollection: "createCollection"
types:
ForwardableOperationMetadata:
diff --git a/src/mongo/db/s/sharding_ddl_coordinator_service.cpp b/src/mongo/db/s/sharding_ddl_coordinator_service.cpp
index b7ec805aeb9..259e0db4a4e 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator_service.cpp
+++ b/src/mongo/db/s/sharding_ddl_coordinator_service.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/s/sharding_ddl_coordinator_service.h"
#include "mongo/base/checked_cast.h"
+#include "mongo/db/s/create_collection_coordinator.h"
#include "mongo/db/s/database_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharding_ddl_coordinator.h"
@@ -67,6 +68,8 @@ ShardingDDLCoordinatorService::constructInstance(BSONObj initialState) const {
break;
case DDLCoordinatorTypeEnum::kRenameCollection:
return std::make_shared<RenameCollectionCoordinator>(std::move(initialState));
+ case DDLCoordinatorTypeEnum::kCreateCollection:
+ return std::make_shared<CreateCollectionCoordinator>(std::move(initialState));
break;
default:
uasserted(ErrorCodes::BadValue,
diff --git a/src/mongo/db/s/shardsvr_create_collection_command.cpp b/src/mongo/db/s/shardsvr_create_collection_command.cpp
index ce2a381d2f5..b8d8d739747 100644
--- a/src/mongo/db/s/shardsvr_create_collection_command.cpp
+++ b/src/mongo/db/s/shardsvr_create_collection_command.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/s/create_collection_coordinator.h"
#include "mongo/db/s/shard_collection_legacy.h"
+#include "mongo/db/s/sharding_ddl_coordinator_service.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/logv2/log.h"
#include "mongo/s/grid.h"
@@ -167,15 +168,25 @@ CreateCollectionResponse createCollection(OperationContext* opCtx,
uassert(
ErrorCodes::NotImplemented, "create collection not implemented yet", request.getShardKey());
- DistLockManager::ScopedDistLock dbDistLock(uassertStatusOK(DistLockManager::get(opCtx)->lock(
- opCtx, nss.db(), "shardCollection", DistLockManager::kDefaultLockTimeout)));
- DistLockManager::ScopedDistLock collDistLock(uassertStatusOK(DistLockManager::get(opCtx)->lock(
- opCtx, nss.ns(), "shardCollection", DistLockManager::kDefaultLockTimeout)));
+ auto coordinatorDoc = CreateCollectionCoordinatorDocument();
+ coordinatorDoc.setShardingDDLCoordinatorMetadata(
+ {{nss, DDLCoordinatorTypeEnum::kCreateCollection}});
+ coordinatorDoc.setShardKey(request.getShardKey());
+ if (request.getCollation())
+ coordinatorDoc.setCollation(request.getCollation());
+ if (request.getInitialSplitPoints())
+ coordinatorDoc.setInitialSplitPoints(request.getInitialSplitPoints());
+ if (request.getNumInitialChunks())
+ coordinatorDoc.setNumInitialChunks(request.getNumInitialChunks());
+ if (request.getPresplitHashedZones())
+ coordinatorDoc.setPresplitHashedZones(request.getPresplitHashedZones());
+ if (request.getUnique())
+ coordinatorDoc.setUnique(request.getUnique());
- auto createCollectionCoordinator =
- std::make_shared<CreateCollectionCoordinator>(opCtx, request);
- createCollectionCoordinator->run(opCtx).get(opCtx);
- return createCollectionCoordinator->getResultOnSuccess();
+ auto service = ShardingDDLCoordinatorService::getService(opCtx);
+ auto createCollectionCoordinator = checked_pointer_cast<CreateCollectionCoordinator>(
+ service->getOrCreateInstance(opCtx, coordinatorDoc.toBSON()));
+ return createCollectionCoordinator->getResult(opCtx);
}
class ShardsvrCreateCollectionCommand final : public TypedCommand<ShardsvrCreateCollectionCommand> {