author      Paolo Polato <paolo.polato@mongodb.com>            2022-09-14 12:12:29 +0000
committer   Evergreen Agent <no-reply@evergreen.mongodb.com>   2022-09-14 13:48:51 +0000
commit      095cfdb2bd20b10d0a20ef876029120b69971368 (patch)
tree        c70e7b836ff96abe12af6abe603980fd21edd03f /src/mongo/db
parent      40e9c7198a7f0742a2347232166695aae7312286 (diff)
download    mongo-095cfdb2bd20b10d0a20ef876029120b69971368.tar.gz
SERVER-62356 Serialise the creation of sharded Timeseries collection with other DDL ops
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.cpp            612
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.h               36
-rw-r--r--  src/mongo/db/s/create_collection_coordinator_document.idl    29
-rw-r--r--  src/mongo/db/s/sharding_ddl_coordinator.h                    22
-rw-r--r--  src/mongo/db/s/sharding_ddl_coordinator.idl                   4
-rw-r--r--  src/mongo/db/s/sharding_ddl_coordinator_service.cpp           2
-rw-r--r--  src/mongo/db/s/sharding_recovery_service.cpp                  6
-rw-r--r--  src/mongo/db/s/shardsvr_create_collection_command.cpp       130
8 files changed, 614 insertions, 227 deletions
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp
index b312b54c8bb..e49fc035752 100644
--- a/src/mongo/db/s/create_collection_coordinator.cpp
+++ b/src/mongo/db/s/create_collection_coordinator.cpp
@@ -28,6 +28,7 @@
*/
+#include "mongo/db/s/create_collection_coordinator_document_gen.h"
#include "mongo/platform/basic.h"
#include "mongo/db/audit.h"
@@ -52,6 +53,7 @@
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/timeseries/catalog_helper.h"
#include "mongo/db/timeseries/timeseries_constants.h"
+#include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/cluster_commands_helpers.h"
@@ -99,6 +101,64 @@ OptionsAndIndexes getCollectionOptionsAndIndexes(OperationContext* opCtx,
idIndex};
}
+// NOTES on the 'collation' optional parameter contained by the shardCollection() request:
+// 1. It specifies the ordering criteria that will be applied when comparing chunk boundaries
+// during sharding operations (such as move/mergeChunks).
+// 2. As of today, the only supported value (and the one applied by default) is the 'simple'
+// collation.
+// 3. If the collection being sharded does not exist yet, it will also be used as the ordering
+// criteria to serve user queries over the shard index fields.
+// 4. If an existing unsharded collection is being targeted, its original 'collation' will still
+// be used to serve user queries, but the shardCollection request is required to explicitly
+// include the 'collation' parameter to succeed (as an acknowledgement of points 1. and 2.).
+BSONObj resolveCollationForUserQueries(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const boost::optional<BSONObj>& collationInRequest) {
+ // Ensure the collation is valid. Currently we only allow the simple collation.
+ std::unique_ptr<CollatorInterface> requestedCollator = nullptr;
+ if (collationInRequest) {
+ requestedCollator =
+ uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
+ ->makeFromBSON(collationInRequest.value()));
+ uassert(ErrorCodes::BadValue,
+ str::stream() << "The collation for shardCollection must be {locale: 'simple'}, "
+ << "but found: " << collationInRequest.value(),
+ !requestedCollator);
+ }
+
+ AutoGetCollection autoColl(opCtx, nss, MODE_IS, AutoGetCollectionViewMode::kViewsForbidden);
+
+ const auto actualCollator = [&]() -> const CollatorInterface* {
+ const auto& coll = autoColl.getCollection();
+ if (coll) {
+ uassert(
+ ErrorCodes::InvalidOptions, "can't shard a capped collection", !coll->isCapped());
+ return coll->getDefaultCollator();
+ }
+
+ return nullptr;
+ }();
+
+ if (!requestedCollator && !actualCollator)
+ return BSONObj();
+
+ auto actualCollation = actualCollator->getSpec();
+ auto actualCollatorBSON = actualCollation.toBSON();
+
+ if (!collationInRequest) {
+ auto actualCollatorFilter =
+ uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
+ ->makeFromBSON(actualCollatorBSON));
+ uassert(ErrorCodes::BadValue,
+ str::stream() << "If no collation was specified, the collection collation must be "
+ "{locale: 'simple'}, "
+ << "but found: " << actualCollatorBSON,
+ !actualCollatorFilter);
+ }
+
+ return actualCollatorBSON;
+}
+
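A minimal usage sketch of the new helper (illustrative only, not part of the diff; opCtx and nss are assumed to be in scope):

    boost::optional<BSONObj> collationInRequest = BSON("locale"
                                                       << "simple");  // only 'simple' is accepted
    BSONObj collationForQueries = resolveCollationForUserQueries(opCtx, nss, collationInRequest);
    // An empty object means the simple collation applies; a non-empty object echoes the default
    // collation of the existing unsharded collection, which keeps serving user queries.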
/**
* Constructs the BSON specification document for the create collections command using the given
* namespace, collation, and timeseries options.
@@ -210,55 +270,6 @@ int getNumShards(OperationContext* opCtx) {
return shardRegistry->getNumShards(opCtx);
}
-std::pair<boost::optional<Collation>, BSONObj> getCollation(
- OperationContext* opCtx,
- const NamespaceString& nss,
- const boost::optional<BSONObj>& collation) {
- // Ensure the collation is valid. Currently we only allow the simple collation.
- std::unique_ptr<CollatorInterface> requestedCollator = nullptr;
- if (collation) {
- requestedCollator =
- uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
- ->makeFromBSON(collation.value()));
- uassert(ErrorCodes::BadValue,
- str::stream() << "The collation for shardCollection must be {locale: 'simple'}, "
- << "but found: " << collation.value(),
- !requestedCollator);
- }
-
- AutoGetCollection autoColl(opCtx, nss, MODE_IS, AutoGetCollectionViewMode::kViewsForbidden);
-
- const auto actualCollator = [&]() -> const CollatorInterface* {
- const auto& coll = autoColl.getCollection();
- if (coll) {
- uassert(
- ErrorCodes::InvalidOptions, "can't shard a capped collection", !coll->isCapped());
- return coll->getDefaultCollator();
- }
-
- return nullptr;
- }();
-
- if (!requestedCollator && !actualCollator)
- return {boost::none, BSONObj()};
-
- auto actualCollation = actualCollator->getSpec();
- auto actualCollatorBSON = actualCollation.toBSON();
-
- if (!collation) {
- auto actualCollatorFilter =
- uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
- ->makeFromBSON(actualCollatorBSON));
- uassert(ErrorCodes::BadValue,
- str::stream() << "If no collation was specified, the collection collation must be "
- "{locale: 'simple'}, "
- << "but found: " << actualCollatorBSON,
- !actualCollatorFilter);
- }
-
- return {actualCollation, actualCollatorBSON};
-}
-
void cleanupPartialChunksFromPreviousAttempt(OperationContext* opCtx,
const UUID& uuid,
const OperationSessionInfo& osi) {
@@ -363,6 +374,14 @@ void CreateCollectionCoordinator::appendCommandInfo(BSONObjBuilder* cmdInfoBuild
cmdInfoBuilder->appendElements(_request.toBSON());
}
+const NamespaceString& CreateCollectionCoordinator::nss() const {
+ // Rely on the resolved request parameters to retrieve the nss to be targeted by the
+ // coordinator.
+ stdx::lock_guard lk{_docMutex};
+ invariant(_doc.getTranslatedRequestParams());
+ return _doc.getTranslatedRequestParams()->getNss();
+}
+
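As a reading aid for the rest of this diff, the contract implied by the two accessors (a sketch, not an authoritative statement of the API):

    // originalNss(): the namespace exactly as issued by the user; always safe to call.
    // nss():         the translated target namespace (e.g. the buckets namespace when sharding a
    //                timeseries collection); only valid once the kTranslateRequest phase has
    //                persisted TranslatedRequestParams, hence the invariant above.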
void CreateCollectionCoordinator::checkIfOptionsConflict(const BSONObj& doc) const {
// If we have two shard collections on the same namespace, then the arguments must be the same.
const auto otherDoc = CreateCollectionCoordinatorDocument::parse(
@@ -380,15 +399,38 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
const CancellationToken& token) noexcept {
return ExecutorFuture<void>(**executor)
.then([this, anchor = shared_from_this()] {
- _shardKeyPattern = ShardKeyPattern(*_request.getShardKey());
- if (_doc.getPhase() < Phase::kCommit) {
+ if (_doc.getPhase() < Phase::kTranslateRequest) {
auto opCtxHolder = cc().makeOperationContext();
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
_checkCommandArguments(opCtx);
+ // Perform a preliminary check on whether the request may resolve into a no-op
+ // before acquiring any critical section.
+ auto createCollectionResponseOpt =
+ _checkIfCollectionAlreadyShardedWithSameOptions(opCtx);
+ if (createCollectionResponseOpt) {
+ _result = createCollectionResponseOpt;
+ // Throw an exception to jump directly to the end of the continuation chain
+ uasserted(ErrorCodes::RequestAlreadyFulfilled,
+ str::stream() << "The collection " << originalNss()
+ << " was already sharded by a past request");
+ }
}
})
+ .then(_executePhase(Phase::kTranslateRequest,
+ [this, anchor = shared_from_this()] {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ getForwardableOpMetadata().setOn(opCtx);
+ _logStartCreateCollection(opCtx);
+
+ // Enter the critical sections before patching the user request to
+ // avoid data races with concurrent creation of unsharded
+ // collections referencing the same namespace(s).
+ _acquireCriticalSections(opCtx);
+ _doc.setTranslatedRequestParams(_translateRequestParameters(opCtx));
+ }))
.then(_executePhase(
Phase::kCommit,
[this, executor = executor, token, anchor = shared_from_this()] {
@@ -409,38 +451,23 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
opCtx, getCurrentSession(), **executor);
}
- // Log the start of the event only if we're not recovering.
- _logStartCreateCollection(opCtx);
-
- _checkCollectionUUIDMismatch(opCtx);
-
- // Quick check (without critical section) to see if another create collection
- // already succeeded.
+ // Check if the collection was already sharded by a past request
if (auto createCollectionResponseOpt =
sharding_ddl_util::checkIfCollectionAlreadySharded(
opCtx,
nss(),
- _shardKeyPattern->getKeyPattern().toBSON(),
- getCollation(opCtx, nss(), _request.getCollation()).second,
+ _doc.getTranslatedRequestParams()->getKeyPattern().toBSON(),
+ _doc.getTranslatedRequestParams()->getCollation(),
_request.getUnique().value_or(false))) {
- // The critical section can still be held here if the node committed the
- // sharding of the collection but then it stepped down before it managed to
- // delete the coordinator document
- ShardingRecoveryService::get(opCtx)->releaseRecoverableCriticalSection(
- opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+ // A previous request already created and committed the collection but there was
+ // a stepdown after the commit.
+ _releaseCriticalSections(opCtx);
_result = createCollectionResponseOpt;
return;
}
- // Entering the critical section. From this point on, the writes are blocked. Before
- // calling this method, we need the coordinator document to be persisted (and hence
- // the kCheck state), otherwise nothing will release the critical section in the
- // presence of a stepdown.
- ShardingRecoveryService::get(opCtx)->acquireRecoverableCriticalSectionBlockWrites(
- opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
-
if (!_firstExecution) {
auto uuid = sharding_ddl_util::getCollectionUUID(opCtx, nss());
// If the collection can be found locally, then we clean up the config.chunks
@@ -459,26 +486,22 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
}
}
- _createPolicy(opCtx);
- _createCollectionAndIndexes(opCtx);
+ ShardKeyPattern shardKeyPattern(_doc.getTranslatedRequestParams()->getKeyPattern());
+ _createPolicy(opCtx, shardKeyPattern);
+ _createCollectionAndIndexes(opCtx, shardKeyPattern);
audit::logShardCollection(opCtx->getClient(),
- nss().ns(),
+ nss().toString(),
*_request.getShardKey(),
_request.getUnique().value_or(false));
if (_splitPolicy->isOptimized()) {
- _createChunks(opCtx);
+ _createChunks(opCtx, shardKeyPattern);
// Block reads/writes from here on if we need to create the collection on other
// shards, this way we prevent reads/writes that should be redirected to another
// shard
- ShardingRecoveryService::get(opCtx)
- ->promoteRecoverableCriticalSectionToBlockAlsoReads(
- opCtx,
- nss(),
- _critSecReason,
- ShardingCatalogClient::kMajorityWriteConcern);
+ _promoteCriticalSectionsToBlockReads(opCtx);
_updateSession(opCtx);
_createCollectionOnNonPrimaryShards(opCtx, getCurrentSession());
@@ -487,14 +510,13 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
}
// End of the critical section, from now on, read and writes are permitted.
- ShardingRecoveryService::get(opCtx)->releaseRecoverableCriticalSection(
- opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+ _releaseCriticalSections(opCtx);
// Slow path. Create chunks (which might incur in an index scan) and commit must be
// done outside of the critical section to prevent writes from stalling in unsharded
// collections.
if (!_splitPolicy->isOptimized()) {
- _createChunks(opCtx);
+ _createChunks(opCtx, shardKeyPattern);
_commit(opCtx);
}
}))
@@ -505,64 +527,181 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
_logEndCreateCollection(opCtx);
})
.onError([this, anchor = shared_from_this()](const Status& status) {
+ if (status == ErrorCodes::RequestAlreadyFulfilled) {
+ return Status::OK();
+ }
+
if (!status.isA<ErrorCategory::NotPrimaryError>() &&
!status.isA<ErrorCategory::ShutdownError>()) {
LOGV2_ERROR(5458702,
"Error running create collection",
- "namespace"_attr = nss(),
+ "namespace"_attr = originalNss(),
"error"_attr = redact(status));
auto opCtxHolder = cc().makeOperationContext();
auto* opCtx = opCtxHolder.get();
- ShardingRecoveryService::get(opCtx)->releaseRecoverableCriticalSection(
- opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+ _releaseCriticalSections(opCtx);
}
+
return status;
});
}
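Condensing the two hunks above, the no-op fast path works roughly as follows (a sketch, not additional code in the change):

    // During the preliminary check:
    _result = createCollectionResponseOpt;                  // stash the response of the past request
    uasserted(ErrorCodes::RequestAlreadyFulfilled, "...");  // skip the remaining phases
    // In onError(), a RequestAlreadyFulfilled status is mapped back to Status::OK(), so the
    // coordinator completes successfully and _result is reported to the caller.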
-void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx) {
- LOGV2_DEBUG(5277902, 2, "Create collection _checkCommandArguments", "namespace"_attr = nss());
+boost::optional<CreateCollectionResponse>
+CreateCollectionCoordinator::_checkIfCollectionAlreadyShardedWithSameOptions(
+ OperationContext* opCtx) {
+ // Perform the check in the translation phase if the request is coming from a C2C command; this
+ // allows honoring the contract with mongosync (see SERVER-67885 for details)
+ if (_request.getCollectionUUID()) {
+ return boost::none;
+ }
- uassert(ErrorCodes::InvalidNamespace,
- str::stream() << "Namespace too long. Namespace: " << nss()
- << " Max: " << NamespaceString::MaxNsShardedCollectionLen,
- nss().size() <= NamespaceString::MaxNsShardedCollectionLen);
+ // Preliminary check is unsupported for DDL requests received by nodes running old FCVs.
+ if (_timeseriesNssResolvedByCommandHandler()) {
+ return boost::none;
+ }
+
+ // Check if there is a standard sharded collection that matches the original request parameters
+ auto routingInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(
+ opCtx, originalNss()));
+ if (routingInfo.isSharded()) {
+ auto requestMatchesExistingCollection = [&] {
+ // The request must not include timeseries options (the existing collection is not timeseries)
+ if (_request.getTimeseries()) {
+ return false;
+ }
+
+ if (_request.getUnique().value_or(false) != routingInfo.isUnique()) {
+ return false;
+ }
+
+ if (SimpleBSONObjComparator::kInstance.evaluate(
+ *_request.getShardKey() != routingInfo.getShardKeyPattern().toBSON())) {
+ return false;
+ }
+
+ auto defaultCollator = routingInfo.getDefaultCollator()
+ ? routingInfo.getDefaultCollator()->getSpec().toBSON()
+ : BSONObj();
+ if (SimpleBSONObjComparator::kInstance.evaluate(
+ defaultCollator !=
+ resolveCollationForUserQueries(
+ opCtx, originalNss(), _request.getCollation()))) {
+ return false;
+ }
+
+ return true;
+ }();
+
+ uassert(ErrorCodes::AlreadyInitialized,
+ str::stream() << "sharding already enabled for collection " << originalNss(),
+ requestMatchesExistingCollection);
+
+ CreateCollectionResponse response(
+ {routingInfo.getVersion(), CollectionIndexes(routingInfo.getVersion(), boost::none)});
+ response.setCollectionUUID(routingInfo.getUUID());
+ return response;
+ }
- if (nss().db() == NamespaceString::kConfigDb) {
+ // If the request is still unresolved, check if there is an existing TS buckets namespace that
+ // may be matched by the request.
+ auto bucketsNss = originalNss().makeTimeseriesBucketsNamespace();
+ routingInfo = uassertStatusOK(
+ Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, bucketsNss));
+ if (!routingInfo.isSharded()) {
+ return boost::none;
+ }
+
+ auto requestMatchesExistingCollection = [&] {
+ if (routingInfo.isUnique() != _request.getUnique().value_or(false)) {
+ return false;
+ }
+
+ // Timeseries options match
+ const auto& timeseriesOptionsOnDisk =
+ (*routingInfo.getTimeseriesFields()).getTimeseriesOptions();
+ if (_request.getTimeseries() &&
+ !timeseries::optionsAreEqual(*_request.getTimeseries(), timeseriesOptionsOnDisk)) {
+ return false;
+ }
+
+ auto defaultCollator = routingInfo.getDefaultCollator()
+ ? routingInfo.getDefaultCollator()->getSpec().toBSON()
+ : BSONObj();
+ if (SimpleBSONObjComparator::kInstance.evaluate(
+ defaultCollator !=
+ resolveCollationForUserQueries(opCtx, bucketsNss, _request.getCollation()))) {
+ return false;
+ }
+
+ // Same Key Pattern
+ const auto& timeseriesOptions =
+ _request.getTimeseries() ? *_request.getTimeseries() : timeseriesOptionsOnDisk;
+ auto requestKeyPattern =
+ uassertStatusOK(timeseries::createBucketsShardKeySpecFromTimeseriesShardKeySpec(
+ timeseriesOptions, *_request.getShardKey()));
+ if (SimpleBSONObjComparator::kInstance.evaluate(routingInfo.getShardKeyPattern().toBSON() !=
+ requestKeyPattern)) {
+ return false;
+ }
+ return true;
+ }();
+
+ uassert(ErrorCodes::AlreadyInitialized,
+ str::stream() << "sharding already enabled for collection " << bucketsNss,
+ requestMatchesExistingCollection);
+
+ CreateCollectionResponse response(
+ {routingInfo.getVersion(), CollectionIndexes(routingInfo.getVersion(), boost::none)});
+ response.setCollectionUUID(routingInfo.getUUID());
+ return response;
+}
+
+void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx) {
+ LOGV2_DEBUG(
+ 5277902, 2, "Create collection _checkCommandArguments", "namespace"_attr = originalNss());
+
+ if (originalNss().db() == NamespaceString::kConfigDb) {
// Only allowlisted collections in config may be sharded (unless we are in test mode)
uassert(ErrorCodes::IllegalOperation,
"only special collections in the config db may be sharded",
- nss() == NamespaceString::kLogicalSessionsNamespace);
+ originalNss() == NamespaceString::kLogicalSessionsNamespace);
}
// Ensure that hashed and unique are not both set.
uassert(ErrorCodes::InvalidOptions,
"Hashed shard keys cannot be declared unique. It's possible to ensure uniqueness on "
"the hashed field by declaring an additional (non-hashed) unique index on the field.",
- !_shardKeyPattern->isHashedPattern() || !_request.getUnique().value_or(false));
-
- // Ensure that a time-series collection cannot be sharded unless the feature flag is enabled.
- if (nss().isTimeseriesBucketsCollection()) {
- uassert(ErrorCodes::IllegalOperation,
- str::stream() << "can't shard time-series collection " << nss(),
- feature_flags::gFeatureFlagShardedTimeSeries.isEnabled(
- serverGlobalParams.featureCompatibility) ||
- !timeseries::getTimeseriesOptions(opCtx, nss(), false));
+ !ShardKeyPattern(*_request.getShardKey()).isHashedPattern() ||
+ !_request.getUnique().value_or(false));
+
+ if (_timeseriesNssResolvedByCommandHandler()) {
+ // Ensure that a time-series collection cannot be sharded unless the feature flag is
+ // enabled.
+ if (originalNss().isTimeseriesBucketsCollection()) {
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "can't shard time-series collection " << nss(),
+ feature_flags::gFeatureFlagShardedTimeSeries.isEnabled(
+ serverGlobalParams.featureCompatibility) ||
+ !timeseries::getTimeseriesOptions(opCtx, nss(), false));
+ }
}
// Ensure the namespace is valid.
uassert(ErrorCodes::IllegalOperation,
"can't shard system namespaces",
- !nss().isSystem() || nss() == NamespaceString::kLogicalSessionsNamespace ||
- nss().isTemporaryReshardingCollection() || nss().isTimeseriesBucketsCollection());
+ !originalNss().isSystem() ||
+ originalNss() == NamespaceString::kLogicalSessionsNamespace ||
+ originalNss().isTemporaryReshardingCollection() ||
+ originalNss().isTimeseriesBucketsCollection());
if (_request.getNumInitialChunks()) {
// Ensure numInitialChunks is within valid bounds.
// Cannot have more than kMaxSplitPoints initial chunks per shard. Setting a maximum of
- // 1,000,000 chunks in total to limit the amount of memory this command consumes so there is
- // less danger of an OOM error.
+ // 1,000,000 chunks in total to limit the amount of memory this command consumes so
+ // there is less danger of an OOM error.
const int maxNumInitialChunksForShards =
Grid::get(opCtx)->shardRegistry()->getNumShards(opCtx) * shardutil::kMaxSplitPoints;
@@ -576,14 +715,14 @@ void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx
numChunks <= maxNumInitialChunksTotal);
}
- if (nss().db() == NamespaceString::kConfigDb) {
+ if (originalNss().db() == NamespaceString::kConfigDb) {
auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
auto findReponse = uassertStatusOK(
configShard->exhaustiveFindOnConfig(opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
repl::ReadConcernLevel::kMajorityReadConcern,
- nss(),
+ originalNss(),
BSONObj(),
BSONObj(),
1));
@@ -597,22 +736,212 @@ void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx
}
}
-void CreateCollectionCoordinator::_checkCollectionUUIDMismatch(OperationContext* opCtx) const {
- AutoGetCollection coll{opCtx, nss(), MODE_IS};
- checkCollectionUUIDMismatch(opCtx, nss(), coll.getCollection(), _request.getCollectionUUID());
+TranslatedRequestParams CreateCollectionCoordinator::_translateRequestParameters(
+ OperationContext* opCtx) {
+ auto performCheckOnCollectionUUID = [this, opCtx](const NamespaceString& resolvedNss) {
+ AutoGetCollection coll{opCtx, resolvedNss, MODE_IS};
+ checkCollectionUUIDMismatch(
+ opCtx, resolvedNss, coll.getCollection(), _request.getCollectionUUID());
+ };
+
+ auto bucketsNs = originalNss().makeTimeseriesBucketsNamespace();
+ auto existingBucketsColl =
+ CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForRead(opCtx, bucketsNs);
+
+ auto targetingStandardCollection = !_request.getTimeseries() && !existingBucketsColl;
+
+ if (_timeseriesNssResolvedByCommandHandler() || targetingStandardCollection) {
+ const auto& resolvedNamespace = originalNss();
+ performCheckOnCollectionUUID(resolvedNamespace);
+ uassert(ErrorCodes::InvalidNamespace,
+ str::stream() << "Namespace too long. Namespace: " << resolvedNamespace
+ << " Max: " << NamespaceString::MaxNsShardedCollectionLen,
+ resolvedNamespace.size() <= NamespaceString::MaxNsShardedCollectionLen);
+ return TranslatedRequestParams(
+ resolvedNamespace,
+ *_request.getShardKey(),
+ resolveCollationForUserQueries(opCtx, resolvedNamespace, _request.getCollation()));
+ }
+
+ // The request is targeting a new or existing Timeseries collection and the request has not been
+ // patched yet.
+ const auto& resolvedNamespace = bucketsNs;
+ performCheckOnCollectionUUID(resolvedNamespace);
+ uassert(ErrorCodes::IllegalOperation,
+ "Sharding a timeseries collection feature is not enabled",
+ feature_flags::gFeatureFlagShardedTimeSeries.isEnabled(
+ serverGlobalParams.featureCompatibility));
+
+ uassert(ErrorCodes::InvalidNamespace,
+ str::stream() << "Namespace too long. Namespace: " << resolvedNamespace
+ << " Max: " << NamespaceString::MaxNsShardedCollectionLen,
+ resolvedNamespace.size() <= NamespaceString::MaxNsShardedCollectionLen);
+
+ // Consolidate the related request parameters...
+ auto existingTimeseriesOptions = [&bucketsNs, &existingBucketsColl] {
+ if (!existingBucketsColl) {
+ return boost::optional<TimeseriesOptions>();
+ }
+
+ uassert(6159000,
+ str::stream() << "the collection '" << bucketsNs
+ << "' does not have 'timeseries' options",
+ existingBucketsColl->getTimeseriesOptions());
+ return existingBucketsColl->getTimeseriesOptions();
+ }();
+
+ if (_request.getTimeseries() && existingTimeseriesOptions) {
+ uassert(5731500,
+ str::stream() << "the 'timeseries' spec provided must match that of exists '"
+ << originalNss() << "' collection",
+ timeseries::optionsAreEqual(*_request.getTimeseries(), *existingTimeseriesOptions));
+ } else if (!_request.getTimeseries()) {
+ _request.setTimeseries(existingTimeseriesOptions);
+ }
+
+ // Check that the timeseries options are consistent with the requested shard key before creating
+ // the key pattern object.
+ auto timeFieldName = _request.getTimeseries()->getTimeField();
+ auto metaFieldName = _request.getTimeseries()->getMetaField();
+ BSONObjIterator shardKeyElems{*_request.getShardKey()};
+ while (auto elem = shardKeyElems.next()) {
+ if (elem.fieldNameStringData() == timeFieldName) {
+ uassert(5914000,
+ str::stream() << "the time field '" << timeFieldName
+ << "' can be only at the end of the shard key pattern",
+ !shardKeyElems.more());
+ } else {
+ uassert(5914001,
+ str::stream() << "only the time field or meta field can be "
+ "part of shard key pattern",
+ metaFieldName &&
+ (elem.fieldNameStringData() == *metaFieldName ||
+ elem.fieldNameStringData().startsWith(*metaFieldName + ".")));
+ }
+ }
+ KeyPattern keyPattern(
+ uassertStatusOK(timeseries::createBucketsShardKeySpecFromTimeseriesShardKeySpec(
+ *_request.getTimeseries(), *_request.getShardKey())));
+ return TranslatedRequestParams(
+ resolvedNamespace,
+ keyPattern,
+ resolveCollationForUserQueries(opCtx, resolvedNamespace, _request.getCollation()));
+}
+
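To illustrate the translation performed by createBucketsShardKeySpecFromTimeseriesShardKeySpec above (an example with made-up field names; the exact mapping is owned by the helper):

    // timeseries options:     {timeField: "ts", metaField: "sensor"}
    // requested shard key:    { "sensor.id": 1, "ts": 1 }
    // resulting buckets key:  { "meta.id": 1, "control.min.ts": 1 }   (roughly)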
+bool CreateCollectionCoordinator::_timeseriesNssResolvedByCommandHandler() const {
+ return operationType() == DDLCoordinatorTypeEnum::kCreateCollectionPre61Compatible;
}
-void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext* opCtx) {
+void CreateCollectionCoordinator::_acquireCriticalSections(OperationContext* opCtx) {
+ // TODO SERVER-68084 call ShardingRecoveryService without the try/catch block
+ try {
+ ShardingRecoveryService::get(opCtx)->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx,
+ originalNss(),
+ _critSecReason,
+ ShardingCatalogClient::kMajorityWriteConcern,
+ boost::none);
+ } catch (const ExceptionFor<ErrorCodes::CommandNotSupportedOnView>&) {
+ if (_timeseriesNssResolvedByCommandHandler()) {
+ throw;
+ }
+
+ // In case the acquisition was rejected because it targets an existing view, the critical
+ // section is not needed and the error can be dropped because:
+ // 1. We will not shard the view namespace
+ // 2. This collection will remain a view since we are holding the DDL coll lock and
+ // thus the collection can't be dropped.
+ _doc.setDisregardCriticalSectionOnOriginalNss(true);
+ }
+
+ if (!_timeseriesNssResolvedByCommandHandler()) {
+ // Preemptively acquire the critical section protecting the buckets namespace that the
+ // creation of a timeseries collection would require.
+ const auto bucketsNamespace = originalNss().makeTimeseriesBucketsNamespace();
+ ShardingRecoveryService::get(opCtx)->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx, bucketsNamespace, _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+ }
+}
+
+void CreateCollectionCoordinator::_promoteCriticalSectionsToBlockReads(
+ OperationContext* opCtx) const {
+ // TODO SERVER-68084 call ShardingRecoveryService without the if blocks.
+ if (!_doc.getDisregardCriticalSectionOnOriginalNss()) {
+ ShardingRecoveryService::get(opCtx)->promoteRecoverableCriticalSectionToBlockAlsoReads(
+ opCtx, originalNss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+ }
+
+ if (!_timeseriesNssResolvedByCommandHandler()) {
+ const auto bucketsNamespace = originalNss().makeTimeseriesBucketsNamespace();
+ ShardingRecoveryService::get(opCtx)->promoteRecoverableCriticalSectionToBlockAlsoReads(
+ opCtx, bucketsNamespace, _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+ }
+}
+
+void CreateCollectionCoordinator::_releaseCriticalSections(OperationContext* opCtx) {
+ // TODO SERVER-68084 call ShardingRecoveryService without the try/catch block.
+ try {
+ ShardingRecoveryService::get(opCtx)->releaseRecoverableCriticalSection(
+ opCtx, originalNss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+ } catch (ExceptionFor<ErrorCodes::CommandNotSupportedOnView>&) {
+ // Ignore the error (when it is raised, we can assume that no critical section for the view
+ // was previously acquired).
+ }
+
+ if (!_timeseriesNssResolvedByCommandHandler()) {
+ const auto bucketsNamespace = originalNss().makeTimeseriesBucketsNamespace();
+ ShardingRecoveryService::get(opCtx)->releaseRecoverableCriticalSection(
+ opCtx, bucketsNamespace, _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+ }
+}
+
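A condensed view of how _runImpl() pairs the three helpers above (assembled from the earlier hunks of this file; not new behaviour):

    _acquireCriticalSections(opCtx);              // kTranslateRequest: block writes on originalNss
                                                  // and, when applicable, on the buckets namespace
    // ... translate the request, create the local collection, indexes and (optimized) chunks ...
    _promoteCriticalSectionsToBlockReads(opCtx);  // optimized split policy only
    // ... create the collection on the other shards, commit to the sharding catalog ...
    _releaseCriticalSections(opCtx);              // reads and writes are permitted again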
+void CreateCollectionCoordinator::_createCollectionAndIndexes(
+ OperationContext* opCtx, const ShardKeyPattern& shardKeyPattern) {
LOGV2_DEBUG(
5277903, 2, "Create collection _createCollectionAndIndexes", "namespace"_attr = nss());
+ const auto& collationBSON = _doc.getTranslatedRequestParams()->getCollation();
boost::optional<Collation> collation;
- std::tie(collation, _collationBSON) = getCollation(opCtx, nss(), _request.getCollation());
+ if (!collationBSON.isEmpty()) {
+ collation.emplace(
+ Collation::parse(IDLParserContext("CreateCollectionCoordinator"), collationBSON));
+ }
// We need to implicitly create a timeseries view and underlying bucket collection.
if (_collectionEmpty && _request.getTimeseries()) {
+ // TODO SERVER-68084 Remove viewLock and the whole if section that constructs it while
+ // releasing the critical section on the originalNss.
+ boost::optional<AutoGetCollection> viewLock;
+ if (auto criticalSectionAcquiredOnOriginalNss =
+ !_doc.getDisregardCriticalSectionOnOriginalNss();
+ !_timeseriesNssResolvedByCommandHandler() && criticalSectionAcquiredOnOriginalNss) {
+ // This is the subcase of a not-yet-existing view (originalNss) + bucket (nss) timeseries
+ // collection pair that the DDL will have to create. Due to the current
+ // constraints of the code:
+ // - Such creation cannot be performed while holding the critical section over the views
+ // namespace (once the view gets created, the CS will not be releasable); instead,
+ // exclusive access must be enforced through a collection lock
+ // - The critical section cannot be released while holding a collection lock, so this
+ // operation must be performed first (leaving a small window open to data races)
+ ShardingRecoveryService::get(opCtx)->releaseRecoverableCriticalSection(
+ opCtx, originalNss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+ _doc.setDisregardCriticalSectionOnOriginalNss(true);
+ viewLock.emplace(
+ opCtx, originalNss(), LockMode::MODE_X, AutoGetCollectionViewMode::kViewsPermitted);
+ // Once the exclusive access has been reacquired, ensure that no data race occurred.
+ auto catalog = CollectionCatalog::get(opCtx);
+ if (catalog->lookupView(opCtx, originalNss()) ||
+ catalog->lookupCollectionByNamespace(opCtx, originalNss())) {
+ _completeOnError = true;
+ uasserted(ErrorCodes::NamespaceExists,
+ str::stream() << "A conflicting DDL operation was completed while trying "
+ "to shard collection: "
+ << originalNss());
+ }
+ }
+
const auto viewName = nss().getTimeseriesViewNamespace();
- auto createCmd = makeCreateCommand(viewName, collation, _request.getTimeseries().value());
+ auto createCmd = makeCreateCommand(viewName, collation, *_request.getTimeseries());
BSONObj createRes;
DBDirectClient localClient(opCtx);
@@ -629,15 +958,15 @@ void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext*
}
}
- shardkeyutil::validateShardKeyIsNotEncrypted(opCtx, nss(), *_shardKeyPattern);
+ shardkeyutil::validateShardKeyIsNotEncrypted(opCtx, nss(), shardKeyPattern);
auto indexCreated = false;
if (_request.getImplicitlyCreateIndex().value_or(true)) {
indexCreated = shardkeyutil::validateShardKeyIndexExistsOrCreateIfPossible(
opCtx,
nss(),
- *_shardKeyPattern,
- _collationBSON,
+ shardKeyPattern,
+ collationBSON,
_request.getUnique().value_or(false),
_request.getEnforceUniquenessCheck().value_or(true),
shardkeyutil::ValidationBehaviorsShardCollection(opCtx));
@@ -646,8 +975,8 @@ void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext*
"Must have an index compatible with the proposed shard key",
validShardKeyIndexExists(opCtx,
nss(),
- *_shardKeyPattern,
- _collationBSON,
+ shardKeyPattern,
+ collationBSON,
_request.getUnique().value_or(false) &&
_request.getEnforceUniquenessCheck().value_or(true),
shardkeyutil::ValidationBehaviorsShardCollection(opCtx)));
@@ -658,8 +987,8 @@ void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext*
if (!indexCreated) {
replClientInfo.setLastOpToSystemLastOpTime(opCtx);
}
- // Wait until the index is majority written, to prevent having the collection commited to the
- // config server, but the index creation rolled backed on stepdowns.
+ // Wait until the index is majority written, to prevent having the collection committed to
+ // the config server, but the index creation rolled back on stepdowns.
WriteConcernResult ignoreResult;
uassertStatusOK(waitForWriteConcern(opCtx,
replClientInfo.getLastOp(),
@@ -669,28 +998,28 @@ void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext*
_collectionUUID = *sharding_ddl_util::getCollectionUUID(opCtx, nss());
}
-void CreateCollectionCoordinator::_createPolicy(OperationContext* opCtx) {
+void CreateCollectionCoordinator::_createPolicy(OperationContext* opCtx,
+ const ShardKeyPattern& shardKeyPattern) {
LOGV2_DEBUG(6042001, 2, "Create collection _createPolicy", "namespace"_attr = nss());
-
_collectionEmpty = checkIfCollectionIsEmpty(opCtx, nss());
_splitPolicy = InitialSplitPolicy::calculateOptimizationStrategy(
opCtx,
- *_shardKeyPattern,
+ shardKeyPattern,
_request.getNumInitialChunks() ? *_request.getNumInitialChunks() : 0,
_request.getPresplitHashedZones() ? *_request.getPresplitHashedZones() : false,
_request.getInitialSplitPoints(),
- getTagsAndValidate(opCtx, nss(), _shardKeyPattern->toBSON()),
+ getTagsAndValidate(opCtx, nss(), shardKeyPattern.toBSON()),
getNumShards(opCtx),
*_collectionEmpty,
!feature_flags::gNoMoreAutoSplitter.isEnabled(serverGlobalParams.featureCompatibility));
}
-void CreateCollectionCoordinator::_createChunks(OperationContext* opCtx) {
+void CreateCollectionCoordinator::_createChunks(OperationContext* opCtx,
+ const ShardKeyPattern& shardKeyPattern) {
LOGV2_DEBUG(5277904, 2, "Create collection _createChunks", "namespace"_attr = nss());
-
_initialChunks = _splitPolicy->createFirstChunks(
- opCtx, *_shardKeyPattern, {*_collectionUUID, ShardingState::get(opCtx)->shardId()});
+ opCtx, shardKeyPattern, {*_collectionUUID, ShardingState::get(opCtx)->shardId()});
// There must be at least one chunk.
invariant(_initialChunks);
@@ -771,7 +1100,7 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
_initialChunks->collVersion().getTimestamp(),
Date_t::now(),
*_collectionUUID,
- _shardKeyPattern->getKeyPattern());
+ _doc.getTranslatedRequestParams()->getKeyPattern());
if (_request.getTimeseries()) {
TypeCollectionTimeseriesFields timeseriesFields;
@@ -779,8 +1108,9 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
coll.setTimeseriesFields(std::move(timeseriesFields));
}
- if (_collationBSON) {
- coll.setDefaultCollation(_collationBSON.value());
+ if (auto collationBSON = _doc.getTranslatedRequestParams()->getCollation();
+ !collationBSON.isEmpty()) {
+ coll.setDefaultCollation(collationBSON);
}
if (_request.getUnique()) {
@@ -802,8 +1132,8 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
"namespace"_attr = nss(),
"error"_attr = redact(ex));
- // If the refresh fails, then set the shard version to UNKNOWN and let a future operation to
- // refresh the metadata.
+ // If the refresh fails, then set the shard version to UNKNOWN and let a future
+ // operation refresh the metadata.
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
AutoGetCollection autoColl(opCtx, nss(), MODE_IX);
CollectionShardingRuntime::get(opCtx, nss())->clearFilteringMetadata(opCtx);
@@ -811,8 +1141,8 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
throw;
}
- // Best effort refresh to warm up cache of all involved shards so we can have a cluster ready to
- // receive operations.
+ // Best effort refresh to warm up cache of all involved shards so we can have a cluster
+ // ready to receive operations.
auto shardRegistry = Grid::get(opCtx)->shardRegistry();
auto dbPrimaryShardId = ShardingState::get(opCtx)->shardId();
@@ -856,10 +1186,10 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
void CreateCollectionCoordinator::_logStartCreateCollection(OperationContext* opCtx) {
BSONObjBuilder collectionDetail;
collectionDetail.append("shardKey", *_request.getShardKey());
- collectionDetail.append("collection", nss().ns());
+ collectionDetail.append("collection", originalNss().ns());
collectionDetail.append("primary", ShardingState::get(opCtx)->shardId().toString());
ShardingLogging::get(opCtx)->logChange(
- opCtx, "shardCollection.start", nss().ns(), collectionDetail.obj());
+ opCtx, "shardCollection.start", originalNss().ns(), collectionDetail.obj());
}
void CreateCollectionCoordinator::_logEndCreateCollection(OperationContext* opCtx) {
@@ -872,7 +1202,7 @@ void CreateCollectionCoordinator::_logEndCreateCollection(OperationContext* opCt
collectionDetail.appendNumber("numChunks",
static_cast<long long>(_initialChunks->chunks.size()));
ShardingLogging::get(opCtx)->logChange(
- opCtx, "shardCollection.end", nss().ns(), collectionDetail.obj());
+ opCtx, "shardCollection.end", originalNss().ns(), collectionDetail.obj());
}
} // namespace mongo
diff --git a/src/mongo/db/s/create_collection_coordinator.h b/src/mongo/db/s/create_collection_coordinator.h
index a1f8bbea4e8..86aee402c3f 100644
--- a/src/mongo/db/s/create_collection_coordinator.h
+++ b/src/mongo/db/s/create_collection_coordinator.h
@@ -51,7 +51,7 @@ public:
_request(_doc.getCreateCollectionRequest()),
_critSecReason(BSON("command"
<< "createCollection"
- << "ns" << nss().toString())) {}
+ << "ns" << originalNss().toString())) {}
~CreateCollectionCoordinator() = default;
@@ -71,7 +71,7 @@ public:
}
protected:
- const mongo::CreateCollectionRequest _request;
+ const NamespaceString& nss() const override;
private:
StringData serializePhase(const Phase& phase) const override {
@@ -86,25 +86,36 @@ private:
*/
void _checkCommandArguments(OperationContext* opCtx);
- /**
- * Checks that the collection has UUID matching the collectionUUID parameter, if provided.
- */
- void _checkCollectionUUIDMismatch(OperationContext* opCtx) const;
+ boost::optional<CreateCollectionResponse> _checkIfCollectionAlreadyShardedWithSameOptions(
+ OperationContext* opCtx);
+
+ TranslatedRequestParams _translateRequestParameters(OperationContext* opCtx);
+
+ // TODO SERVER-68008 Remove once 7.0 becomes last LTS; when the function appears in if clauses,
+ // modify the code assuming that a "false" value gets returned
+ bool _timeseriesNssResolvedByCommandHandler() const;
+
+ void _acquireCriticalSections(OperationContext* opCtx);
+
+ void _promoteCriticalSectionsToBlockReads(OperationContext* opCtx) const;
+
+ void _releaseCriticalSections(OperationContext* opCtx);
/**
* Ensures the collection is created locally and has the appropiate shard index.
*/
- void _createCollectionAndIndexes(OperationContext* opCtx);
+ void _createCollectionAndIndexes(OperationContext* opCtx,
+ const ShardKeyPattern& shardKeyPattern);
/**
* Creates the appropiate split policy.
*/
- void _createPolicy(OperationContext* opCtx);
+ void _createPolicy(OperationContext* opCtx, const ShardKeyPattern& shardKeyPattern);
/**
* Given the appropiate split policy, create the initial chunks.
*/
- void _createChunks(OperationContext* opCtx);
+ void _createChunks(OperationContext* opCtx, const ShardKeyPattern& shardKeyPattern);
/**
* If the optimized path can be taken, ensure the collection is already created in all the
@@ -130,11 +141,9 @@ private:
*/
void _logEndCreateCollection(OperationContext* opCtx);
- const BSONObj _critSecReason;
+ mongo::CreateCollectionRequest _request;
- // The shard key of the collection, static for the duration of the coordinator and reflects the
- // original command
- boost::optional<ShardKeyPattern> _shardKeyPattern;
+ const BSONObj _critSecReason;
// Set on successful completion of the coordinator
boost::optional<CreateCollectionResponse> _result;
@@ -142,7 +151,6 @@ private:
// The fields below are only populated if the coordinator enters in the branch where the
// collection is not already sharded (i.e., they will not be present on early return)
- boost::optional<BSONObj> _collationBSON;
boost::optional<UUID> _collectionUUID;
std::unique_ptr<InitialSplitPolicy> _splitPolicy;
diff --git a/src/mongo/db/s/create_collection_coordinator_document.idl b/src/mongo/db/s/create_collection_coordinator_document.idl
index 8e8cbe80c07..925ae0039a0 100644
--- a/src/mongo/db/s/create_collection_coordinator_document.idl
+++ b/src/mongo/db/s/create_collection_coordinator_document.idl
@@ -43,9 +43,28 @@ enums:
type: string
values:
kUnset: "unset"
+ kTranslateRequest: "translateRequest"
kCommit: "commit"
structs:
+ TranslatedRequestParams:
+ description: "Subset of CreateCollectionRequest fields that may get overridden once the kTranslateRequest phase gets executed"
+ generate_comparison_operators: false
+ strict: false
+ fields:
+ nss:
+ type: namespacestring
+ description: "The namespace that will be used by the DDL while accessing the Sharding Catalog"
+ optional: false
+ keyPattern:
+ type: KeyPattern
+ description: "The shard key that will be used by the DDL while accessing the Sharding Catalog"
+ optional: false
+ collation:
+ type: object_owned
+ description: "Collation value from the targeted unsharded collection if this exists; otherwise, simple-collation"
+ optional: false
+
CreateCollectionCoordinatorDocument:
description: "Object with neccessary fields to create a collection"
generate_comparison_operators: false
@@ -57,4 +76,12 @@ structs:
phase:
type: CreateCollectionCoordinatorPhase
description: "Coordinator phase."
- default: kUnset
\ No newline at end of file
+ default: kUnset
+ translatedRequestParams:
+ type: TranslatedRequestParams
+ description: "The field is populated only once the kTranslateRequest phase is completed"
+ optional: true
+ # TODO SERVER-68084 remove the following field
+ disregardCriticalSectionOnOriginalNss:
+ type: optionalBool
+ description: "When set to true, the DDL operation is being performed without acquiring a critical section over the NSS specified by the user."
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.h b/src/mongo/db/s/sharding_ddl_coordinator.h
index d8eaad0139d..d8f42fc6ea5 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.h
+++ b/src/mongo/db/s/sharding_ddl_coordinator.h
@@ -87,17 +87,6 @@ public:
return _completionPromise.getFuture();
}
- const NamespaceString& originalNss() const {
- return _coordId.getNss();
- }
-
- const NamespaceString& nss() const {
- if (const auto& bucketNss = metadata().getBucketNss()) {
- return bucketNss.get();
- }
- return originalNss();
- }
-
DDLCoordinatorTypeEnum operationType() const {
return _coordId.getOperationType();
}
@@ -112,6 +101,17 @@ public:
}
protected:
+ const NamespaceString& originalNss() const {
+ return _coordId.getNss();
+ }
+
+ virtual const NamespaceString& nss() const {
+ if (const auto& bucketNss = metadata().getBucketNss()) {
+ return bucketNss.get();
+ }
+ return originalNss();
+ }
+
virtual std::vector<StringData> _acquireAdditionalLocks(OperationContext* opCtx) {
return {};
};
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.idl b/src/mongo/db/s/sharding_ddl_coordinator.idl
index e38a0f63334..9398ef118fa 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.idl
+++ b/src/mongo/db/s/sharding_ddl_coordinator.idl
@@ -47,7 +47,9 @@ enums:
kDropDatabase: "dropDatabase"
kDropCollection: "dropCollection"
kRenameCollection: "renameCollection"
- kCreateCollection: "createCollection_V2"
+ kCreateCollection: "createCollection_V3"
+ # TODO SERVER-68008: Remove once 7.0 becomes last LTS
+ kCreateCollectionPre61Compatible: "createCollection_V2"
kRefineCollectionShardKey: "refineCollectionShardKey"
kSetAllowMigrations: "setAllowMigrations"
kCollMod: "collMod_V3"
diff --git a/src/mongo/db/s/sharding_ddl_coordinator_service.cpp b/src/mongo/db/s/sharding_ddl_coordinator_service.cpp
index 1fc117046d2..ca445697a3e 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator_service.cpp
+++ b/src/mongo/db/s/sharding_ddl_coordinator_service.cpp
@@ -77,6 +77,8 @@ std::shared_ptr<ShardingDDLCoordinator> constructShardingDDLCoordinatorInstance(
case DDLCoordinatorTypeEnum::kRenameCollection:
return std::make_shared<RenameCollectionCoordinator>(service, std::move(initialState));
case DDLCoordinatorTypeEnum::kCreateCollection:
+ // TODO SERVER-68008 Remove the Pre61Compatible case once 7.0 becomes last LTS
+ case DDLCoordinatorTypeEnum::kCreateCollectionPre61Compatible:
return std::make_shared<CreateCollectionCoordinator>(service, std::move(initialState));
break;
case DDLCoordinatorTypeEnum::kRefineCollectionShardKey:
diff --git a/src/mongo/db/s/sharding_recovery_service.cpp b/src/mongo/db/s/sharding_recovery_service.cpp
index be00f0fefc4..7f895570bae 100644
--- a/src/mongo/db/s/sharding_recovery_service.cpp
+++ b/src/mongo/db/s/sharding_recovery_service.cpp
@@ -97,6 +97,8 @@ void ShardingRecoveryService::acquireRecoverableCriticalSectionBlockWrites(
{
Lock::GlobalLock lk(opCtx, MODE_IX);
+ // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to
+ // construct cCollLock.
AutoGetCollection cCollLock(opCtx, nss, MODE_S);
DBDirectClient dbClient(opCtx);
@@ -183,6 +185,8 @@ void ShardingRecoveryService::promoteRecoverableCriticalSectionToBlockAlsoReads(
invariant(!opCtx->lockState()->isLocked());
{
+ // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to
+ // construct cCollLock.
AutoGetCollection cCollLock(opCtx, nss, MODE_X);
DBDirectClient dbClient(opCtx);
@@ -283,6 +287,8 @@ void ShardingRecoveryService::releaseRecoverableCriticalSection(
invariant(!opCtx->lockState()->isLocked());
{
+ // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to
+ // construct collLock.
AutoGetCollection collLock(opCtx, nss, MODE_X);
DBDirectClient dbClient(opCtx);
diff --git a/src/mongo/db/s/shardsvr_create_collection_command.cpp b/src/mongo/db/s/shardsvr_create_collection_command.cpp
index c5d40d29bb5..3155963cc52 100644
--- a/src/mongo/db/s/shardsvr_create_collection_command.cpp
+++ b/src/mongo/db/s/shardsvr_create_collection_command.cpp
@@ -32,12 +32,15 @@
#include "mongo/db/catalog/collection_catalog.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/feature_compatibility_version.h"
+#include "mongo/db/namespace_string.h"
#include "mongo/db/s/create_collection_coordinator.h"
#include "mongo/db/s/sharding_ddl_coordinator_service.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h"
#include "mongo/db/timeseries/timeseries_options.h"
#include "mongo/logv2/log.h"
+#include "mongo/s/request_types/sharded_ddl_commands_gen.h"
+#include "mongo/s/sharding_feature_flags_gen.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
@@ -45,6 +48,64 @@
namespace mongo {
namespace {
+void translateToTimeseriesCollection(OperationContext* opCtx,
+ NamespaceString* nss,
+ CreateCollectionRequest* createCmdRequest) {
+ auto bucketsNs = nss->makeTimeseriesBucketsNamespace();
+ auto bucketsColl =
+ CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForRead(opCtx, bucketsNs);
+
+ // If the 'system.buckets' collection exists or 'timeseries' parameters are passed in, we know
+ // that we are trying to shard a timeseries collection.
+ if (bucketsColl || createCmdRequest->getTimeseries()) {
+ uassert(5731502,
+ "Sharding a timeseries collection feature is not enabled",
+ feature_flags::gFeatureFlagShardedTimeSeries.isEnabled(
+ serverGlobalParams.featureCompatibility));
+
+ if (bucketsColl) {
+ uassert(6235600,
+ str::stream() << "the collection '" << bucketsNs
+ << "' does not have 'timeseries' options",
+ bucketsColl->getTimeseriesOptions());
+
+ if (createCmdRequest->getTimeseries()) {
+ uassert(6235601,
+ str::stream()
+ << "the 'timeseries' spec provided must match that of exists '" << nss
+ << "' collection",
+ timeseries::optionsAreEqual(*createCmdRequest->getTimeseries(),
+ *bucketsColl->getTimeseriesOptions()));
+ } else {
+ createCmdRequest->setTimeseries(bucketsColl->getTimeseriesOptions());
+ }
+ }
+
+ auto timeField = createCmdRequest->getTimeseries()->getTimeField();
+ auto metaField = createCmdRequest->getTimeseries()->getMetaField();
+ BSONObjIterator iter{*createCmdRequest->getShardKey()};
+ while (auto elem = iter.next()) {
+ if (elem.fieldNameStringData() == timeField) {
+ uassert(6235602,
+ str::stream() << "the time field '" << timeField
+ << "' can be only at the end of the shard key pattern",
+ !iter.more());
+ } else {
+ uassert(6235603,
+ str::stream() << "only the time field or meta field can be "
+ "part of shard key pattern",
+ metaField &&
+ (elem.fieldNameStringData() == *metaField ||
+ elem.fieldNameStringData().startsWith(*metaField + ".")));
+ }
+ }
+ *nss = bucketsNs;
+ createCmdRequest->setShardKey(
+ uassertStatusOK(timeseries::createBucketsShardKeySpecFromTimeseriesShardKeySpec(
+ *createCmdRequest->getTimeseries(), *createCmdRequest->getShardKey())));
+ }
+}
+
class ShardsvrCreateCollectionCommand final : public TypedCommand<ShardsvrCreateCollectionCommand> {
public:
using Request = ShardsvrCreateCollection;
@@ -83,69 +144,20 @@ public:
"Create Collection path has not been implemented",
request().getShardKey());
- auto nss = ns();
- auto bucketsNs = nss.makeTimeseriesBucketsNamespace();
- auto bucketsColl =
- CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForRead(opCtx, bucketsNs);
- CreateCollectionRequest createCmdRequest = request().getCreateCollectionRequest();
-
- // If the 'system.buckets' exists or 'timeseries' parameters are passed in, we know that
- // we are trying shard a timeseries collection.
- if (bucketsColl || createCmdRequest.getTimeseries()) {
- uassert(5731502,
- "Sharding a timeseries collection feature is not enabled",
- feature_flags::gFeatureFlagShardedTimeSeries.isEnabled(
- serverGlobalParams.featureCompatibility));
-
- if (bucketsColl) {
- uassert(6159000,
- str::stream() << "the collection '" << bucketsNs
- << "' does not have 'timeseries' options",
- bucketsColl->getTimeseriesOptions());
-
- if (createCmdRequest.getTimeseries()) {
- uassert(5731500,
- str::stream()
- << "the 'timeseries' spec provided must match that of exists '"
- << nss << "' collection",
- timeseries::optionsAreEqual(*createCmdRequest.getTimeseries(),
- *bucketsColl->getTimeseriesOptions()));
- } else {
- createCmdRequest.setTimeseries(bucketsColl->getTimeseriesOptions());
- }
- }
-
- auto timeField = createCmdRequest.getTimeseries()->getTimeField();
- auto metaField = createCmdRequest.getTimeseries()->getMetaField();
- BSONObjIterator iter{*createCmdRequest.getShardKey()};
- while (auto elem = iter.next()) {
- if (elem.fieldNameStringData() == timeField) {
- uassert(5914000,
- str::stream()
- << "the time field '" << timeField
- << "' can be only at the end of the shard key pattern",
- !iter.more());
- } else {
- uassert(5914001,
- str::stream() << "only the time field or meta field can be "
- "part of shard key pattern",
- metaField &&
- (elem.fieldNameStringData() == *metaField ||
- elem.fieldNameStringData().startsWith(*metaField + ".")));
- }
+ const auto createCollectionCoordinator = [&] {
+ auto nssToForward = ns();
+ auto requestToForward = request().getCreateCollectionRequest();
+ auto coordinatorType = DDLCoordinatorTypeEnum::kCreateCollection;
+ if (!feature_flags::gImplicitDDLTimeseriesNssTranslation.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ translateToTimeseriesCollection(opCtx, &nssToForward, &requestToForward);
+ coordinatorType = DDLCoordinatorTypeEnum::kCreateCollectionPre61Compatible;
}
- nss = bucketsNs;
- createCmdRequest.setShardKey(
- uassertStatusOK(timeseries::createBucketsShardKeySpecFromTimeseriesShardKeySpec(
- *createCmdRequest.getTimeseries(), *createCmdRequest.getShardKey())));
- }
- const auto createCollectionCoordinator = [&] {
auto coordinatorDoc = [&] {
auto doc = CreateCollectionCoordinatorDocument();
- doc.setShardingDDLCoordinatorMetadata(
- {{std::move(nss), DDLCoordinatorTypeEnum::kCreateCollection}});
- doc.setCreateCollectionRequest(std::move(createCmdRequest));
+ doc.setShardingDDLCoordinatorMetadata({{nssToForward, coordinatorType}});
+ doc.setCreateCollectionRequest(requestToForward);
return doc.toBSON();
}();