author     Paolo Polato <paolo.polato@mongodb.com>  2022-07-26 12:50:44 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-07-26 14:03:49 +0000
commit     ab2ced056ac09ea256cc4ff00fb2cbd0344f622a (patch)
tree       dd592c01c40df80c1d16d3bcb3ac5054b4132188 /src/mongo/db/s
parent     12eb0fadbc8ad8490cd08fe04ac3728f1a076658 (diff)
download   mongo-ab2ced056ac09ea256cc4ff00fb2cbd0344f622a.tar.gz
SERVER-62356 Serialize creation of sharded and unsharded Timeseries collections
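
Before this change the coordinator inspected the local catalog (to decide whether the
request targets a timeseries collection and to resolve its collation) before entering
the critical section, so a concurrent creation of an unsharded collection on the same
namespace(s) could interleave with that resolution. The fix acquires the critical
sections on both the requested namespace and its 'system.buckets' counterpart first,
and only then resolves the request. A minimal sketch of the new ordering — stand-in
helpers, not the real server API:

#include <iostream>

// Hypothetical stand-ins for the server's types; this sketches only the
// ordering introduced by this commit, not the real coordinator API.
struct OperationContext {};

// Blocks writes on the requested namespace and on its 'system.buckets'
// counterpart (see _acquireCriticalSections in the diff below).
void acquireCriticalSections(OperationContext*) {
    std::cout << "critical sections acquired\n";
}

// Safe only once the critical sections are held: no concurrent create can
// change whether the namespace is a (timeseries) view anymore.
void resolveRequestAgainstLocalCatalog(OperationContext*) {
    std::cout << "request resolved against local catalog\n";
}

void releaseCriticalSections(OperationContext*) {
    std::cout << "critical sections released\n";
}

int main() {
    OperationContext opCtx;
    acquireCriticalSections(&opCtx);            // previously done *after*...
    resolveRequestAgainstLocalCatalog(&opCtx);  // ...inspecting the catalog
    // ... create the collection, create chunks, commit ...
    releaseCriticalSections(&opCtx);
    return 0;
}
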
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--  src/mongo/db/s/SConscript                                |   1
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.cpp         | 258
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.h           |  20
-rw-r--r--  src/mongo/db/s/create_collection_coordinator_params.cpp  | 206
-rw-r--r--  src/mongo/db/s/create_collection_coordinator_params.h    |  91
-rw-r--r--  src/mongo/db/s/recoverable_critical_section_service.cpp  |   6
-rw-r--r--  src/mongo/db/s/sharding_ddl_coordinator.h                |  22
-rw-r--r--  src/mongo/db/s/shardsvr_create_collection_command.cpp    |  64
8 files changed, 455 insertions, 213 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index f7cbf589684..42e136a51fe 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -390,6 +390,7 @@ env.Library(
'config/set_user_write_block_mode_coordinator_document.idl',
'config/set_user_write_block_mode_coordinator.cpp',
'create_collection_coordinator_document.idl',
+ 'create_collection_coordinator_params.cpp',
'create_collection_coordinator.cpp',
'drop_collection_coordinator_document.idl',
'drop_collection_coordinator.cpp',
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp
index 216d88f5ddf..23be2f5d94f 100644
--- a/src/mongo/db/s/create_collection_coordinator.cpp
+++ b/src/mongo/db/s/create_collection_coordinator.cpp
@@ -210,55 +210,6 @@ int getNumShards(OperationContext* opCtx) {
return shardRegistry->getNumShards(opCtx);
}
-std::pair<boost::optional<Collation>, BSONObj> getCollation(
- OperationContext* opCtx,
- const NamespaceString& nss,
- const boost::optional<BSONObj>& collation) {
- // Ensure the collation is valid. Currently we only allow the simple collation.
- std::unique_ptr<CollatorInterface> requestedCollator = nullptr;
- if (collation) {
- requestedCollator =
- uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
- ->makeFromBSON(collation.value()));
- uassert(ErrorCodes::BadValue,
- str::stream() << "The collation for shardCollection must be {locale: 'simple'}, "
- << "but found: " << collation.value(),
- !requestedCollator);
- }
-
- AutoGetCollection autoColl(opCtx, nss, MODE_IS, AutoGetCollectionViewMode::kViewsForbidden);
-
- const auto actualCollator = [&]() -> const CollatorInterface* {
- const auto& coll = autoColl.getCollection();
- if (coll) {
- uassert(
- ErrorCodes::InvalidOptions, "can't shard a capped collection", !coll->isCapped());
- return coll->getDefaultCollator();
- }
-
- return nullptr;
- }();
-
- if (!requestedCollator && !actualCollator)
- return {boost::none, BSONObj()};
-
- auto actualCollation = actualCollator->getSpec();
- auto actualCollatorBSON = actualCollation.toBSON();
-
- if (!collation) {
- auto actualCollatorFilter =
- uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
- ->makeFromBSON(actualCollatorBSON));
- uassert(ErrorCodes::BadValue,
- str::stream() << "If no collation was specified, the collection collation must be "
- "{locale: 'simple'}, "
- << "but found: " << actualCollatorBSON,
- !actualCollatorFilter);
- }
-
- return {actualCollation, actualCollatorBSON};
-}
-
void cleanupPartialChunksFromPreviousAttempt(OperationContext* opCtx,
const UUID& uuid,
const OperationSessionInfo& osi) {
@@ -363,6 +314,12 @@ void CreateCollectionCoordinator::appendCommandInfo(BSONObjBuilder* cmdInfoBuild
cmdInfoBuilder->appendElements(_request.toBSON());
}
+const NamespaceString& CreateCollectionCoordinator::nss() const {
+ // Rely on the resolved request parameters to retrieve the nss to be targeted by the
+ // coordinator.
+ return _request.getNameSpaceToShard();
+}
+
void CreateCollectionCoordinator::checkIfOptionsConflict(const BSONObj& doc) const {
// If we have two shard collections on the same namespace, then the arguments must be the same.
const auto otherDoc = CreateCollectionCoordinatorDocument::parse(
@@ -380,7 +337,6 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
const CancellationToken& token) noexcept {
return ExecutorFuture<void>(**executor)
.then([this, anchor = shared_from_this()] {
- _shardKeyPattern = ShardKeyPattern(*_request.getShardKey());
if (_doc.getPhase() < Phase::kCommit) {
auto opCtxHolder = cc().makeOperationContext();
auto* opCtx = opCtxHolder.get();
@@ -409,42 +365,34 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
opCtx, getCurrentSession(), **executor);
}
+ // Enter the critical sections before patching the user request to avoid data races
+ // with concurrent creation of unsharded collections referencing the same
+ // namespace(s).
+ _acquireCriticalSections(opCtx);
+
+ _request.resolveAgainstLocalCatalog(opCtx);
+
+ _checkCollectionUUIDMismatch(opCtx);
+
// Log the start of the event only if we're not recovering.
_logStartCreateCollection(opCtx);
- // Quick check (without critical section) to see if another create collection
- // already succeeded.
+
+ // Check if the collection was already sharded by a past request
if (auto createCollectionResponseOpt =
sharding_ddl_util::checkIfCollectionAlreadySharded(
opCtx,
nss(),
- _shardKeyPattern->getKeyPattern().toBSON(),
- getCollation(opCtx, nss(), _request.getCollation()).second,
- _request.getUnique().value_or(false))) {
- _checkCollectionUUIDMismatch(opCtx);
-
- // The critical section can still be held here if the node committed the
- // sharding of the collection but then it stepped down before it managed to
- // delete the coordinator document
- RecoverableCriticalSectionService::get(opCtx)
- ->releaseRecoverableCriticalSection(
- opCtx,
- nss(),
- _critSecReason,
- ShardingCatalogClient::kMajorityWriteConcern);
-
+ _request.getShardKeyPattern().getKeyPattern().toBSON(),
+ _request.getResolvedCollation(),
+ _doc.getUnique().value_or(false))) {
+ // A previous request already created and committed the collection but there was
+ // a stepdown after the commit.
+ _releaseCriticalSections(opCtx);
_result = createCollectionResponseOpt;
return;
}
- // Entering the critical section. From this point on, the writes are blocked. Before
- // calling this method, we need the coordinator document to be persisted (and hence
- // the kCheck state), otherwise nothing will release the critical section in the
- // presence of a stepdown.
- RecoverableCriticalSectionService::get(opCtx)
- ->acquireRecoverableCriticalSectionBlockWrites(
- opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
-
if (!_firstExecution) {
auto uuid = sharding_ddl_util::getCollectionUUID(opCtx, nss());
// If the collection can be found locally, then we clean up the config.chunks
@@ -463,12 +411,11 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
}
}
- _checkCollectionUUIDMismatch(opCtx);
_createPolicy(opCtx);
_createCollectionAndIndexes(opCtx);
audit::logShardCollection(opCtx->getClient(),
- nss().ns(),
+ nss().toString(),
*_request.getShardKey(),
_request.getUnique().value_or(false));
@@ -478,12 +425,8 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
// Block reads/writes from here on if we need to create the collection on other
// shards, this way we prevent reads/writes that should be redirected to another
// shard
- RecoverableCriticalSectionService::get(opCtx)
- ->promoteRecoverableCriticalSectionToBlockAlsoReads(
- opCtx,
- nss(),
- _critSecReason,
- ShardingCatalogClient::kMajorityWriteConcern);
+ _promoteCriticalSectionsToBlockReads(opCtx);
_updateSession(opCtx);
_createCollectionOnNonPrimaryShards(opCtx, getCurrentSession());
@@ -492,8 +435,7 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
}
// End of the critical section, from now on, read and writes are permitted.
- RecoverableCriticalSectionService::get(opCtx)->releaseRecoverableCriticalSection(
- opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+ _releaseCriticalSections(opCtx);
// Slow path. Create chunks (which might incur an index scan) and commit must be
// done outside of the critical section to prevent writes from stalling in unsharded
@@ -514,54 +456,43 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
!status.isA<ErrorCategory::ShutdownError>()) {
LOGV2_ERROR(5458702,
"Error running create collection",
- "namespace"_attr = nss(),
+ "namespace"_attr = originalNss(),
"error"_attr = redact(status));
auto opCtxHolder = cc().makeOperationContext();
auto* opCtx = opCtxHolder.get();
- RecoverableCriticalSectionService::get(opCtx)->releaseRecoverableCriticalSection(
- opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+ _releaseCriticalSections(opCtx);
}
return status;
});
}
void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx) {
- LOGV2_DEBUG(5277902, 2, "Create collection _checkCommandArguments", "namespace"_attr = nss());
-
- uassert(ErrorCodes::InvalidNamespace,
- str::stream() << "Namespace too long. Namespace: " << nss()
- << " Max: " << NamespaceString::MaxNsShardedCollectionLen,
- nss().size() <= NamespaceString::MaxNsShardedCollectionLen);
+ LOGV2_DEBUG(
+ 5277902, 2, "Create collection _checkCommandArguments", "namespace"_attr = originalNss());
- if (nss().db() == NamespaceString::kConfigDb) {
+ if (originalNss().db() == NamespaceString::kConfigDb) {
// Only allowlisted collections in config may be sharded (unless we are in test mode)
uassert(ErrorCodes::IllegalOperation,
"only special collections in the config db may be sharded",
- nss() == NamespaceString::kLogicalSessionsNamespace);
+ originalNss() == NamespaceString::kLogicalSessionsNamespace);
}
// Ensure that hashed and unique are not both set.
uassert(ErrorCodes::InvalidOptions,
"Hashed shard keys cannot be declared unique. It's possible to ensure uniqueness on "
"the hashed field by declaring an additional (non-hashed) unique index on the field.",
- !_shardKeyPattern->isHashedPattern() || !_request.getUnique().value_or(false));
-
- // Ensure that a time-series collection cannot be sharded unless the feature flag is enabled.
- if (nss().isTimeseriesBucketsCollection()) {
- uassert(ErrorCodes::IllegalOperation,
- str::stream() << "can't shard time-series collection " << nss(),
- feature_flags::gFeatureFlagShardedTimeSeries.isEnabled(
- serverGlobalParams.featureCompatibility) ||
- !timeseries::getTimeseriesOptions(opCtx, nss(), false));
- }
+ !ShardKeyPattern(*_request.getShardKey()).isHashedPattern() ||
+ !_request.getUnique().value_or(false));
// Ensure the namespace is valid.
uassert(ErrorCodes::IllegalOperation,
"can't shard system namespaces",
- !nss().isSystem() || nss() == NamespaceString::kLogicalSessionsNamespace ||
- nss().isTemporaryReshardingCollection() || nss().isTimeseriesBucketsCollection());
+ !originalNss().isSystem() ||
+ originalNss() == NamespaceString::kLogicalSessionsNamespace ||
+ originalNss().isTemporaryReshardingCollection() ||
+ originalNss().isTimeseriesBucketsCollection());
if (_request.getNumInitialChunks()) {
// Ensure numInitialChunks is within valid bounds.
@@ -581,14 +512,14 @@ void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx
numChunks <= maxNumInitialChunksTotal);
}
- if (nss().db() == NamespaceString::kConfigDb) {
+ if (originalNss().db() == NamespaceString::kConfigDb) {
auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
auto findReponse = uassertStatusOK(
configShard->exhaustiveFindOnConfig(opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
repl::ReadConcernLevel::kMajorityReadConcern,
- nss(),
+ originalNss(),
BSONObj(),
BSONObj(),
1));
@@ -607,42 +538,103 @@ void CreateCollectionCoordinator::_checkCollectionUUIDMismatch(OperationContext*
checkCollectionUUIDMismatch(opCtx, nss(), coll.getCollection(), _request.getCollectionUUID());
}
+void CreateCollectionCoordinator::_acquireCriticalSections(OperationContext* opCtx) const {
+ // TODO SERVER-68084 call RecoverableCriticalSectionService without the try/catch block
+ try {
+ RecoverableCriticalSectionService::get(opCtx)->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx,
+ originalNss(),
+ _critSecReason,
+ ShardingCatalogClient::kMajorityWriteConcern,
+ boost::none);
+ } catch (const ExceptionFor<ErrorCodes::CommandNotSupportedOnView>&) {
+ // If this collection already exists and it is a view we don't need the critical section
+ // because:
+ // 1. We will not shard the view namespace
+ // 2. This collection will remain a view since we are holding the DDL coll lock and thus
+ // the collection can't be dropped.
+ }
+
+ // Pre-emptively acquire the critical section protecting the buckets namespace that
+ // the creation of a timeseries collection would require.
+ const auto bucketsNamespace = originalNss().makeTimeseriesBucketsNamespace();
+ RecoverableCriticalSectionService::get(opCtx)->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx, bucketsNamespace, _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+}
+
+void CreateCollectionCoordinator::_promoteCriticalSectionsToBlockReads(
+ OperationContext* opCtx) const {
+ // TODO SERVER-68084 call RecoverableCriticalSectionService without the try/catch block
+ try {
+ RecoverableCriticalSectionService::get(opCtx)
+ ->promoteRecoverableCriticalSectionToBlockAlsoReads(
+ opCtx, originalNss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+ } catch (const ExceptionFor<ErrorCodes::CommandNotSupportedOnView>&) {
+ // Ignore: the critical section was never acquired on the view namespace.
+ }
+
+ const auto bucketsNamespace = originalNss().makeTimeseriesBucketsNamespace();
+ RecoverableCriticalSectionService::get(opCtx)
+ ->promoteRecoverableCriticalSectionToBlockAlsoReads(
+ opCtx, bucketsNamespace, _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+}
+
+void CreateCollectionCoordinator::_releaseCriticalSections(OperationContext* opCtx) const {
+ // TODO SERVER-68084 call RecoverableCriticalSectionService without the try/catch block
+ try {
+ RecoverableCriticalSectionService::get(opCtx)->releaseRecoverableCriticalSection(
+ opCtx, originalNss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+ } catch (const ExceptionFor<ErrorCodes::CommandNotSupportedOnView>&) {
+ // Ignore: the critical section was never acquired on the view namespace.
+ }
+
+ const auto bucketsNamespace = originalNss().makeTimeseriesBucketsNamespace();
+ RecoverableCriticalSectionService::get(opCtx)->releaseRecoverableCriticalSection(
+ opCtx, bucketsNamespace, _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+}
+
void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext* opCtx) {
LOGV2_DEBUG(
5277903, 2, "Create collection _createCollectionAndIndexes", "namespace"_attr = nss());
+ auto collationBSON = _request.getResolvedCollation();
boost::optional<Collation> collation;
- std::tie(collation, _collationBSON) = getCollation(opCtx, nss(), _request.getCollation());
+ if (!collationBSON.isEmpty()) {
+ collation.emplace(
+ Collation::parse(IDLParserErrorContext("CreateCollectionCoordinator"), collationBSON));
+ }
// We need to implicitly create a timeseries view and underlying bucket collection.
- if (_collectionEmpty && _request.getTimeseries()) {
+ const auto& timeSeriesOptions = _request.getTimeseries();
+ if (_collectionEmpty && timeSeriesOptions) {
const auto viewName = nss().getTimeseriesViewNamespace();
- auto createCmd = makeCreateCommand(viewName, collation, _request.getTimeseries().get());
+ auto createCmd = makeCreateCommand(viewName, collation, timeSeriesOptions.get());
BSONObj createRes;
DBDirectClient localClient(opCtx);
localClient.runCommand(nss().db().toString(), createCmd, createRes);
auto createStatus = getStatusFromCommandResult(createRes);
+ // TODO: this assumes that any existing namespace was generated by a timeseries
+ // request. Should we verify that the options are compatible?
if (!createStatus.isOK() && createStatus.code() == ErrorCodes::NamespaceExists) {
- LOGV2_DEBUG(5909400,
- 3,
- "Timeseries namespace already exists",
- "namespace"_attr = viewName.toString());
+ LOGV2_WARNING(5909400,
+ "Timeseries namespace already exists",
+ "namespace"_attr = viewName.toString());
} else {
uassertStatusOK(createStatus);
}
}
- shardkeyutil::validateShardKeyIsNotEncrypted(opCtx, nss(), *_shardKeyPattern);
+ shardkeyutil::validateShardKeyIsNotEncrypted(opCtx, nss(), _request.getShardKeyPattern());
auto indexCreated = false;
if (_request.getImplicitlyCreateIndex().value_or(true)) {
indexCreated = shardkeyutil::validateShardKeyIndexExistsOrCreateIfPossible(
opCtx,
nss(),
- *_shardKeyPattern,
- _collationBSON,
+ _request.getShardKeyPattern(),
+ collationBSON,
_request.getUnique().value_or(false),
_request.getEnforceUniquenessCheck().value_or(true),
shardkeyutil::ValidationBehaviorsShardCollection(opCtx));
@@ -651,8 +643,8 @@ void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext*
"Must have an index compatible with the proposed shard key",
validShardKeyIndexExists(opCtx,
nss(),
- *_shardKeyPattern,
- _collationBSON,
+ _request.getShardKeyPattern(),
+ collationBSON,
_request.getUnique().value_or(false) &&
_request.getEnforceUniquenessCheck().value_or(true),
shardkeyutil::ValidationBehaviorsShardCollection(opCtx)));
@@ -681,11 +673,11 @@ void CreateCollectionCoordinator::_createPolicy(OperationContext* opCtx) {
_splitPolicy = InitialSplitPolicy::calculateOptimizationStrategy(
opCtx,
- *_shardKeyPattern,
+ _request.getShardKeyPattern(),
_request.getNumInitialChunks() ? *_request.getNumInitialChunks() : 0,
_request.getPresplitHashedZones() ? *_request.getPresplitHashedZones() : false,
_request.getInitialSplitPoints(),
- getTagsAndValidate(opCtx, nss(), _shardKeyPattern->toBSON()),
+ getTagsAndValidate(opCtx, nss(), _request.getShardKeyPattern().toBSON()),
getNumShards(opCtx),
*_collectionEmpty,
!feature_flags::gNoMoreAutoSplitter.isEnabled(serverGlobalParams.featureCompatibility));
@@ -694,8 +686,10 @@ void CreateCollectionCoordinator::_createPolicy(OperationContext* opCtx) {
void CreateCollectionCoordinator::_createChunks(OperationContext* opCtx) {
LOGV2_DEBUG(5277904, 2, "Create collection _createChunks", "namespace"_attr = nss());
- _initialChunks = _splitPolicy->createFirstChunks(
- opCtx, *_shardKeyPattern, {*_collectionUUID, ShardingState::get(opCtx)->shardId()});
+ _initialChunks =
+ _splitPolicy->createFirstChunks(opCtx,
+ _request.getShardKeyPattern(),
+ {*_collectionUUID, ShardingState::get(opCtx)->shardId()});
// There must be at least one chunk.
invariant(_initialChunks);
@@ -776,7 +770,7 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
_initialChunks->collVersion().getTimestamp(),
Date_t::now(),
*_collectionUUID,
- _shardKeyPattern->getKeyPattern());
+ _request.getShardKeyPattern().getKeyPattern());
if (_request.getTimeseries()) {
TypeCollectionTimeseriesFields timeseriesFields;
@@ -784,8 +778,8 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
coll.setTimeseriesFields(std::move(timeseriesFields));
}
- if (_collationBSON) {
- coll.setDefaultCollation(_collationBSON.value());
+ if (auto collationBSON = _request.getResolvedCollation(); !collationBSON.isEmpty()) {
+ coll.setDefaultCollation(collationBSON);
}
if (_request.getUnique()) {
@@ -859,10 +853,10 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
void CreateCollectionCoordinator::_logStartCreateCollection(OperationContext* opCtx) {
BSONObjBuilder collectionDetail;
collectionDetail.append("shardKey", *_request.getShardKey());
- collectionDetail.append("collection", nss().ns());
+ collectionDetail.append("collection", originalNss().ns());
collectionDetail.append("primary", ShardingState::get(opCtx)->shardId().toString());
ShardingLogging::get(opCtx)->logChange(
- opCtx, "shardCollection.start", nss().ns(), collectionDetail.obj());
+ opCtx, "shardCollection.start", originalNss().ns(), collectionDetail.obj());
}
void CreateCollectionCoordinator::_logEndCreateCollection(OperationContext* opCtx) {
@@ -875,7 +869,7 @@ void CreateCollectionCoordinator::_logEndCreateCollection(OperationContext* opCt
collectionDetail.appendNumber("numChunks",
static_cast<long long>(_initialChunks->chunks.size()));
ShardingLogging::get(opCtx)->logChange(
- opCtx, "shardCollection.end", nss().ns(), collectionDetail.obj());
+ opCtx, "shardCollection.end", originalNss().ns(), collectionDetail.obj());
}
} // namespace mongo
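
The three new helpers (_acquireCriticalSections, _promoteCriticalSectionsToBlockReads,
_releaseCriticalSections) share one shape: act on the original namespace, tolerating
the case where it is an existing view (pending SERVER-68084), then act unconditionally
on the derived buckets namespace. A minimal sketch of that shape, with hypothetical
stand-in types:

#include <iostream>
#include <stdexcept>
#include <string>

// Stand-in for ExceptionFor<ErrorCodes::CommandNotSupportedOnView>.
struct CommandNotSupportedOnView : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Placeholder: the real service persists a recovery document and blocks
// writes; it throws CommandNotSupportedOnView if nss names a view.
void enterCriticalSection(const std::string& nss) {
    std::cout << "critical section on " << nss << "\n";
}

// "db.coll" -> "db.system.buckets.coll"
std::string makeTimeseriesBucketsNamespace(const std::string& nss) {
    auto dot = nss.find('.');
    return nss.substr(0, dot) + ".system.buckets." + nss.substr(dot + 1);
}

void acquireBothCriticalSections(const std::string& nss) {
    try {
        enterCriticalSection(nss);
    } catch (const CommandNotSupportedOnView&) {
        // nss is a view: it will never be sharded, and the DDL lock keeps it
        // from being dropped, so no critical section is needed on it.
    }
    // Always protect the buckets namespace a timeseries creation would use.
    enterCriticalSection(makeTimeseriesBucketsNamespace(nss));
}

int main() {
    acquireBothCriticalSections("db.coll");
    return 0;
}
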
diff --git a/src/mongo/db/s/create_collection_coordinator.h b/src/mongo/db/s/create_collection_coordinator.h
index a1f8bbea4e8..3fa06c83e51 100644
--- a/src/mongo/db/s/create_collection_coordinator.h
+++ b/src/mongo/db/s/create_collection_coordinator.h
@@ -32,6 +32,7 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/s/config/initial_split_policy.h"
#include "mongo/db/s/create_collection_coordinator_document_gen.h"
+#include "mongo/db/s/create_collection_coordinator_params.h"
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
#include "mongo/db/s/sharding_ddl_coordinator.h"
#include "mongo/s/request_types/sharded_ddl_commands_gen.h"
@@ -48,10 +49,10 @@ public:
CreateCollectionCoordinator(ShardingDDLCoordinatorService* service, const BSONObj& initialState)
: RecoverableShardingDDLCoordinator(service, "CreateCollectionCoordinator", initialState),
- _request(_doc.getCreateCollectionRequest()),
+ _request(_doc.getCreateCollectionRequest(), originalNss()),
_critSecReason(BSON("command"
<< "createCollection"
- << "ns" << nss().toString())) {}
+ << "ns" << originalNss().toString())) {}
~CreateCollectionCoordinator() = default;
@@ -71,7 +72,7 @@ public:
}
protected:
- const mongo::CreateCollectionRequest _request;
+ const NamespaceString& nss() const override;
private:
StringData serializePhase(const Phase& phase) const override {
@@ -91,6 +92,12 @@ private:
*/
void _checkCollectionUUIDMismatch(OperationContext* opCtx) const;
+ void _acquireCriticalSections(OperationContext* opCtx) const;
+
+ void _promoteCriticalSectionsToBlockReads(OperationContext* opCtx) const;
+
+ void _releaseCriticalSections(OperationContext* opCtx) const;
+
/**
* Ensures the collection is created locally and has the appropriate shard index.
*/
@@ -130,11 +137,9 @@ private:
*/
void _logEndCreateCollection(OperationContext* opCtx);
- const BSONObj _critSecReason;
+ CreateCollectionCoordinatorParams _request;
- // The shard key of the collection, static for the duration of the coordinator and reflects the
- // original command
- boost::optional<ShardKeyPattern> _shardKeyPattern;
+ const BSONObj _critSecReason;
// Set on successful completion of the coordinator
boost::optional<CreateCollectionResponse> _result;
@@ -142,7 +147,6 @@ private:
// The fields below are only populated if the coordinator enters in the branch where the
// collection is not already sharded (i.e., they will not be present on early return)
- boost::optional<BSONObj> _collationBSON;
boost::optional<UUID> _collectionUUID;
std::unique_ptr<InitialSplitPolicy> _splitPolicy;
diff --git a/src/mongo/db/s/create_collection_coordinator_params.cpp b/src/mongo/db/s/create_collection_coordinator_params.cpp
new file mode 100644
index 00000000000..fe07526175c
--- /dev/null
+++ b/src/mongo/db/s/create_collection_coordinator_params.cpp
@@ -0,0 +1,206 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/s/create_collection_coordinator_params.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/catalog_raii.h"
+#include "mongo/s/request_types/sharded_ddl_commands_gen.h"
+namespace mongo {
+
+namespace {
+BSONObj resolveCollationForUserQueries(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const boost::optional<BSONObj>& collationInRequest) {
+ // Ensure the collation is valid. Currently we only allow the simple collation.
+ std::unique_ptr<CollatorInterface> requestedCollator = nullptr;
+ if (collationInRequest) {
+ requestedCollator =
+ uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
+ ->makeFromBSON(collationInRequest.value()));
+ uassert(ErrorCodes::BadValue,
+ str::stream() << "The collation for shardCollection must be {locale: 'simple'}, "
+ << "but found: " << collationInRequest.value(),
+ !requestedCollator);
+ }
+
+ AutoGetCollection autoColl(opCtx, nss, MODE_IS, AutoGetCollectionViewMode::kViewsForbidden);
+
+ const auto actualCollator = [&]() -> const CollatorInterface* {
+ const auto& coll = autoColl.getCollection();
+ if (coll) {
+ uassert(
+ ErrorCodes::InvalidOptions, "can't shard a capped collection", !coll->isCapped());
+ return coll->getDefaultCollator();
+ }
+
+ return nullptr;
+ }();
+
+ if (!requestedCollator && !actualCollator)
+ return BSONObj();
+
+ auto actualCollation = actualCollator->getSpec();
+ auto actualCollatorBSON = actualCollation.toBSON();
+
+ if (!collationInRequest) {
+ auto actualCollatorFilter =
+ uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
+ ->makeFromBSON(actualCollatorBSON));
+ uassert(ErrorCodes::BadValue,
+ str::stream() << "If no collation was specified, the collection collation must be "
+ "{locale: 'simple'}, "
+ << "but found: " << actualCollatorBSON,
+ !actualCollatorFilter);
+ }
+
+ return actualCollatorBSON;
+}
+} // namespace
+
+CreateCollectionCoordinatorParams::CreateCollectionCoordinatorParams(
+ const CreateCollectionRequest& request, const NamespaceString& targetedNamespace)
+ : CreateCollectionRequest(request),
+ _resolutionPerformed(false),
+ _originalNamespace(targetedNamespace),
+ _resolvedNamespace() {}
+
+void CreateCollectionCoordinatorParams::resolveAgainstLocalCatalog(OperationContext* opCtx) {
+ invariant(!_resolutionPerformed, "The resolution should only be performed once");
+ auto bucketsNs = _originalNamespace.makeTimeseriesBucketsNamespace();
+ auto existingBucketsColl =
+ CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForRead(opCtx, bucketsNs);
+
+ auto& timeseriesOptionsInRequest = CreateCollectionRequest::getTimeseries();
+
+ if (!timeseriesOptionsInRequest && !existingBucketsColl) {
+ // The request is targeting a new or existing standard collection.
+ _resolvedNamespace = _originalNamespace;
+ uassert(ErrorCodes::InvalidNamespace,
+ str::stream() << "Namespace too long. Namespace: " << _resolvedNamespace
+ << " Max: " << NamespaceString::MaxNsShardedCollectionLen,
+ _resolvedNamespace.size() <= NamespaceString::MaxNsShardedCollectionLen);
+
+ auto targetCollection = CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForRead(
+ opCtx, _resolvedNamespace);
+ _resolvedCollation = resolveCollationForUserQueries(
+ opCtx, _resolvedNamespace, CreateCollectionRequest::getCollation());
+ _shardKeyPattern = ShardKeyPattern(*getShardKey());
+ _resolutionPerformed = true;
+ return;
+ }
+
+ // The request is targeting a new or existing Timeseries collection.
+ _resolvedNamespace = bucketsNs;
+ uassert(ErrorCodes::IllegalOperation,
+ "Sharding a timeseries collection feature is not enabled",
+ feature_flags::gFeatureFlagShardedTimeSeries.isEnabled(
+ serverGlobalParams.featureCompatibility));
+
+ uassert(ErrorCodes::InvalidNamespace,
+ str::stream() << "Namespace too long. Namespace: " << _resolvedNamespace
+ << " Max: " << NamespaceString::MaxNsShardedCollectionLen,
+ _resolvedNamespace.size() <= NamespaceString::MaxNsShardedCollectionLen);
+
+ // Consolidate the related request parameters...
+ auto existingTimeseriesOptions = [&bucketsNs, &existingBucketsColl] {
+ if (!existingBucketsColl) {
+ return boost::optional<TimeseriesOptions>();
+ }
+
+ uassert(6159000,
+ str::stream() << "the collection '" << bucketsNs
+ << "' does not have 'timeseries' options",
+ existingBucketsColl->getTimeseriesOptions());
+ return existingBucketsColl->getTimeseriesOptions();
+ }();
+
+ if (timeseriesOptionsInRequest && existingTimeseriesOptions) {
+ uassert(
+ 5731500,
+ str::stream() << "the 'timeseries' spec provided must match that of the existing '"
+ << _originalNamespace << "' collection",
+ timeseries::optionsAreEqual(*timeseriesOptionsInRequest, *existingTimeseriesOptions));
+ } else if (!timeseriesOptionsInRequest) {
+ setTimeseries(existingTimeseriesOptions);
+ }
+
+ // check that they are consistent with the requested shard key...
+ auto timeFieldName = timeseriesOptionsInRequest->getTimeField();
+ auto metaFieldName = timeseriesOptionsInRequest->getMetaField();
+ BSONObjIterator shardKeyElems{*getShardKey()};
+ while (auto elem = shardKeyElems.next()) {
+ if (elem.fieldNameStringData() == timeFieldName) {
+ uassert(5914000,
+ str::stream() << "the time field '" << timeFieldName
+ << "' can be only at the end of the shard key pattern",
+ !shardKeyElems.more());
+ } else {
+ uassert(5914001,
+ str::stream() << "only the time field or meta field can be "
+ "part of shard key pattern",
+ metaFieldName &&
+ (elem.fieldNameStringData() == *metaFieldName ||
+ elem.fieldNameStringData().startsWith(*metaFieldName + ".")));
+ }
+ }
+ // ...and create the shard key pattern object.
+ _shardKeyPattern.emplace(
+ uassertStatusOK(timeseries::createBucketsShardKeySpecFromTimeseriesShardKeySpec(
+ *timeseriesOptionsInRequest, *getShardKey())));
+
+ _resolvedCollation = resolveCollationForUserQueries(
+ opCtx, _resolvedNamespace, CreateCollectionRequest::getCollation());
+ _resolutionPerformed = true;
+}
+
+BSONObj CreateCollectionCoordinatorParams::getResolvedCollation() const {
+ invariant(_resolutionPerformed);
+ return _resolvedCollation;
+}
+
+const NamespaceString& CreateCollectionCoordinatorParams::getNameSpaceToShard() const {
+ invariant(_resolutionPerformed);
+ return _resolvedNamespace;
+}
+
+const ShardKeyPattern& CreateCollectionCoordinatorParams::getShardKeyPattern() const {
+ invariant(_resolutionPerformed);
+ return *_shardKeyPattern;
+}
+
+const boost::optional<TimeseriesOptions>& CreateCollectionCoordinatorParams::getTimeseries() const {
+ invariant(_resolutionPerformed);
+ return CreateCollectionRequest::getTimeseries();
+}
+
+boost::optional<TimeseriesOptions>& CreateCollectionCoordinatorParams::getTimeseries() {
+ invariant(_resolutionPerformed);
+ return CreateCollectionRequest::getTimeseries();
+}
+} // namespace mongo
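
The params object enforces a strict two-phase protocol: resolveAgainstLocalCatalog()
may run only once, and every resolved getter invariants that it ran first. A
self-contained sketch of that protocol — simplified types, not the real class:

#include <cassert>
#include <string>

// Simplified stand-in for CreateCollectionCoordinatorParams: every resolved
// getter asserts that the one-shot resolution step already ran.
class ResolvedParams {
public:
    explicit ResolvedParams(std::string originalNss)
        : _originalNss(std::move(originalNss)) {}

    void resolve(bool targetsTimeseries) {
        assert(!_resolved && "the resolution should only be performed once");
        _resolvedNss = targetsTimeseries ? bucketsNamespace(_originalNss)
                                         : _originalNss;
        _resolved = true;
    }

    const std::string& nssToShard() const {
        assert(_resolved && "resolve() must run before reading the outcome");
        return _resolvedNss;
    }

private:
    // "db.coll" -> "db.system.buckets.coll"
    static std::string bucketsNamespace(const std::string& nss) {
        auto dot = nss.find('.');
        return nss.substr(0, dot) + ".system.buckets." + nss.substr(dot + 1);
    }

    bool _resolved = false;
    std::string _originalNss;
    std::string _resolvedNss;
};

int main() {
    ResolvedParams params("db.coll");
    params.resolve(/*targetsTimeseries=*/true);
    return params.nssToShard() == "db.system.buckets.coll" ? 0 : 1;
}
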
diff --git a/src/mongo/db/s/create_collection_coordinator_params.h b/src/mongo/db/s/create_collection_coordinator_params.h
new file mode 100644
index 00000000000..8f94043584f
--- /dev/null
+++ b/src/mongo/db/s/create_collection_coordinator_params.h
@@ -0,0 +1,91 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/catalog/collection_catalog.h"
+#include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h"
+#include "mongo/s/request_types/sharded_ddl_commands_gen.h"
+#include "mongo/s/shard_key_pattern.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+
+/**
+ * An extension of the CreateCollectionRequest parameters received by the Coordinator including
+ * methods to resolve the user request against the current state of the DB catalog and safely access
+ * the outcome.
+ */
+class CreateCollectionCoordinatorParams : public CreateCollectionRequest {
+public:
+ CreateCollectionCoordinatorParams(const CreateCollectionRequest& request,
+ const NamespaceString& targetedNamespace);
+
+ /*
+ * Resolution method to be invoked before accessing any of the request fields. It assumes that
+ * the caller has already acquired the needed resources to ensure that the catalog can be
+ * safely accessed.
+ */
+ void resolveAgainstLocalCatalog(OperationContext* opCtx);
+
+ const NamespaceString& getNameSpaceToShard() const;
+
+ const ShardKeyPattern& getShardKeyPattern() const;
+
+ const boost::optional<TimeseriesOptions>& getTimeseries() const;
+
+ BSONObj getResolvedCollation() const;
+
+ boost::optional<TimeseriesOptions>& getTimeseries();
+
+private:
+ bool _resolutionPerformed;
+ NamespaceString _originalNamespace;
+ NamespaceString _resolvedNamespace;
+ boost::optional<ShardKeyPattern> _shardKeyPattern;
+ BSONObj _resolvedCollation;
+
+ // Hide harmful non-virtual methods defined by the parent class
+
+ void setShardKey(boost::optional<BSONObj> value) {
+ MONGO_UNREACHABLE;
+ }
+
+ const boost::optional<BSONObj>& getCollation() const {
+ MONGO_UNREACHABLE;
+ }
+
+ void setCollation(boost::optional<BSONObj> value) {
+ MONGO_UNREACHABLE;
+ }
+};
+
+} // namespace mongo
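
The "hide harmful non-virtual methods" trick above relies on name hiding: a member
declared in the derived class shadows every base overload of the same name, so code
holding a CreateCollectionCoordinatorParams cannot accidentally read the unresolved
request fields. The limit of the technique, illustrated with stand-in types: a caller
that upcasts to the base class still reaches the original non-virtual method.

#include <cassert>

// Stand-ins for the IDL-generated request and the params wrapper.
struct Request {
    int getCollation() const { return 42; }  // raw, unresolved field
};

struct Params : Request {
private:
    // Shadows Request::getCollation() for any code holding a Params; the
    // real class gives the body MONGO_UNREACHABLE instead.
    int getCollation() const;  // left undefined on purpose
};

int main() {
    Params p;
    // p.getCollation();  // does not compile: hidden and private
    const Request& asBase = p;            // upcasting defeats the hiding,
    assert(asBase.getCollation() == 42);  // since the method is non-virtual
    return 0;
}
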
diff --git a/src/mongo/db/s/recoverable_critical_section_service.cpp b/src/mongo/db/s/recoverable_critical_section_service.cpp
index 0ea15b6e594..dbf5b492f71 100644
--- a/src/mongo/db/s/recoverable_critical_section_service.cpp
+++ b/src/mongo/db/s/recoverable_critical_section_service.cpp
@@ -98,6 +98,8 @@ void RecoverableCriticalSectionService::acquireRecoverableCriticalSectionBlockWr
{
Lock::GlobalLock lk(opCtx, MODE_IX);
+ // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to
+ // construct cCollLock.
AutoGetCollection cCollLock(opCtx, nss, MODE_S);
DBDirectClient dbClient(opCtx);
@@ -184,6 +186,8 @@ void RecoverableCriticalSectionService::promoteRecoverableCriticalSectionToBlock
invariant(!opCtx->lockState()->isLocked());
{
+ // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to
+ // construct cCollLock.
AutoGetCollection cCollLock(opCtx, nss, MODE_X);
DBDirectClient dbClient(opCtx);
@@ -284,6 +288,8 @@ void RecoverableCriticalSectionService::releaseRecoverableCriticalSection(
invariant(!opCtx->lockState()->isLocked());
{
+ // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to
+ // construct cCollLock.
AutoGetCollection collLock(opCtx, nss, MODE_X);
DBDirectClient dbClient(opCtx);
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.h b/src/mongo/db/s/sharding_ddl_coordinator.h
index f89861078c8..9119d44583d 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.h
+++ b/src/mongo/db/s/sharding_ddl_coordinator.h
@@ -87,17 +87,6 @@ public:
return _completionPromise.getFuture();
}
- const NamespaceString& originalNss() const {
- return _coordId.getNss();
- }
-
- const NamespaceString& nss() const {
- if (const auto& bucketNss = metadata().getBucketNss()) {
- return bucketNss.get();
- }
- return originalNss();
- }
-
DDLCoordinatorTypeEnum operationType() const {
return _coordId.getOperationType();
}
@@ -112,6 +101,17 @@ public:
}
protected:
+ const NamespaceString& originalNss() const {
+ return _coordId.getNss();
+ }
+
+ virtual const NamespaceString& nss() const {
+ if (const auto& bucketNss = metadata().getBucketNss()) {
+ return bucketNss.get();
+ }
+ return originalNss();
+ }
+
virtual std::vector<StringData> _acquireAdditionalLocks(OperationContext* opCtx) {
return {};
};
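
Moving originalNss()/nss() from public to protected and making nss() virtual gives
each DDL coordinator a hook: the base returns the namespace from the coordinator ID
(or the bucket namespace from recovered metadata), while CreateCollectionCoordinator
overrides it to return the namespace resolved from the request. A minimal sketch of
the hook, with hypothetical names:

#include <iostream>
#include <string>

// Simplified stand-in for ShardingDDLCoordinator's namespace hook.
class Coordinator {
public:
    virtual ~Coordinator() = default;
    void run() { std::cout << "targeting " << nss() << "\n"; }

protected:
    const std::string& originalNss() const { return _originalNss; }

    // Default: target the namespace the coordinator was constructed with.
    virtual const std::string& nss() const { return _originalNss; }

    std::string _originalNss = "db.coll";
};

class CreateCoordinator : public Coordinator {
protected:
    // Override: target the namespace resolved against the local catalog
    // (e.g. the system.buckets counterpart for a timeseries request).
    const std::string& nss() const override { return _resolvedNss; }

private:
    std::string _resolvedNss = "db.system.buckets.coll";
};

int main() {
    CreateCoordinator c;
    c.run();  // prints "targeting db.system.buckets.coll"
    return 0;
}
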
diff --git a/src/mongo/db/s/shardsvr_create_collection_command.cpp b/src/mongo/db/s/shardsvr_create_collection_command.cpp
index 3769e253b7b..f0405eb8636 100644
--- a/src/mongo/db/s/shardsvr_create_collection_command.cpp
+++ b/src/mongo/db/s/shardsvr_create_collection_command.cpp
@@ -29,14 +29,11 @@
#include "mongo/db/auth/authorization_session.h"
-#include "mongo/db/catalog/collection_catalog.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/feature_compatibility_version.h"
#include "mongo/db/s/create_collection_coordinator.h"
#include "mongo/db/s/sharding_ddl_coordinator_service.h"
#include "mongo/db/s/sharding_state.h"
-#include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h"
-#include "mongo/db/timeseries/timeseries_options.h"
#include "mongo/logv2/log.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
@@ -83,71 +80,14 @@ public:
"Create Collection path has not been implemented",
request().getShardKey());
- auto nss = ns();
- auto bucketsNs = nss.makeTimeseriesBucketsNamespace();
- auto bucketsColl =
- CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForRead(opCtx, bucketsNs);
- CreateCollectionRequest createCmdRequest = request().getCreateCollectionRequest();
-
- // If the 'system.buckets' exists or 'timeseries' parameters are passed in, we know that
- // we are trying shard a timeseries collection.
- if (bucketsColl || createCmdRequest.getTimeseries()) {
- uassert(5731502,
- "Sharding a timeseries collection feature is not enabled",
- feature_flags::gFeatureFlagShardedTimeSeries.isEnabled(
- serverGlobalParams.featureCompatibility));
-
- if (bucketsColl) {
- uassert(6159000,
- str::stream() << "the collection '" << bucketsNs
- << "' does not have 'timeseries' options",
- bucketsColl->getTimeseriesOptions());
-
- if (createCmdRequest.getTimeseries()) {
- uassert(5731500,
- str::stream()
- << "the 'timeseries' spec provided must match that of exists '"
- << nss << "' collection",
- timeseries::optionsAreEqual(*createCmdRequest.getTimeseries(),
- *bucketsColl->getTimeseriesOptions()));
- } else {
- createCmdRequest.setTimeseries(bucketsColl->getTimeseriesOptions());
- }
- }
-
- auto timeField = createCmdRequest.getTimeseries()->getTimeField();
- auto metaField = createCmdRequest.getTimeseries()->getMetaField();
- BSONObjIterator iter{*createCmdRequest.getShardKey()};
- while (auto elem = iter.next()) {
- if (elem.fieldNameStringData() == timeField) {
- uassert(5914000,
- str::stream()
- << "the time field '" << timeField
- << "' can be only at the end of the shard key pattern",
- !iter.more());
- } else {
- uassert(5914001,
- str::stream() << "only the time field or meta field can be "
- "part of shard key pattern",
- metaField &&
- (elem.fieldNameStringData() == *metaField ||
- elem.fieldNameStringData().startsWith(*metaField + ".")));
- }
- }
- nss = bucketsNs;
- createCmdRequest.setShardKey(
- uassertStatusOK(timeseries::createBucketsShardKeySpecFromTimeseriesShardKeySpec(
- *createCmdRequest.getTimeseries(), *createCmdRequest.getShardKey())));
- }
-
const auto createCollectionCoordinator = [&] {
FixedFCVRegion fixedFcvRegion(opCtx);
auto coordinatorDoc = [&] {
auto doc = CreateCollectionCoordinatorDocument();
doc.setShardingDDLCoordinatorMetadata(
- {{std::move(nss), DDLCoordinatorTypeEnum::kCreateCollection}});
- doc.setCreateCollectionRequest(std::move(createCmdRequest));
+ {{ns(), DDLCoordinatorTypeEnum::kCreateCollection}});
+ doc.setCreateCollectionRequest(request().getCreateCollectionRequest());
return doc.toBSON();
}();