diff options
author | Paolo Polato <paolo.polato@mongodb.com> | 2022-09-14 12:12:29 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-09-14 13:48:51 +0000 |
commit | 095cfdb2bd20b10d0a20ef876029120b69971368 (patch) | |
tree | c70e7b836ff96abe12af6abe603980fd21edd03f | |
parent | 40e9c7198a7f0742a2347232166695aae7312286 (diff) | |
download | mongo-095cfdb2bd20b10d0a20ef876029120b69971368.tar.gz |
SERVER-62356 Serialise the creation of sharded Timeseries collection with other DDL ops
-rw-r--r-- | jstests/sharding/timeseries_shard_collection.js | 23 | ||||
-rw-r--r-- | src/mongo/base/error_codes.yml | 3 | ||||
-rw-r--r-- | src/mongo/db/s/create_collection_coordinator.cpp | 612 | ||||
-rw-r--r-- | src/mongo/db/s/create_collection_coordinator.h | 36 | ||||
-rw-r--r-- | src/mongo/db/s/create_collection_coordinator_document.idl | 29 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator.h | 22 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator.idl | 4 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator_service.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_recovery_service.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/s/shardsvr_create_collection_command.cpp | 130 |
10 files changed, 629 insertions, 238 deletions
diff --git a/jstests/sharding/timeseries_shard_collection.js b/jstests/sharding/timeseries_shard_collection.js index 5984e7900c8..6068743a631 100644 --- a/jstests/sharding/timeseries_shard_collection.js +++ b/jstests/sharding/timeseries_shard_collection.js @@ -15,6 +15,9 @@ Random.setRandomSeed(); const st = new ShardingTest({shards: 2, rs: {nodes: 2}}); +// TODO SERVER-68008 remove error codes 6235600, 6235601, 6235602 and 6235603 from all the +// assertions contained by this test. + const dbName = 'test'; assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); const sDB = st.s.getDB(dbName); @@ -62,7 +65,7 @@ if (TimeseriesTest.shardedtimeseriesCollectionsEnabled(st.shard0)) { key: {'hostId': 1}, timeseries: {timeField: 'time'}, }), - 5731500); + [5731500, 6235601]); } assert.commandWorked( @@ -242,28 +245,28 @@ if (TimeseriesTest.shardedtimeseriesCollectionsEnabled(st.shard0)) { key: {_id: 1}, timeseries: {timeField: 'time', metaField: 'hostId'}, }), - 5914001); + [5914001, 6235603]); assert.commandFailedWithCode(st.s.adminCommand({ shardCollection: 'test.ts', key: {_id: 1, time: 1}, timeseries: {timeField: 'time', metaField: 'hostId'}, }), - 5914001); + [5914001, 6235603]); assert.commandFailedWithCode(st.s.adminCommand({ shardCollection: 'test.ts', key: {_id: 1, hostId: 1}, timeseries: {timeField: 'time', metaField: 'hostId'}, }), - 5914001); + [5914001, 6235603]); assert.commandFailedWithCode(st.s.adminCommand({ shardCollection: 'test.ts', key: {a: 1}, timeseries: {timeField: 'time', metaField: 'hostId'}, }), - 5914001); + [5914001, 6235603]); // Shared key where time is not the last field in shard key should fail. assert.commandFailedWithCode(st.s.adminCommand({ @@ -271,7 +274,7 @@ if (TimeseriesTest.shardedtimeseriesCollectionsEnabled(st.shard0)) { key: {time: 1, hostId: 1}, timeseries: {timeField: 'time', metaField: 'hostId'} }), - 5914000); + [5914000, 6235602]); assert(sDB.getCollection("ts").drop()); })(); @@ -285,14 +288,14 @@ if (TimeseriesTest.shardedtimeseriesCollectionsEnabled(st.shard0)) { key: {_id: 1}, timeseries: {timeField: 'time'}, }), - 5914001); + [5914001, 6235603]); assert.commandFailedWithCode(st.s.adminCommand({ shardCollection: 'test.ts', key: {a: 1}, timeseries: {timeField: 'time'}, }), - 5914001); + [5914001, 6235603]); assert.commandWorked(st.s.adminCommand( {shardCollection: 'test.ts', key: {time: 1}, timeseries: {timeField: 'time'}})); @@ -312,11 +315,11 @@ if (TimeseriesTest.shardedtimeseriesCollectionsEnabled(st.shard0)) { shardCollection: 'test.ts', key: {time: 1}, }), - 6159000); + [6159000, 6235600]); assert.commandFailedWithCode( st.s.adminCommand( {shardCollection: 'test.ts', key: {time: 1}, timeseries: {timeField: 'time'}}), - 6159000); + [6159000, 6235600]); // Cannot shard a system namespace. assert.commandFailedWithCode(st.s.adminCommand({ diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index 915db853b79..d45344827f5 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -488,8 +488,9 @@ error_codes: - {code: 376, name: ChangeStreamNotEnabled} - {code: 377, name: FLEMaxTagLimitExceeded } - {code: 378, name: NonConformantBSON, categories: [ValidationError]} - - {code: 379, name: DatabaseMetadataRefreshCanceled, categories: [InternalOnly]} + - {code: 380, name: RequestAlreadyFulfilled} + # Error codes 4000-8999 are reserved. diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp index b312b54c8bb..e49fc035752 100644 --- a/src/mongo/db/s/create_collection_coordinator.cpp +++ b/src/mongo/db/s/create_collection_coordinator.cpp @@ -28,6 +28,7 @@ */ +#include "mongo/db/s/create_collection_coordinator_document_gen.h" #include "mongo/platform/basic.h" #include "mongo/db/audit.h" @@ -52,6 +53,7 @@ #include "mongo/db/s/sharding_state.h" #include "mongo/db/timeseries/catalog_helper.h" #include "mongo/db/timeseries/timeseries_constants.h" +#include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/cluster_commands_helpers.h" @@ -99,6 +101,64 @@ OptionsAndIndexes getCollectionOptionsAndIndexes(OperationContext* opCtx, idIndex}; } +// NOTES on the 'collation' optional parameter contained by the shardCollection() request: +// 1. It specifies the ordering criteria that will be applied when comparing chunk boundaries +// during sharding operations (such as move/mergeChunks). +// 2. As per today, the only supported value (and the one applied by default) is 'simple' +// collation. +// 3. If the collection being sharded does not exist yet, it will also be used as the ordering +// criteria to serve user queries over the shard index fields. +// 4. If an existing unsharded collection is being targeted, the original 'collation' will still +// be used to serve user queries, but the shardCollection is required to explicitly include the +// 'collation' parameter to succeed (as an acknowledge of what specified in points 1. and 2.) +BSONObj resolveCollationForUserQueries(OperationContext* opCtx, + const NamespaceString& nss, + const boost::optional<BSONObj>& collationInRequest) { + // Ensure the collation is valid. Currently we only allow the simple collation. + std::unique_ptr<CollatorInterface> requestedCollator = nullptr; + if (collationInRequest) { + requestedCollator = + uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) + ->makeFromBSON(collationInRequest.value())); + uassert(ErrorCodes::BadValue, + str::stream() << "The collation for shardCollection must be {locale: 'simple'}, " + << "but found: " << collationInRequest.value(), + !requestedCollator); + } + + AutoGetCollection autoColl(opCtx, nss, MODE_IS, AutoGetCollectionViewMode::kViewsForbidden); + + const auto actualCollator = [&]() -> const CollatorInterface* { + const auto& coll = autoColl.getCollection(); + if (coll) { + uassert( + ErrorCodes::InvalidOptions, "can't shard a capped collection", !coll->isCapped()); + return coll->getDefaultCollator(); + } + + return nullptr; + }(); + + if (!requestedCollator && !actualCollator) + return BSONObj(); + + auto actualCollation = actualCollator->getSpec(); + auto actualCollatorBSON = actualCollation.toBSON(); + + if (!collationInRequest) { + auto actualCollatorFilter = + uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) + ->makeFromBSON(actualCollatorBSON)); + uassert(ErrorCodes::BadValue, + str::stream() << "If no collation was specified, the collection collation must be " + "{locale: 'simple'}, " + << "but found: " << actualCollatorBSON, + !actualCollatorFilter); + } + + return actualCollatorBSON; +} + /** * Constructs the BSON specification document for the create collections command using the given * namespace, collation, and timeseries options. @@ -210,55 +270,6 @@ int getNumShards(OperationContext* opCtx) { return shardRegistry->getNumShards(opCtx); } -std::pair<boost::optional<Collation>, BSONObj> getCollation( - OperationContext* opCtx, - const NamespaceString& nss, - const boost::optional<BSONObj>& collation) { - // Ensure the collation is valid. Currently we only allow the simple collation. - std::unique_ptr<CollatorInterface> requestedCollator = nullptr; - if (collation) { - requestedCollator = - uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) - ->makeFromBSON(collation.value())); - uassert(ErrorCodes::BadValue, - str::stream() << "The collation for shardCollection must be {locale: 'simple'}, " - << "but found: " << collation.value(), - !requestedCollator); - } - - AutoGetCollection autoColl(opCtx, nss, MODE_IS, AutoGetCollectionViewMode::kViewsForbidden); - - const auto actualCollator = [&]() -> const CollatorInterface* { - const auto& coll = autoColl.getCollection(); - if (coll) { - uassert( - ErrorCodes::InvalidOptions, "can't shard a capped collection", !coll->isCapped()); - return coll->getDefaultCollator(); - } - - return nullptr; - }(); - - if (!requestedCollator && !actualCollator) - return {boost::none, BSONObj()}; - - auto actualCollation = actualCollator->getSpec(); - auto actualCollatorBSON = actualCollation.toBSON(); - - if (!collation) { - auto actualCollatorFilter = - uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) - ->makeFromBSON(actualCollatorBSON)); - uassert(ErrorCodes::BadValue, - str::stream() << "If no collation was specified, the collection collation must be " - "{locale: 'simple'}, " - << "but found: " << actualCollatorBSON, - !actualCollatorFilter); - } - - return {actualCollation, actualCollatorBSON}; -} - void cleanupPartialChunksFromPreviousAttempt(OperationContext* opCtx, const UUID& uuid, const OperationSessionInfo& osi) { @@ -363,6 +374,14 @@ void CreateCollectionCoordinator::appendCommandInfo(BSONObjBuilder* cmdInfoBuild cmdInfoBuilder->appendElements(_request.toBSON()); } +const NamespaceString& CreateCollectionCoordinator::nss() const { + // Rely on the resolved request parameters to retrieve the nss to be targeted by the + // coordinator. + stdx::lock_guard lk{_docMutex}; + invariant(_doc.getTranslatedRequestParams()); + return _doc.getTranslatedRequestParams()->getNss(); +} + void CreateCollectionCoordinator::checkIfOptionsConflict(const BSONObj& doc) const { // If we have two shard collections on the same namespace, then the arguments must be the same. const auto otherDoc = CreateCollectionCoordinatorDocument::parse( @@ -380,15 +399,38 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( const CancellationToken& token) noexcept { return ExecutorFuture<void>(**executor) .then([this, anchor = shared_from_this()] { - _shardKeyPattern = ShardKeyPattern(*_request.getShardKey()); - if (_doc.getPhase() < Phase::kCommit) { + if (_doc.getPhase() < Phase::kTranslateRequest) { auto opCtxHolder = cc().makeOperationContext(); auto* opCtx = opCtxHolder.get(); getForwardableOpMetadata().setOn(opCtx); _checkCommandArguments(opCtx); + // Perform a preliminary check on whether the request may resolve into a no-op + // before acquiring any critical section. + auto createCollectionResponseOpt = + _checkIfCollectionAlreadyShardedWithSameOptions(opCtx); + if (createCollectionResponseOpt) { + _result = createCollectionResponseOpt; + // Launch an exception to directly jump to the end of the continuation chain + uasserted(ErrorCodes::RequestAlreadyFulfilled, + str::stream() << "The collection" << originalNss() + << "was already sharded by a past request"); + } } }) + .then(_executePhase(Phase::kTranslateRequest, + [this, anchor = shared_from_this()] { + auto opCtxHolder = cc().makeOperationContext(); + auto* opCtx = opCtxHolder.get(); + getForwardableOpMetadata().setOn(opCtx); + _logStartCreateCollection(opCtx); + + // Enter the critical sections before patching the user request to + // avoid data races with concurrenct creation of unsharded + // collections referencing the same namespace(s). + _acquireCriticalSections(opCtx); + _doc.setTranslatedRequestParams(_translateRequestParameters(opCtx)); + })) .then(_executePhase( Phase::kCommit, [this, executor = executor, token, anchor = shared_from_this()] { @@ -409,38 +451,23 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( opCtx, getCurrentSession(), **executor); } - // Log the start of the event only if we're not recovering. - _logStartCreateCollection(opCtx); - - _checkCollectionUUIDMismatch(opCtx); - - // Quick check (without critical section) to see if another create collection - // already succeeded. + // Check if the collection was already sharded by a past request if (auto createCollectionResponseOpt = sharding_ddl_util::checkIfCollectionAlreadySharded( opCtx, nss(), - _shardKeyPattern->getKeyPattern().toBSON(), - getCollation(opCtx, nss(), _request.getCollation()).second, + _doc.getTranslatedRequestParams()->getKeyPattern().toBSON(), + _doc.getTranslatedRequestParams()->getCollation(), _request.getUnique().value_or(false))) { - // The critical section can still be held here if the node committed the - // sharding of the collection but then it stepped down before it managed to - // delete the coordinator document - ShardingRecoveryService::get(opCtx)->releaseRecoverableCriticalSection( - opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern); + // A previous request already created and committed the collection but there was + // a stepdown after the commit. + _releaseCriticalSections(opCtx); _result = createCollectionResponseOpt; return; } - // Entering the critical section. From this point on, the writes are blocked. Before - // calling this method, we need the coordinator document to be persisted (and hence - // the kCheck state), otherwise nothing will release the critical section in the - // presence of a stepdown. - ShardingRecoveryService::get(opCtx)->acquireRecoverableCriticalSectionBlockWrites( - opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern); - if (!_firstExecution) { auto uuid = sharding_ddl_util::getCollectionUUID(opCtx, nss()); // If the collection can be found locally, then we clean up the config.chunks @@ -459,26 +486,22 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( } } - _createPolicy(opCtx); - _createCollectionAndIndexes(opCtx); + ShardKeyPattern shardKeyPattern(_doc.getTranslatedRequestParams()->getKeyPattern()); + _createPolicy(opCtx, shardKeyPattern); + _createCollectionAndIndexes(opCtx, shardKeyPattern); audit::logShardCollection(opCtx->getClient(), - nss().ns(), + nss().toString(), *_request.getShardKey(), _request.getUnique().value_or(false)); if (_splitPolicy->isOptimized()) { - _createChunks(opCtx); + _createChunks(opCtx, shardKeyPattern); // Block reads/writes from here on if we need to create the collection on other // shards, this way we prevent reads/writes that should be redirected to another // shard - ShardingRecoveryService::get(opCtx) - ->promoteRecoverableCriticalSectionToBlockAlsoReads( - opCtx, - nss(), - _critSecReason, - ShardingCatalogClient::kMajorityWriteConcern); + _promoteCriticalSectionsToBlockReads(opCtx); _updateSession(opCtx); _createCollectionOnNonPrimaryShards(opCtx, getCurrentSession()); @@ -487,14 +510,13 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( } // End of the critical section, from now on, read and writes are permitted. - ShardingRecoveryService::get(opCtx)->releaseRecoverableCriticalSection( - opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern); + _releaseCriticalSections(opCtx); // Slow path. Create chunks (which might incur in an index scan) and commit must be // done outside of the critical section to prevent writes from stalling in unsharded // collections. if (!_splitPolicy->isOptimized()) { - _createChunks(opCtx); + _createChunks(opCtx, shardKeyPattern); _commit(opCtx); } })) @@ -505,64 +527,181 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( _logEndCreateCollection(opCtx); }) .onError([this, anchor = shared_from_this()](const Status& status) { + if (status == ErrorCodes::RequestAlreadyFulfilled) { + return Status::OK(); + } + if (!status.isA<ErrorCategory::NotPrimaryError>() && !status.isA<ErrorCategory::ShutdownError>()) { LOGV2_ERROR(5458702, "Error running create collection", - "namespace"_attr = nss(), + "namespace"_attr = originalNss(), "error"_attr = redact(status)); auto opCtxHolder = cc().makeOperationContext(); auto* opCtx = opCtxHolder.get(); - ShardingRecoveryService::get(opCtx)->releaseRecoverableCriticalSection( - opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern); + _releaseCriticalSections(opCtx); } + return status; }); } -void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx) { - LOGV2_DEBUG(5277902, 2, "Create collection _checkCommandArguments", "namespace"_attr = nss()); +boost::optional<CreateCollectionResponse> +CreateCollectionCoordinator::_checkIfCollectionAlreadyShardedWithSameOptions( + OperationContext* opCtx) { + // Perfom check in the translation phase if the request is coming from a C2C command; this will + // allow to honor the contract with mongosync (see SERVER-67885 for details) + if (_request.getCollectionUUID()) { + return boost::none; + } - uassert(ErrorCodes::InvalidNamespace, - str::stream() << "Namespace too long. Namespace: " << nss() - << " Max: " << NamespaceString::MaxNsShardedCollectionLen, - nss().size() <= NamespaceString::MaxNsShardedCollectionLen); + // Preliminary check is unsupported for DDL requests received by nodes running old FCVs. + if (_timeseriesNssResolvedByCommandHandler()) { + return boost::none; + } + + // Check is there is a standard sharded collection that matches the original request parameters + auto routingInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh( + opCtx, originalNss())); + if (routingInfo.isSharded()) { + auto requestMatchesExistingCollection = [&] { + // No timeseries fields in request + if (_request.getTimeseries()) { + return false; + } + + if (_request.getUnique().value_or(false) != routingInfo.isUnique()) { + return false; + } + + if (SimpleBSONObjComparator::kInstance.evaluate( + *_request.getShardKey() != routingInfo.getShardKeyPattern().toBSON())) { + return false; + } + + auto defaultCollator = routingInfo.getDefaultCollator() + ? routingInfo.getDefaultCollator()->getSpec().toBSON() + : BSONObj(); + if (SimpleBSONObjComparator::kInstance.evaluate( + defaultCollator != + resolveCollationForUserQueries( + opCtx, originalNss(), _request.getCollation()))) { + return false; + } + + return true; + }(); + + uassert(ErrorCodes::AlreadyInitialized, + str::stream() << "sharding already enabled for collection " << originalNss(), + requestMatchesExistingCollection); + + CreateCollectionResponse response( + {routingInfo.getVersion(), CollectionIndexes(routingInfo.getVersion(), boost::none)}); + response.setCollectionUUID(routingInfo.getUUID()); + return response; + } - if (nss().db() == NamespaceString::kConfigDb) { + // If the request is still unresolved, check if there is an existing TS buckets namespace that + // may be matched by the request. + auto bucketsNss = originalNss().makeTimeseriesBucketsNamespace(); + routingInfo = uassertStatusOK( + Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, bucketsNss)); + if (!routingInfo.isSharded()) { + return boost::none; + } + + auto requestMatchesExistingCollection = [&] { + if (routingInfo.isUnique() != _request.getUnique().value_or(false)) { + return false; + } + + // Timeseries options match + const auto& timeseriesOptionsOnDisk = + (*routingInfo.getTimeseriesFields()).getTimeseriesOptions(); + if (_request.getTimeseries() && + !timeseries::optionsAreEqual(*_request.getTimeseries(), timeseriesOptionsOnDisk)) { + return false; + } + + auto defaultCollator = routingInfo.getDefaultCollator() + ? routingInfo.getDefaultCollator()->getSpec().toBSON() + : BSONObj(); + if (SimpleBSONObjComparator::kInstance.evaluate( + defaultCollator != + resolveCollationForUserQueries(opCtx, bucketsNss, _request.getCollation()))) { + return false; + } + + // Same Key Pattern + const auto& timeseriesOptions = + _request.getTimeseries() ? *_request.getTimeseries() : timeseriesOptionsOnDisk; + auto requestKeyPattern = + uassertStatusOK(timeseries::createBucketsShardKeySpecFromTimeseriesShardKeySpec( + timeseriesOptions, *_request.getShardKey())); + if (SimpleBSONObjComparator::kInstance.evaluate(routingInfo.getShardKeyPattern().toBSON() != + requestKeyPattern)) { + return false; + } + return true; + }(); + + uassert(ErrorCodes::AlreadyInitialized, + str::stream() << "sharding already enabled for collection " << bucketsNss, + requestMatchesExistingCollection); + + CreateCollectionResponse response( + {routingInfo.getVersion(), CollectionIndexes(routingInfo.getVersion(), boost::none)}); + response.setCollectionUUID(routingInfo.getUUID()); + return response; +} + +void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx) { + LOGV2_DEBUG( + 5277902, 2, "Create collection _checkCommandArguments", "namespace"_attr = originalNss()); + + if (originalNss().db() == NamespaceString::kConfigDb) { // Only allowlisted collections in config may be sharded (unless we are in test mode) uassert(ErrorCodes::IllegalOperation, "only special collections in the config db may be sharded", - nss() == NamespaceString::kLogicalSessionsNamespace); + originalNss() == NamespaceString::kLogicalSessionsNamespace); } // Ensure that hashed and unique are not both set. uassert(ErrorCodes::InvalidOptions, "Hashed shard keys cannot be declared unique. It's possible to ensure uniqueness on " "the hashed field by declaring an additional (non-hashed) unique index on the field.", - !_shardKeyPattern->isHashedPattern() || !_request.getUnique().value_or(false)); - - // Ensure that a time-series collection cannot be sharded unless the feature flag is enabled. - if (nss().isTimeseriesBucketsCollection()) { - uassert(ErrorCodes::IllegalOperation, - str::stream() << "can't shard time-series collection " << nss(), - feature_flags::gFeatureFlagShardedTimeSeries.isEnabled( - serverGlobalParams.featureCompatibility) || - !timeseries::getTimeseriesOptions(opCtx, nss(), false)); + !ShardKeyPattern(*_request.getShardKey()).isHashedPattern() || + !_request.getUnique().value_or(false)); + + if (_timeseriesNssResolvedByCommandHandler()) { + // Ensure that a time-series collection cannot be sharded unless the feature flag is + // enabled. + if (originalNss().isTimeseriesBucketsCollection()) { + uassert(ErrorCodes::IllegalOperation, + str::stream() << "can't shard time-series collection " << nss(), + feature_flags::gFeatureFlagShardedTimeSeries.isEnabled( + serverGlobalParams.featureCompatibility) || + !timeseries::getTimeseriesOptions(opCtx, nss(), false)); + } } // Ensure the namespace is valid. uassert(ErrorCodes::IllegalOperation, "can't shard system namespaces", - !nss().isSystem() || nss() == NamespaceString::kLogicalSessionsNamespace || - nss().isTemporaryReshardingCollection() || nss().isTimeseriesBucketsCollection()); + !originalNss().isSystem() || + originalNss() == NamespaceString::kLogicalSessionsNamespace || + originalNss().isTemporaryReshardingCollection() || + originalNss().isTimeseriesBucketsCollection()); if (_request.getNumInitialChunks()) { // Ensure numInitialChunks is within valid bounds. // Cannot have more than kMaxSplitPoints initial chunks per shard. Setting a maximum of - // 1,000,000 chunks in total to limit the amount of memory this command consumes so there is - // less danger of an OOM error. + // 1,000,000 chunks in total to limit the amount of memory this command consumes so + // there is less danger of an OOM error. const int maxNumInitialChunksForShards = Grid::get(opCtx)->shardRegistry()->getNumShards(opCtx) * shardutil::kMaxSplitPoints; @@ -576,14 +715,14 @@ void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx numChunks <= maxNumInitialChunksTotal); } - if (nss().db() == NamespaceString::kConfigDb) { + if (originalNss().db() == NamespaceString::kConfigDb) { auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); auto findReponse = uassertStatusOK( configShard->exhaustiveFindOnConfig(opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, repl::ReadConcernLevel::kMajorityReadConcern, - nss(), + originalNss(), BSONObj(), BSONObj(), 1)); @@ -597,22 +736,212 @@ void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx } } -void CreateCollectionCoordinator::_checkCollectionUUIDMismatch(OperationContext* opCtx) const { - AutoGetCollection coll{opCtx, nss(), MODE_IS}; - checkCollectionUUIDMismatch(opCtx, nss(), coll.getCollection(), _request.getCollectionUUID()); +TranslatedRequestParams CreateCollectionCoordinator::_translateRequestParameters( + OperationContext* opCtx) { + auto performCheckOnCollectionUUID = [this, opCtx](const NamespaceString& resolvedNss) { + AutoGetCollection coll{opCtx, resolvedNss, MODE_IS}; + checkCollectionUUIDMismatch( + opCtx, resolvedNss, coll.getCollection(), _request.getCollectionUUID()); + }; + + auto bucketsNs = originalNss().makeTimeseriesBucketsNamespace(); + auto existingBucketsColl = + CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForRead(opCtx, bucketsNs); + + auto targetingStandardCollection = !_request.getTimeseries() && !existingBucketsColl; + + if (_timeseriesNssResolvedByCommandHandler() || targetingStandardCollection) { + const auto& resolvedNamespace = originalNss(); + performCheckOnCollectionUUID(resolvedNamespace); + uassert(ErrorCodes::InvalidNamespace, + str::stream() << "Namespace too long. Namespace: " << resolvedNamespace + << " Max: " << NamespaceString::MaxNsShardedCollectionLen, + resolvedNamespace.size() <= NamespaceString::MaxNsShardedCollectionLen); + return TranslatedRequestParams( + resolvedNamespace, + *_request.getShardKey(), + resolveCollationForUserQueries(opCtx, resolvedNamespace, _request.getCollation())); + } + + // The request is targeting a new or existing Timeseries collection and the request has not been + // patched yet. + const auto& resolvedNamespace = bucketsNs; + performCheckOnCollectionUUID(resolvedNamespace); + uassert(ErrorCodes::IllegalOperation, + "Sharding a timeseries collection feature is not enabled", + feature_flags::gFeatureFlagShardedTimeSeries.isEnabled( + serverGlobalParams.featureCompatibility)); + + uassert(ErrorCodes::InvalidNamespace, + str::stream() << "Namespace too long. Namespace: " << resolvedNamespace + << " Max: " << NamespaceString::MaxNsShardedCollectionLen, + resolvedNamespace.size() <= NamespaceString::MaxNsShardedCollectionLen); + + // Consolidate the related request parameters... + auto existingTimeseriesOptions = [&bucketsNs, &existingBucketsColl] { + if (!existingBucketsColl) { + return boost::optional<TimeseriesOptions>(); + } + + uassert(6159000, + str::stream() << "the collection '" << bucketsNs + << "' does not have 'timeseries' options", + existingBucketsColl->getTimeseriesOptions()); + return existingBucketsColl->getTimeseriesOptions(); + }(); + + if (_request.getTimeseries() && existingTimeseriesOptions) { + uassert(5731500, + str::stream() << "the 'timeseries' spec provided must match that of exists '" + << originalNss() << "' collection", + timeseries::optionsAreEqual(*_request.getTimeseries(), *existingTimeseriesOptions)); + } else if (!_request.getTimeseries()) { + _request.setTimeseries(existingTimeseriesOptions); + } + + // check that they are consistent with the requested shard key before creating the key pattern + // object. + auto timeFieldName = _request.getTimeseries()->getTimeField(); + auto metaFieldName = _request.getTimeseries()->getMetaField(); + BSONObjIterator shardKeyElems{*_request.getShardKey()}; + while (auto elem = shardKeyElems.next()) { + if (elem.fieldNameStringData() == timeFieldName) { + uassert(5914000, + str::stream() << "the time field '" << timeFieldName + << "' can be only at the end of the shard key pattern", + !shardKeyElems.more()); + } else { + uassert(5914001, + str::stream() << "only the time field or meta field can be " + "part of shard key pattern", + metaFieldName && + (elem.fieldNameStringData() == *metaFieldName || + elem.fieldNameStringData().startsWith(*metaFieldName + "."))); + } + } + KeyPattern keyPattern( + uassertStatusOK(timeseries::createBucketsShardKeySpecFromTimeseriesShardKeySpec( + *_request.getTimeseries(), *_request.getShardKey()))); + return TranslatedRequestParams( + resolvedNamespace, + keyPattern, + resolveCollationForUserQueries(opCtx, resolvedNamespace, _request.getCollation())); +} + +bool CreateCollectionCoordinator::_timeseriesNssResolvedByCommandHandler() const { + return operationType() == DDLCoordinatorTypeEnum::kCreateCollectionPre61Compatible; } -void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext* opCtx) { +void CreateCollectionCoordinator::_acquireCriticalSections(OperationContext* opCtx) { + // TODO SERVER-68084 call ShardingRecoveryService without the try/catch block + try { + ShardingRecoveryService::get(opCtx)->acquireRecoverableCriticalSectionBlockWrites( + opCtx, + originalNss(), + _critSecReason, + ShardingCatalogClient::kMajorityWriteConcern, + boost::none); + } catch (const ExceptionFor<ErrorCodes::CommandNotSupportedOnView>&) { + if (_timeseriesNssResolvedByCommandHandler()) { + throw; + } + + // In case we acquisition was rejected because it targets an existing view, the critical + // section is not needed and the error can be dropped because: + // 1. We will not shard the view namespace + // 2. This collection will remain a view since we are holding the DDL coll lock and + // thus the collection can't be dropped. + _doc.setDisregardCriticalSectionOnOriginalNss(true); + } + + if (!_timeseriesNssResolvedByCommandHandler()) { + // Preventively acquire the critical section protecting the buckets namespace that the + // creation of a timeseries collection would require. + const auto bucketsNamespace = originalNss().makeTimeseriesBucketsNamespace(); + ShardingRecoveryService::get(opCtx)->acquireRecoverableCriticalSectionBlockWrites( + opCtx, bucketsNamespace, _critSecReason, ShardingCatalogClient::kMajorityWriteConcern); + } +} + +void CreateCollectionCoordinator::_promoteCriticalSectionsToBlockReads( + OperationContext* opCtx) const { + // TODO SERVER-68084 call ShardingRecoveryService without the if blocks. + if (!_doc.getDisregardCriticalSectionOnOriginalNss()) { + ShardingRecoveryService::get(opCtx)->promoteRecoverableCriticalSectionToBlockAlsoReads( + opCtx, originalNss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern); + } + + if (!_timeseriesNssResolvedByCommandHandler()) { + const auto bucketsNamespace = originalNss().makeTimeseriesBucketsNamespace(); + ShardingRecoveryService::get(opCtx)->promoteRecoverableCriticalSectionToBlockAlsoReads( + opCtx, bucketsNamespace, _critSecReason, ShardingCatalogClient::kMajorityWriteConcern); + } +} + +void CreateCollectionCoordinator::_releaseCriticalSections(OperationContext* opCtx) { + // TODO SERVER-68084 call ShardingRecoveryService without the try/catch block. + try { + ShardingRecoveryService::get(opCtx)->releaseRecoverableCriticalSection( + opCtx, originalNss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern); + } catch (ExceptionFor<ErrorCodes::CommandNotSupportedOnView>&) { + // Ignore the error (when it is raised, we can assume that no critical section for the view + // was previously acquired). + } + + if (!_timeseriesNssResolvedByCommandHandler()) { + const auto bucketsNamespace = originalNss().makeTimeseriesBucketsNamespace(); + ShardingRecoveryService::get(opCtx)->releaseRecoverableCriticalSection( + opCtx, bucketsNamespace, _critSecReason, ShardingCatalogClient::kMajorityWriteConcern); + } +} + +void CreateCollectionCoordinator::_createCollectionAndIndexes( + OperationContext* opCtx, const ShardKeyPattern& shardKeyPattern) { LOGV2_DEBUG( 5277903, 2, "Create collection _createCollectionAndIndexes", "namespace"_attr = nss()); + const auto& collationBSON = _doc.getTranslatedRequestParams()->getCollation(); boost::optional<Collation> collation; - std::tie(collation, _collationBSON) = getCollation(opCtx, nss(), _request.getCollation()); + if (!collationBSON.isEmpty()) { + collation.emplace( + Collation::parse(IDLParserContext("CreateCollectionCoordinator"), collationBSON)); + } // We need to implicitly create a timeseries view and underlying bucket collection. if (_collectionEmpty && _request.getTimeseries()) { + // TODO SERVER-68084 Remove viewLock and the whole if section that constructs it while + // releasing the critical section on the originalNss. + boost::optional<AutoGetCollection> viewLock; + if (auto criticalSectionAcquiredOnOriginalNss = + !_doc.getDisregardCriticalSectionOnOriginalNss(); + !_timeseriesNssResolvedByCommandHandler() && criticalSectionAcquiredOnOriginalNss) { + // This is the subcase of a not yet existing pair of view (originalNss)+ bucket (nss) + // timeseries collection that the DDL will have to create. Due to the current + // constraints of the code: + // - Such creation cannot be performed while holding the critical section over the views + // namespace (once the view gets created, the CS will not be releasable); instead, + // exclusive access must be enforced through a collection lock + // - The critical section cannot be released while holding a collection lock, so this + // operation must be performed first (leaving a small window open to data races) + ShardingRecoveryService::get(opCtx)->releaseRecoverableCriticalSection( + opCtx, originalNss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern); + _doc.setDisregardCriticalSectionOnOriginalNss(true); + viewLock.emplace( + opCtx, originalNss(), LockMode::MODE_X, AutoGetCollectionViewMode::kViewsPermitted); + // Once the exclusive access has been reacquired, ensure that no data race occurred. + auto catalog = CollectionCatalog::get(opCtx); + if (catalog->lookupView(opCtx, originalNss()) || + catalog->lookupCollectionByNamespace(opCtx, originalNss())) { + _completeOnError = true; + uasserted(ErrorCodes::NamespaceExists, + str::stream() << "A conflicting DDL operation was completed while trying " + "to shard collection: " + << originalNss()); + } + } + const auto viewName = nss().getTimeseriesViewNamespace(); - auto createCmd = makeCreateCommand(viewName, collation, _request.getTimeseries().value()); + auto createCmd = makeCreateCommand(viewName, collation, *_request.getTimeseries()); BSONObj createRes; DBDirectClient localClient(opCtx); @@ -629,15 +958,15 @@ void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext* } } - shardkeyutil::validateShardKeyIsNotEncrypted(opCtx, nss(), *_shardKeyPattern); + shardkeyutil::validateShardKeyIsNotEncrypted(opCtx, nss(), shardKeyPattern); auto indexCreated = false; if (_request.getImplicitlyCreateIndex().value_or(true)) { indexCreated = shardkeyutil::validateShardKeyIndexExistsOrCreateIfPossible( opCtx, nss(), - *_shardKeyPattern, - _collationBSON, + shardKeyPattern, + collationBSON, _request.getUnique().value_or(false), _request.getEnforceUniquenessCheck().value_or(true), shardkeyutil::ValidationBehaviorsShardCollection(opCtx)); @@ -646,8 +975,8 @@ void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext* "Must have an index compatible with the proposed shard key", validShardKeyIndexExists(opCtx, nss(), - *_shardKeyPattern, - _collationBSON, + shardKeyPattern, + collationBSON, _request.getUnique().value_or(false) && _request.getEnforceUniquenessCheck().value_or(true), shardkeyutil::ValidationBehaviorsShardCollection(opCtx))); @@ -658,8 +987,8 @@ void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext* if (!indexCreated) { replClientInfo.setLastOpToSystemLastOpTime(opCtx); } - // Wait until the index is majority written, to prevent having the collection commited to the - // config server, but the index creation rolled backed on stepdowns. + // Wait until the index is majority written, to prevent having the collection commited to + // the config server, but the index creation rolled backed on stepdowns. WriteConcernResult ignoreResult; uassertStatusOK(waitForWriteConcern(opCtx, replClientInfo.getLastOp(), @@ -669,28 +998,28 @@ void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext* _collectionUUID = *sharding_ddl_util::getCollectionUUID(opCtx, nss()); } -void CreateCollectionCoordinator::_createPolicy(OperationContext* opCtx) { +void CreateCollectionCoordinator::_createPolicy(OperationContext* opCtx, + const ShardKeyPattern& shardKeyPattern) { LOGV2_DEBUG(6042001, 2, "Create collection _createPolicy", "namespace"_attr = nss()); - _collectionEmpty = checkIfCollectionIsEmpty(opCtx, nss()); _splitPolicy = InitialSplitPolicy::calculateOptimizationStrategy( opCtx, - *_shardKeyPattern, + shardKeyPattern, _request.getNumInitialChunks() ? *_request.getNumInitialChunks() : 0, _request.getPresplitHashedZones() ? *_request.getPresplitHashedZones() : false, _request.getInitialSplitPoints(), - getTagsAndValidate(opCtx, nss(), _shardKeyPattern->toBSON()), + getTagsAndValidate(opCtx, nss(), shardKeyPattern.toBSON()), getNumShards(opCtx), *_collectionEmpty, !feature_flags::gNoMoreAutoSplitter.isEnabled(serverGlobalParams.featureCompatibility)); } -void CreateCollectionCoordinator::_createChunks(OperationContext* opCtx) { +void CreateCollectionCoordinator::_createChunks(OperationContext* opCtx, + const ShardKeyPattern& shardKeyPattern) { LOGV2_DEBUG(5277904, 2, "Create collection _createChunks", "namespace"_attr = nss()); - _initialChunks = _splitPolicy->createFirstChunks( - opCtx, *_shardKeyPattern, {*_collectionUUID, ShardingState::get(opCtx)->shardId()}); + opCtx, shardKeyPattern, {*_collectionUUID, ShardingState::get(opCtx)->shardId()}); // There must be at least one chunk. invariant(_initialChunks); @@ -771,7 +1100,7 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) { _initialChunks->collVersion().getTimestamp(), Date_t::now(), *_collectionUUID, - _shardKeyPattern->getKeyPattern()); + _doc.getTranslatedRequestParams()->getKeyPattern()); if (_request.getTimeseries()) { TypeCollectionTimeseriesFields timeseriesFields; @@ -779,8 +1108,9 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) { coll.setTimeseriesFields(std::move(timeseriesFields)); } - if (_collationBSON) { - coll.setDefaultCollation(_collationBSON.value()); + if (auto collationBSON = _doc.getTranslatedRequestParams()->getCollation(); + !collationBSON.isEmpty()) { + coll.setDefaultCollation(collationBSON); } if (_request.getUnique()) { @@ -802,8 +1132,8 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) { "namespace"_attr = nss(), "error"_attr = redact(ex)); - // If the refresh fails, then set the shard version to UNKNOWN and let a future operation to - // refresh the metadata. + // If the refresh fails, then set the shard version to UNKNOWN and let a future + // operation to refresh the metadata. UninterruptibleLockGuard noInterrupt(opCtx->lockState()); AutoGetCollection autoColl(opCtx, nss(), MODE_IX); CollectionShardingRuntime::get(opCtx, nss())->clearFilteringMetadata(opCtx); @@ -811,8 +1141,8 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) { throw; } - // Best effort refresh to warm up cache of all involved shards so we can have a cluster ready to - // receive operations. + // Best effort refresh to warm up cache of all involved shards so we can have a cluster + // ready to receive operations. auto shardRegistry = Grid::get(opCtx)->shardRegistry(); auto dbPrimaryShardId = ShardingState::get(opCtx)->shardId(); @@ -856,10 +1186,10 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) { void CreateCollectionCoordinator::_logStartCreateCollection(OperationContext* opCtx) { BSONObjBuilder collectionDetail; collectionDetail.append("shardKey", *_request.getShardKey()); - collectionDetail.append("collection", nss().ns()); + collectionDetail.append("collection", originalNss().ns()); collectionDetail.append("primary", ShardingState::get(opCtx)->shardId().toString()); ShardingLogging::get(opCtx)->logChange( - opCtx, "shardCollection.start", nss().ns(), collectionDetail.obj()); + opCtx, "shardCollection.start", originalNss().ns(), collectionDetail.obj()); } void CreateCollectionCoordinator::_logEndCreateCollection(OperationContext* opCtx) { @@ -872,7 +1202,7 @@ void CreateCollectionCoordinator::_logEndCreateCollection(OperationContext* opCt collectionDetail.appendNumber("numChunks", static_cast<long long>(_initialChunks->chunks.size())); ShardingLogging::get(opCtx)->logChange( - opCtx, "shardCollection.end", nss().ns(), collectionDetail.obj()); + opCtx, "shardCollection.end", originalNss().ns(), collectionDetail.obj()); } } // namespace mongo diff --git a/src/mongo/db/s/create_collection_coordinator.h b/src/mongo/db/s/create_collection_coordinator.h index a1f8bbea4e8..86aee402c3f 100644 --- a/src/mongo/db/s/create_collection_coordinator.h +++ b/src/mongo/db/s/create_collection_coordinator.h @@ -51,7 +51,7 @@ public: _request(_doc.getCreateCollectionRequest()), _critSecReason(BSON("command" << "createCollection" - << "ns" << nss().toString())) {} + << "ns" << originalNss().toString())) {} ~CreateCollectionCoordinator() = default; @@ -71,7 +71,7 @@ public: } protected: - const mongo::CreateCollectionRequest _request; + const NamespaceString& nss() const override; private: StringData serializePhase(const Phase& phase) const override { @@ -86,25 +86,36 @@ private: */ void _checkCommandArguments(OperationContext* opCtx); - /** - * Checks that the collection has UUID matching the collectionUUID parameter, if provided. - */ - void _checkCollectionUUIDMismatch(OperationContext* opCtx) const; + boost::optional<CreateCollectionResponse> _checkIfCollectionAlreadyShardedWithSameOptions( + OperationContext* opCtx); + + TranslatedRequestParams _translateRequestParameters(OperationContext* opCtx); + + // TODO SERVER-68008 Remove once 7.0 becomes last LTS; when the function appears in if clauses, + // modify the code assuming that a "false" value gets returned + bool _timeseriesNssResolvedByCommandHandler() const; + + void _acquireCriticalSections(OperationContext* opCtx); + + void _promoteCriticalSectionsToBlockReads(OperationContext* opCtx) const; + + void _releaseCriticalSections(OperationContext* opCtx); /** * Ensures the collection is created locally and has the appropiate shard index. */ - void _createCollectionAndIndexes(OperationContext* opCtx); + void _createCollectionAndIndexes(OperationContext* opCtx, + const ShardKeyPattern& shardKeyPattern); /** * Creates the appropiate split policy. */ - void _createPolicy(OperationContext* opCtx); + void _createPolicy(OperationContext* opCtx, const ShardKeyPattern& shardKeyPattern); /** * Given the appropiate split policy, create the initial chunks. */ - void _createChunks(OperationContext* opCtx); + void _createChunks(OperationContext* opCtx, const ShardKeyPattern& shardKeyPattern); /** * If the optimized path can be taken, ensure the collection is already created in all the @@ -130,11 +141,9 @@ private: */ void _logEndCreateCollection(OperationContext* opCtx); - const BSONObj _critSecReason; + mongo::CreateCollectionRequest _request; - // The shard key of the collection, static for the duration of the coordinator and reflects the - // original command - boost::optional<ShardKeyPattern> _shardKeyPattern; + const BSONObj _critSecReason; // Set on successful completion of the coordinator boost::optional<CreateCollectionResponse> _result; @@ -142,7 +151,6 @@ private: // The fields below are only populated if the coordinator enters in the branch where the // collection is not already sharded (i.e., they will not be present on early return) - boost::optional<BSONObj> _collationBSON; boost::optional<UUID> _collectionUUID; std::unique_ptr<InitialSplitPolicy> _splitPolicy; diff --git a/src/mongo/db/s/create_collection_coordinator_document.idl b/src/mongo/db/s/create_collection_coordinator_document.idl index 8e8cbe80c07..925ae0039a0 100644 --- a/src/mongo/db/s/create_collection_coordinator_document.idl +++ b/src/mongo/db/s/create_collection_coordinator_document.idl @@ -43,9 +43,28 @@ enums: type: string values: kUnset: "unset" + kTranslateRequest: "translateRequest" kCommit: "commit" structs: + TranslatedRequestParams: + description: "Subset of CreateCollectionRequest fields that may get overridden once the kTranslateRequest phase gets executed" + generate_comparison_operators: false + strict: false + fields: + nss: + type: namespacestring + description: "The namespace that will be used by the DDL while accessing the Sharding Catalog" + optional: false + keyPattern: + type: KeyPattern + description: "The shard key that will be used by the DDL while accessing the Sharding Catalog" + optional: false + collation: + type: object_owned + description: "Collation value from the targeted unsharded collection if this exists; otherwise, simple-collation" + optional: false + CreateCollectionCoordinatorDocument: description: "Object with neccessary fields to create a collection" generate_comparison_operators: false @@ -57,4 +76,12 @@ structs: phase: type: CreateCollectionCoordinatorPhase description: "Coordinator phase." - default: kUnset
\ No newline at end of file + default: kUnset + translatedRequestParams: + type: TranslatedRequestParams + description: "The field is populated only once the kTranslateRequest phase is completed" + optional: true + # TODO SERVER-68084 remove the following field + disregardCriticalSectionOnOriginalNss: + type: optionalBool + description: "When set to true, the DDL operation is being performed without acquiring a critical section over the NSS specified by the user." diff --git a/src/mongo/db/s/sharding_ddl_coordinator.h b/src/mongo/db/s/sharding_ddl_coordinator.h index d8eaad0139d..d8f42fc6ea5 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator.h +++ b/src/mongo/db/s/sharding_ddl_coordinator.h @@ -87,17 +87,6 @@ public: return _completionPromise.getFuture(); } - const NamespaceString& originalNss() const { - return _coordId.getNss(); - } - - const NamespaceString& nss() const { - if (const auto& bucketNss = metadata().getBucketNss()) { - return bucketNss.get(); - } - return originalNss(); - } - DDLCoordinatorTypeEnum operationType() const { return _coordId.getOperationType(); } @@ -112,6 +101,17 @@ public: } protected: + const NamespaceString& originalNss() const { + return _coordId.getNss(); + } + + virtual const NamespaceString& nss() const { + if (const auto& bucketNss = metadata().getBucketNss()) { + return bucketNss.get(); + } + return originalNss(); + } + virtual std::vector<StringData> _acquireAdditionalLocks(OperationContext* opCtx) { return {}; }; diff --git a/src/mongo/db/s/sharding_ddl_coordinator.idl b/src/mongo/db/s/sharding_ddl_coordinator.idl index e38a0f63334..9398ef118fa 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator.idl +++ b/src/mongo/db/s/sharding_ddl_coordinator.idl @@ -47,7 +47,9 @@ enums: kDropDatabase: "dropDatabase" kDropCollection: "dropCollection" kRenameCollection: "renameCollection" - kCreateCollection: "createCollection_V2" + kCreateCollection: "createCollection_V3" + # TODO SERVER-68008: Remove once 7.0 becomes last LTS + kCreateCollectionPre61Compatible: "createCollection_V2" kRefineCollectionShardKey: "refineCollectionShardKey" kSetAllowMigrations: "setAllowMigrations" kCollMod: "collMod_V3" diff --git a/src/mongo/db/s/sharding_ddl_coordinator_service.cpp b/src/mongo/db/s/sharding_ddl_coordinator_service.cpp index 1fc117046d2..ca445697a3e 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator_service.cpp +++ b/src/mongo/db/s/sharding_ddl_coordinator_service.cpp @@ -77,6 +77,8 @@ std::shared_ptr<ShardingDDLCoordinator> constructShardingDDLCoordinatorInstance( case DDLCoordinatorTypeEnum::kRenameCollection: return std::make_shared<RenameCollectionCoordinator>(service, std::move(initialState)); case DDLCoordinatorTypeEnum::kCreateCollection: + // TODO SERVER-68008 Remove the Pre61Compatible case once 7.0 becomes last LTS + case DDLCoordinatorTypeEnum::kCreateCollectionPre61Compatible: return std::make_shared<CreateCollectionCoordinator>(service, std::move(initialState)); break; case DDLCoordinatorTypeEnum::kRefineCollectionShardKey: diff --git a/src/mongo/db/s/sharding_recovery_service.cpp b/src/mongo/db/s/sharding_recovery_service.cpp index be00f0fefc4..7f895570bae 100644 --- a/src/mongo/db/s/sharding_recovery_service.cpp +++ b/src/mongo/db/s/sharding_recovery_service.cpp @@ -97,6 +97,8 @@ void ShardingRecoveryService::acquireRecoverableCriticalSectionBlockWrites( { Lock::GlobalLock lk(opCtx, MODE_IX); + // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to + // construct cCollLock. AutoGetCollection cCollLock(opCtx, nss, MODE_S); DBDirectClient dbClient(opCtx); @@ -183,6 +185,8 @@ void ShardingRecoveryService::promoteRecoverableCriticalSectionToBlockAlsoReads( invariant(!opCtx->lockState()->isLocked()); { + // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to + // construct cCollLock. AutoGetCollection cCollLock(opCtx, nss, MODE_X); DBDirectClient dbClient(opCtx); @@ -283,6 +287,8 @@ void ShardingRecoveryService::releaseRecoverableCriticalSection( invariant(!opCtx->lockState()->isLocked()); { + // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to + // construct cCollLock. AutoGetCollection collLock(opCtx, nss, MODE_X); DBDirectClient dbClient(opCtx); diff --git a/src/mongo/db/s/shardsvr_create_collection_command.cpp b/src/mongo/db/s/shardsvr_create_collection_command.cpp index c5d40d29bb5..3155963cc52 100644 --- a/src/mongo/db/s/shardsvr_create_collection_command.cpp +++ b/src/mongo/db/s/shardsvr_create_collection_command.cpp @@ -32,12 +32,15 @@ #include "mongo/db/catalog/collection_catalog.h" #include "mongo/db/commands.h" #include "mongo/db/commands/feature_compatibility_version.h" +#include "mongo/db/namespace_string.h" #include "mongo/db/s/create_collection_coordinator.h" #include "mongo/db/s/sharding_ddl_coordinator_service.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h" #include "mongo/db/timeseries/timeseries_options.h" #include "mongo/logv2/log.h" +#include "mongo/s/request_types/sharded_ddl_commands_gen.h" +#include "mongo/s/sharding_feature_flags_gen.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding @@ -45,6 +48,64 @@ namespace mongo { namespace { +void translateToTimeseriesCollection(OperationContext* opCtx, + NamespaceString* nss, + CreateCollectionRequest* createCmdRequest) { + auto bucketsNs = nss->makeTimeseriesBucketsNamespace(); + auto bucketsColl = + CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForRead(opCtx, bucketsNs); + + // If the 'system.buckets' exists or 'timeseries' parameters are passed in, we know that + // we are trying shard a timeseries collection. + if (bucketsColl || createCmdRequest->getTimeseries()) { + uassert(5731502, + "Sharding a timeseries collection feature is not enabled", + feature_flags::gFeatureFlagShardedTimeSeries.isEnabled( + serverGlobalParams.featureCompatibility)); + + if (bucketsColl) { + uassert(6235600, + str::stream() << "the collection '" << bucketsNs + << "' does not have 'timeseries' options", + bucketsColl->getTimeseriesOptions()); + + if (createCmdRequest->getTimeseries()) { + uassert(6235601, + str::stream() + << "the 'timeseries' spec provided must match that of exists '" << nss + << "' collection", + timeseries::optionsAreEqual(*createCmdRequest->getTimeseries(), + *bucketsColl->getTimeseriesOptions())); + } else { + createCmdRequest->setTimeseries(bucketsColl->getTimeseriesOptions()); + } + } + + auto timeField = createCmdRequest->getTimeseries()->getTimeField(); + auto metaField = createCmdRequest->getTimeseries()->getMetaField(); + BSONObjIterator iter{*createCmdRequest->getShardKey()}; + while (auto elem = iter.next()) { + if (elem.fieldNameStringData() == timeField) { + uassert(6235602, + str::stream() << "the time field '" << timeField + << "' can be only at the end of the shard key pattern", + !iter.more()); + } else { + uassert(6235603, + str::stream() << "only the time field or meta field can be " + "part of shard key pattern", + metaField && + (elem.fieldNameStringData() == *metaField || + elem.fieldNameStringData().startsWith(*metaField + "."))); + } + } + *nss = bucketsNs; + createCmdRequest->setShardKey( + uassertStatusOK(timeseries::createBucketsShardKeySpecFromTimeseriesShardKeySpec( + *createCmdRequest->getTimeseries(), *createCmdRequest->getShardKey()))); + } +} + class ShardsvrCreateCollectionCommand final : public TypedCommand<ShardsvrCreateCollectionCommand> { public: using Request = ShardsvrCreateCollection; @@ -83,69 +144,20 @@ public: "Create Collection path has not been implemented", request().getShardKey()); - auto nss = ns(); - auto bucketsNs = nss.makeTimeseriesBucketsNamespace(); - auto bucketsColl = - CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForRead(opCtx, bucketsNs); - CreateCollectionRequest createCmdRequest = request().getCreateCollectionRequest(); - - // If the 'system.buckets' exists or 'timeseries' parameters are passed in, we know that - // we are trying shard a timeseries collection. - if (bucketsColl || createCmdRequest.getTimeseries()) { - uassert(5731502, - "Sharding a timeseries collection feature is not enabled", - feature_flags::gFeatureFlagShardedTimeSeries.isEnabled( - serverGlobalParams.featureCompatibility)); - - if (bucketsColl) { - uassert(6159000, - str::stream() << "the collection '" << bucketsNs - << "' does not have 'timeseries' options", - bucketsColl->getTimeseriesOptions()); - - if (createCmdRequest.getTimeseries()) { - uassert(5731500, - str::stream() - << "the 'timeseries' spec provided must match that of exists '" - << nss << "' collection", - timeseries::optionsAreEqual(*createCmdRequest.getTimeseries(), - *bucketsColl->getTimeseriesOptions())); - } else { - createCmdRequest.setTimeseries(bucketsColl->getTimeseriesOptions()); - } - } - - auto timeField = createCmdRequest.getTimeseries()->getTimeField(); - auto metaField = createCmdRequest.getTimeseries()->getMetaField(); - BSONObjIterator iter{*createCmdRequest.getShardKey()}; - while (auto elem = iter.next()) { - if (elem.fieldNameStringData() == timeField) { - uassert(5914000, - str::stream() - << "the time field '" << timeField - << "' can be only at the end of the shard key pattern", - !iter.more()); - } else { - uassert(5914001, - str::stream() << "only the time field or meta field can be " - "part of shard key pattern", - metaField && - (elem.fieldNameStringData() == *metaField || - elem.fieldNameStringData().startsWith(*metaField + "."))); - } + const auto createCollectionCoordinator = [&] { + auto nssToForward = ns(); + auto requestToForward = request().getCreateCollectionRequest(); + auto coordinatorType = DDLCoordinatorTypeEnum::kCreateCollection; + if (!feature_flags::gImplicitDDLTimeseriesNssTranslation.isEnabled( + serverGlobalParams.featureCompatibility)) { + translateToTimeseriesCollection(opCtx, &nssToForward, &requestToForward); + coordinatorType = DDLCoordinatorTypeEnum::kCreateCollectionPre61Compatible; } - nss = bucketsNs; - createCmdRequest.setShardKey( - uassertStatusOK(timeseries::createBucketsShardKeySpecFromTimeseriesShardKeySpec( - *createCmdRequest.getTimeseries(), *createCmdRequest.getShardKey()))); - } - const auto createCollectionCoordinator = [&] { auto coordinatorDoc = [&] { auto doc = CreateCollectionCoordinatorDocument(); - doc.setShardingDDLCoordinatorMetadata( - {{std::move(nss), DDLCoordinatorTypeEnum::kCreateCollection}}); - doc.setCreateCollectionRequest(std::move(createCmdRequest)); + doc.setShardingDDLCoordinatorMetadata({{nssToForward, coordinatorType}}); + doc.setCreateCollectionRequest(requestToForward); return doc.toBSON(); }(); |