commit ab2ced056ac09ea256cc4ff00fb2cbd0344f622a (patch)
tree   dd592c01c40df80c1d16d3bcb3ac5054b4132188 /src/mongo/db/s
parent 12eb0fadbc8ad8490cd08fe04ac3728f1a076658 (diff)
author    Paolo Polato <paolo.polato@mongodb.com>           2022-07-26 12:50:44 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-07-26 14:03:49 +0000
SERVER-62356 Serialize creation of sharded and unsharded Timeseries collections
Diffstat (limited to 'src/mongo/db/s')
 src/mongo/db/s/SConscript                                |   1
 src/mongo/db/s/create_collection_coordinator.cpp         | 258
 src/mongo/db/s/create_collection_coordinator.h           |  20
 src/mongo/db/s/create_collection_coordinator_params.cpp  | 206
 src/mongo/db/s/create_collection_coordinator_params.h    |  91
 src/mongo/db/s/recoverable_critical_section_service.cpp  |   6
 src/mongo/db/s/sharding_ddl_coordinator.h                |  22
 src/mongo/db/s/shardsvr_create_collection_command.cpp    |  64
8 files changed, 455 insertions, 213 deletions
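
Before the diff itself, the shape of the change: the coordinator now takes recoverable critical sections on both the requested namespace and its derived 'system.buckets' namespace *before* resolving the request against the local catalog, so a sharded create and a concurrent unsharded create of the same (possibly timeseries) collection are serialized rather than racing. The following standalone C++ sketch models only that locking idea; CriticalSectionRegistry, its per-namespace std::mutex, and the makeTimeseriesBucketsNamespace helper are illustrative stand-ins, not the server's RecoverableCriticalSectionService API.

    #include <iostream>
    #include <map>
    #include <mutex>
    #include <string>

    // Toy registry of per-namespace "critical sections". In the real patch these are
    // recoverable sections persisted with majority write concern; here a plain mutex
    // per namespace stands in for them.
    class CriticalSectionRegistry {
    public:
        std::mutex& get(const std::string& ns) {
            std::lock_guard<std::mutex> lk(_registryMutex);
            return _sections[ns];  // default-constructs the mutex on first use
        }

    private:
        std::mutex _registryMutex;
        std::map<std::string, std::mutex> _sections;
    };

    // Derive the 'system.buckets' namespace from "<db>.<coll>" (simplified).
    std::string makeTimeseriesBucketsNamespace(const std::string& ns) {
        auto dot = ns.find('.');
        return ns.substr(0, dot) + ".system.buckets" + ns.substr(dot);
    }

    int main() {
        CriticalSectionRegistry registry;
        const std::string nss = "testDB.weather";
        const std::string bucketsNss = makeTimeseriesBucketsNamespace(nss);

        // Acquire both sections together before inspecting the catalog, so a
        // concurrent create targeting either namespace is serialized with us.
        // std::scoped_lock locks the two mutexes with deadlock-free ordering.
        std::scoped_lock bothSections(registry.get(nss), registry.get(bucketsNss));

        // ...resolve the request against the catalog and create the collection here,
        // knowing no concurrent creation can slip in on either namespace...
        std::cout << "holding critical sections on " << nss << " and " << bucketsNss << "\n";
        return 0;
    }

In the patch below, the same idea appears as _acquireCriticalSections(), _promoteCriticalSectionsToBlockReads() and _releaseCriticalSections(), each of which operates on originalNss() and on originalNss().makeTimeseriesBucketsNamespace() as a pair.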
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index f7cbf589684..42e136a51fe 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -390,6 +390,7 @@ env.Library(
         'config/set_user_write_block_mode_coordinator_document.idl',
         'config/set_user_write_block_mode_coordinator.cpp',
         'create_collection_coordinator_document.idl',
+        'create_collection_coordinator_params.cpp',
         'create_collection_coordinator.cpp',
         'drop_collection_coordinator_document.idl',
         'drop_collection_coordinator.cpp',
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp
index 216d88f5ddf..23be2f5d94f 100644
--- a/src/mongo/db/s/create_collection_coordinator.cpp
+++ b/src/mongo/db/s/create_collection_coordinator.cpp
@@ -210,55 +210,6 @@ int getNumShards(OperationContext* opCtx) {
     return shardRegistry->getNumShards(opCtx);
 }
 
-std::pair<boost::optional<Collation>, BSONObj> getCollation(
-    OperationContext* opCtx,
-    const NamespaceString& nss,
-    const boost::optional<BSONObj>& collation) {
-    // Ensure the collation is valid. Currently we only allow the simple collation.
-    std::unique_ptr<CollatorInterface> requestedCollator = nullptr;
-    if (collation) {
-        requestedCollator =
-            uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
-                                ->makeFromBSON(collation.value()));
-        uassert(ErrorCodes::BadValue,
-                str::stream() << "The collation for shardCollection must be {locale: 'simple'}, "
-                              << "but found: " << collation.value(),
-                !requestedCollator);
-    }
-
-    AutoGetCollection autoColl(opCtx, nss, MODE_IS, AutoGetCollectionViewMode::kViewsForbidden);
-
-    const auto actualCollator = [&]() -> const CollatorInterface* {
-        const auto& coll = autoColl.getCollection();
-        if (coll) {
-            uassert(
-                ErrorCodes::InvalidOptions, "can't shard a capped collection", !coll->isCapped());
-            return coll->getDefaultCollator();
-        }
-
-        return nullptr;
-    }();
-
-    if (!requestedCollator && !actualCollator)
-        return {boost::none, BSONObj()};
-
-    auto actualCollation = actualCollator->getSpec();
-    auto actualCollatorBSON = actualCollation.toBSON();
-
-    if (!collation) {
-        auto actualCollatorFilter =
-            uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
-                                ->makeFromBSON(actualCollatorBSON));
-        uassert(ErrorCodes::BadValue,
-                str::stream() << "If no collation was specified, the collection collation must be "
-                                 "{locale: 'simple'}, "
-                              << "but found: " << actualCollatorBSON,
-                !actualCollatorFilter);
-    }
-
-    return {actualCollation, actualCollatorBSON};
-}
-
 void cleanupPartialChunksFromPreviousAttempt(OperationContext* opCtx,
                                              const UUID& uuid,
                                              const OperationSessionInfo& osi) {
@@ -363,6 +314,12 @@ void CreateCollectionCoordinator::appendCommandInfo(BSONObjBuilder* cmdInfoBuild
     cmdInfoBuilder->appendElements(_request.toBSON());
 }
 
+const NamespaceString& CreateCollectionCoordinator::nss() const {
+    // Rely on the resolved request parameters to retrieve the nss to be targeted by the
+    // coordinator.
+    return _request.getNameSpaceToShard();
+}
+
 void CreateCollectionCoordinator::checkIfOptionsConflict(const BSONObj& doc) const {
     // If we have two shard collections on the same namespace, then the arguments must be the same.
     const auto otherDoc = CreateCollectionCoordinatorDocument::parse(
@@ -380,7 +337,6 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
     const CancellationToken& token) noexcept {
     return ExecutorFuture<void>(**executor)
         .then([this, anchor = shared_from_this()] {
-            _shardKeyPattern = ShardKeyPattern(*_request.getShardKey());
             if (_doc.getPhase() < Phase::kCommit) {
                 auto opCtxHolder = cc().makeOperationContext();
                 auto* opCtx = opCtxHolder.get();
@@ -409,42 +365,34 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
                     opCtx, getCurrentSession(), **executor);
             }
 
+            // Enter the critical sections before patching the user request to avoid data races
+            // with concurrent creation of unsharded collections referencing the same
+            // namespace(s).
+            _acquireCriticalSections(opCtx);
+
+            _request.resolveAgainstLocalCatalog(opCtx);
+
+            _checkCollectionUUIDMismatch(opCtx);
+
             // Log the start of the event only if we're not recovering.
             _logStartCreateCollection(opCtx);
-            // Quick check (without critical section) to see if another create collection
-            // already succeeded.
+
+            // Check if the collection was already sharded by a past request.
             if (auto createCollectionResponseOpt =
                     sharding_ddl_util::checkIfCollectionAlreadySharded(
                         opCtx,
                         nss(),
-                        _shardKeyPattern->getKeyPattern().toBSON(),
-                        getCollation(opCtx, nss(), _request.getCollation()).second,
-                        _request.getUnique().value_or(false))) {
-                _checkCollectionUUIDMismatch(opCtx);
-
-                // The critical section can still be held here if the node committed the
-                // sharding of the collection but then it stepped down before it managed to
-                // delete the coordinator document
-                RecoverableCriticalSectionService::get(opCtx)
-                    ->releaseRecoverableCriticalSection(
-                        opCtx,
-                        nss(),
-                        _critSecReason,
-                        ShardingCatalogClient::kMajorityWriteConcern);
-
+                        _request.getShardKeyPattern().getKeyPattern().toBSON(),
+                        _request.getResolvedCollation(),
+                        _doc.getUnique().value_or(false))) {
+                // A previous request already created and committed the collection, but there
+                // was a stepdown after the commit.
+                _releaseCriticalSections(opCtx);
                 _result = createCollectionResponseOpt;
                 return;
             }
 
-            // Entering the critical section. From this point on, the writes are blocked. Before
-            // calling this method, we need the coordinator document to be persisted (and hence
-            // the kCheck state), otherwise nothing will release the critical section in the
-            // presence of a stepdown.
-            RecoverableCriticalSectionService::get(opCtx)
-                ->acquireRecoverableCriticalSectionBlockWrites(
-                    opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
-
             if (!_firstExecution) {
                 auto uuid = sharding_ddl_util::getCollectionUUID(opCtx, nss());
                 // If the collection can be found locally, then we clean up the config.chunks
@@ -463,12 +411,11 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
                 }
             }
 
-            _checkCollectionUUIDMismatch(opCtx);
             _createPolicy(opCtx);
             _createCollectionAndIndexes(opCtx);
 
             audit::logShardCollection(opCtx->getClient(),
-                                      nss().ns(),
+                                      nss().toString(),
                                       *_request.getShardKey(),
                                       _request.getUnique().value_or(false));
 
@@ -478,12 +425,8 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
                 // Block reads/writes from here on if we need to create the collection on other
                 // shards, this way we prevent reads/writes that should be redirected to another
                 // shard
-                RecoverableCriticalSectionService::get(opCtx)
-                    ->promoteRecoverableCriticalSectionToBlockAlsoReads(
-                        opCtx,
-                        nss(),
-                        _critSecReason,
-                        ShardingCatalogClient::kMajorityWriteConcern);
+                _promoteCriticalSectionsToBlockReads(opCtx);
 
                 _updateSession(opCtx);
                 _createCollectionOnNonPrimaryShards(opCtx, getCurrentSession());
@@ -492,8 +435,7 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
             }
 
             // End of the critical section, from now on, read and writes are permitted.
-            RecoverableCriticalSectionService::get(opCtx)->releaseRecoverableCriticalSection(
-                opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+            _releaseCriticalSections(opCtx);
 
             // Slow path. Create chunks (which might incur in an index scan) and commit must be
             // done outside of the critical section to prevent writes from stalling in unsharded
@@ -514,54 +456,43 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
                 !status.isA<ErrorCategory::ShutdownError>()) {
                 LOGV2_ERROR(5458702,
                             "Error running create collection",
-                            "namespace"_attr = nss(),
+                            "namespace"_attr = originalNss(),
                             "error"_attr = redact(status));
 
                 auto opCtxHolder = cc().makeOperationContext();
                 auto* opCtx = opCtxHolder.get();
 
-                RecoverableCriticalSectionService::get(opCtx)->releaseRecoverableCriticalSection(
-                    opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+                _releaseCriticalSections(opCtx);
             }
             return status;
         });
 }
 
 void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx) {
-    LOGV2_DEBUG(5277902, 2, "Create collection _checkCommandArguments", "namespace"_attr = nss());
-
-    uassert(ErrorCodes::InvalidNamespace,
-            str::stream() << "Namespace too long. Namespace: " << nss()
-                          << " Max: " << NamespaceString::MaxNsShardedCollectionLen,
-            nss().size() <= NamespaceString::MaxNsShardedCollectionLen);
+    LOGV2_DEBUG(
+        5277902, 2, "Create collection _checkCommandArguments", "namespace"_attr = originalNss());
 
-    if (nss().db() == NamespaceString::kConfigDb) {
+    if (originalNss().db() == NamespaceString::kConfigDb) {
         // Only allowlisted collections in config may be sharded (unless we are in test mode)
         uassert(ErrorCodes::IllegalOperation,
                 "only special collections in the config db may be sharded",
-                nss() == NamespaceString::kLogicalSessionsNamespace);
+                originalNss() == NamespaceString::kLogicalSessionsNamespace);
     }
 
     // Ensure that hashed and unique are not both set.
     uassert(ErrorCodes::InvalidOptions,
            "Hashed shard keys cannot be declared unique. It's possible to ensure uniqueness on "
            "the hashed field by declaring an additional (non-hashed) unique index on the field.",
-            !_shardKeyPattern->isHashedPattern() || !_request.getUnique().value_or(false));
-
-    // Ensure that a time-series collection cannot be sharded unless the feature flag is enabled.
-    if (nss().isTimeseriesBucketsCollection()) {
-        uassert(ErrorCodes::IllegalOperation,
-                str::stream() << "can't shard time-series collection " << nss(),
-                feature_flags::gFeatureFlagShardedTimeSeries.isEnabled(
-                    serverGlobalParams.featureCompatibility) ||
-                    !timeseries::getTimeseriesOptions(opCtx, nss(), false));
-    }
+            !ShardKeyPattern(*_request.getShardKey()).isHashedPattern() ||
+                !_request.getUnique().value_or(false));
 
     // Ensure the namespace is valid.
     uassert(ErrorCodes::IllegalOperation,
             "can't shard system namespaces",
-            !nss().isSystem() || nss() == NamespaceString::kLogicalSessionsNamespace ||
-                nss().isTemporaryReshardingCollection() || nss().isTimeseriesBucketsCollection());
+            !originalNss().isSystem() ||
+                originalNss() == NamespaceString::kLogicalSessionsNamespace ||
+                originalNss().isTemporaryReshardingCollection() ||
+                originalNss().isTimeseriesBucketsCollection());
 
     if (_request.getNumInitialChunks()) {
         // Ensure numInitialChunks is within valid bounds.
@@ -581,14 +512,14 @@ void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx
             numChunks <= maxNumInitialChunksTotal);
     }
 
-    if (nss().db() == NamespaceString::kConfigDb) {
+    if (originalNss().db() == NamespaceString::kConfigDb) {
         auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
         auto findReponse = uassertStatusOK(
             configShard->exhaustiveFindOnConfig(opCtx,
                                                 ReadPreferenceSetting{ReadPreference::PrimaryOnly},
                                                 repl::ReadConcernLevel::kMajorityReadConcern,
-                                                nss(),
+                                                originalNss(),
                                                 BSONObj(),
                                                 BSONObj(),
                                                 1));
@@ -607,42 +538,103 @@ void CreateCollectionCoordinator::_checkCollectionUUIDMismatch(OperationContext*
     checkCollectionUUIDMismatch(opCtx, nss(), coll.getCollection(), _request.getCollectionUUID());
 }
 
+void CreateCollectionCoordinator::_acquireCriticalSections(OperationContext* opCtx) const {
+    // TODO SERVER-68084 call RecoverableCriticalSectionService without the try/catch block
+    try {
+        RecoverableCriticalSectionService::get(opCtx)->acquireRecoverableCriticalSectionBlockWrites(
+            opCtx,
+            originalNss(),
+            _critSecReason,
+            ShardingCatalogClient::kMajorityWriteConcern,
+            boost::none);
+    } catch (const ExceptionFor<ErrorCodes::CommandNotSupportedOnView>&) {
+        // If this collection already exists and it is a view we don't need the critical section
+        // because:
+        // 1. We will not shard the view namespace
+        // 2. This collection will remain a view since we are holding the DDL coll lock and thus
+        //    the collection can't be dropped.
+    }
+
+    // Preventively acquire the critical section protecting the buckets namespace that the
+    // creation of a timeseries collection would require.
+    const auto bucketsNamespace = originalNss().makeTimeseriesBucketsNamespace();
+    RecoverableCriticalSectionService::get(opCtx)->acquireRecoverableCriticalSectionBlockWrites(
+        opCtx, bucketsNamespace, _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+}
+
+void CreateCollectionCoordinator::_promoteCriticalSectionsToBlockReads(
+    OperationContext* opCtx) const {
+    // TODO SERVER-68084 call RecoverableCriticalSectionService without the try/catch block
+    try {
+        RecoverableCriticalSectionService::get(opCtx)
+            ->promoteRecoverableCriticalSectionToBlockAlsoReads(
+                opCtx, originalNss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+    } catch (const ExceptionFor<ErrorCodes::CommandNotSupportedOnView>&) {
+        // ignore
+    }
+
+    const auto bucketsNamespace = originalNss().makeTimeseriesBucketsNamespace();
+    RecoverableCriticalSectionService::get(opCtx)
+        ->promoteRecoverableCriticalSectionToBlockAlsoReads(
+            opCtx, bucketsNamespace, _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+}
+
+void CreateCollectionCoordinator::_releaseCriticalSections(OperationContext* opCtx) const {
+    // TODO SERVER-68084 call RecoverableCriticalSectionService without the try/catch block
+    try {
+        RecoverableCriticalSectionService::get(opCtx)->releaseRecoverableCriticalSection(
+            opCtx, originalNss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+    } catch (const ExceptionFor<ErrorCodes::CommandNotSupportedOnView>&) {
+        // ignore
+    }
+
+    const auto bucketsNamespace = originalNss().makeTimeseriesBucketsNamespace();
+    RecoverableCriticalSectionService::get(opCtx)->releaseRecoverableCriticalSection(
+        opCtx, bucketsNamespace, _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+}
+
 void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext* opCtx) {
     LOGV2_DEBUG(
         5277903, 2, "Create collection _createCollectionAndIndexes", "namespace"_attr = nss());
 
+    auto collationBSON = _request.getResolvedCollation();
     boost::optional<Collation> collation;
-    std::tie(collation, _collationBSON) = getCollation(opCtx, nss(), _request.getCollation());
+    if (!collationBSON.isEmpty()) {
+        collation.emplace(
+            Collation::parse(IDLParserErrorContext("CreateCollectionCoordinator"), collationBSON));
+    }
 
     // We need to implicitly create a timeseries view and underlying bucket collection.
-    if (_collectionEmpty && _request.getTimeseries()) {
+    const auto& timeSeriesOptions = _request.getTimeseries();
+    if (_collectionEmpty && timeSeriesOptions) {
         const auto viewName = nss().getTimeseriesViewNamespace();
-        auto createCmd = makeCreateCommand(viewName, collation, _request.getTimeseries().get());
+        auto createCmd = makeCreateCommand(viewName, collation, timeSeriesOptions.get());
 
         BSONObj createRes;
         DBDirectClient localClient(opCtx);
         localClient.runCommand(nss().db().toString(), createCmd, createRes);
         auto createStatus = getStatusFromCommandResult(createRes);
 
+        // TODO this always assumes that the existing namespace was generated by a TS request!
+        // Should we verify that the options are compatible?
         if (!createStatus.isOK() && createStatus.code() == ErrorCodes::NamespaceExists) {
-            LOGV2_DEBUG(5909400,
-                        3,
-                        "Timeseries namespace already exists",
-                        "namespace"_attr = viewName.toString());
+            LOGV2_WARNING(5909400,
+                          "Timeseries namespace already exists",
+                          "namespace"_attr = viewName.toString());
         } else {
             uassertStatusOK(createStatus);
         }
     }
 
-    shardkeyutil::validateShardKeyIsNotEncrypted(opCtx, nss(), *_shardKeyPattern);
+    shardkeyutil::validateShardKeyIsNotEncrypted(opCtx, nss(), _request.getShardKeyPattern());
 
     auto indexCreated = false;
     if (_request.getImplicitlyCreateIndex().value_or(true)) {
         indexCreated = shardkeyutil::validateShardKeyIndexExistsOrCreateIfPossible(
             opCtx,
             nss(),
-            *_shardKeyPattern,
-            _collationBSON,
+            _request.getShardKeyPattern(),
+            collationBSON,
             _request.getUnique().value_or(false),
             _request.getEnforceUniquenessCheck().value_or(true),
             shardkeyutil::ValidationBehaviorsShardCollection(opCtx));
@@ -651,8 +643,8 @@ void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext*
                 "Must have an index compatible with the proposed shard key",
                 validShardKeyIndexExists(opCtx,
                                          nss(),
-                                         *_shardKeyPattern,
-                                         _collationBSON,
+                                         _request.getShardKeyPattern(),
+                                         collationBSON,
                                          _request.getUnique().value_or(false) &&
                                              _request.getEnforceUniquenessCheck().value_or(true),
                                          shardkeyutil::ValidationBehaviorsShardCollection(opCtx)));
@@ -681,11 +673,11 @@ void CreateCollectionCoordinator::_createPolicy(OperationContext* opCtx) {
 
     _splitPolicy = InitialSplitPolicy::calculateOptimizationStrategy(
         opCtx,
-        *_shardKeyPattern,
+        _request.getShardKeyPattern(),
         _request.getNumInitialChunks() ? *_request.getNumInitialChunks() : 0,
         _request.getPresplitHashedZones() ? *_request.getPresplitHashedZones() : false,
         _request.getInitialSplitPoints(),
-        getTagsAndValidate(opCtx, nss(), _shardKeyPattern->toBSON()),
+        getTagsAndValidate(opCtx, nss(), _request.getShardKeyPattern().toBSON()),
         getNumShards(opCtx),
         *_collectionEmpty,
         !feature_flags::gNoMoreAutoSplitter.isEnabled(serverGlobalParams.featureCompatibility));
@@ -694,8 +686,10 @@ void CreateCollectionCoordinator::_createPolicy(OperationContext* opCtx) {
 void CreateCollectionCoordinator::_createChunks(OperationContext* opCtx) {
     LOGV2_DEBUG(5277904, 2, "Create collection _createChunks", "namespace"_attr = nss());
 
-    _initialChunks = _splitPolicy->createFirstChunks(
-        opCtx, *_shardKeyPattern, {*_collectionUUID, ShardingState::get(opCtx)->shardId()});
+    _initialChunks =
+        _splitPolicy->createFirstChunks(opCtx,
+                                        _request.getShardKeyPattern(),
+                                        {*_collectionUUID, ShardingState::get(opCtx)->shardId()});
 
     // There must be at least one chunk.
     invariant(_initialChunks);
@@ -776,7 +770,7 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
                             _initialChunks->collVersion().getTimestamp(),
                             Date_t::now(),
                             *_collectionUUID,
-                            _shardKeyPattern->getKeyPattern());
+                            _request.getShardKeyPattern().getKeyPattern());
 
     if (_request.getTimeseries()) {
         TypeCollectionTimeseriesFields timeseriesFields;
@@ -784,8 +778,8 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
         coll.setTimeseriesFields(std::move(timeseriesFields));
     }
 
-    if (_collationBSON) {
-        coll.setDefaultCollation(_collationBSON.value());
+    if (auto collationBSON = _request.getResolvedCollation(); !collationBSON.isEmpty()) {
+        coll.setDefaultCollation(collationBSON);
     }
 
     if (_request.getUnique()) {
@@ -859,10 +853,10 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
 void CreateCollectionCoordinator::_logStartCreateCollection(OperationContext* opCtx) {
     BSONObjBuilder collectionDetail;
     collectionDetail.append("shardKey", *_request.getShardKey());
-    collectionDetail.append("collection", nss().ns());
+    collectionDetail.append("collection", originalNss().ns());
     collectionDetail.append("primary", ShardingState::get(opCtx)->shardId().toString());
     ShardingLogging::get(opCtx)->logChange(
-        opCtx, "shardCollection.start", nss().ns(), collectionDetail.obj());
+        opCtx, "shardCollection.start", originalNss().ns(), collectionDetail.obj());
 }
 
 void CreateCollectionCoordinator::_logEndCreateCollection(OperationContext* opCtx) {
@@ -875,7 +869,7 @@ void CreateCollectionCoordinator::_logEndCreateCollection(OperationContext* opCt
     collectionDetail.appendNumber("numChunks",
                                   static_cast<long long>(_initialChunks->chunks.size()));
     ShardingLogging::get(opCtx)->logChange(
-        opCtx, "shardCollection.end", nss().ns(), collectionDetail.obj());
+        opCtx, "shardCollection.end", originalNss().ns(), collectionDetail.obj());
 }
 
 }  // namespace mongo
diff --git a/src/mongo/db/s/create_collection_coordinator.h b/src/mongo/db/s/create_collection_coordinator.h
index a1f8bbea4e8..3fa06c83e51 100644
--- a/src/mongo/db/s/create_collection_coordinator.h
+++ b/src/mongo/db/s/create_collection_coordinator.h
@@ -32,6 +32,7 @@
 #include "mongo/db/operation_context.h"
 #include "mongo/db/s/config/initial_split_policy.h"
 #include "mongo/db/s/create_collection_coordinator_document_gen.h"
+#include "mongo/db/s/create_collection_coordinator_params.h"
 #include "mongo/db/s/shard_filtering_metadata_refresh.h"
 #include "mongo/db/s/sharding_ddl_coordinator.h"
 #include "mongo/s/request_types/sharded_ddl_commands_gen.h"
@@ -48,10 +49,10 @@ public:
     CreateCollectionCoordinator(ShardingDDLCoordinatorService* service, const BSONObj& initialState)
         : RecoverableShardingDDLCoordinator(service, "CreateCollectionCoordinator", initialState),
-          _request(_doc.getCreateCollectionRequest()),
+          _request(_doc.getCreateCollectionRequest(), originalNss()),
           _critSecReason(BSON("command"
                               << "createCollection"
-                              << "ns" << nss().toString())) {}
+                              << "ns" << originalNss().toString())) {}
 
     ~CreateCollectionCoordinator() = default;
 
@@ -71,7 +72,7 @@ public:
     }
 
 protected:
-    const mongo::CreateCollectionRequest _request;
+    const NamespaceString& nss() const override;
 
 private:
     StringData serializePhase(const Phase& phase) const override {
@@ -91,6 +92,12 @@ private:
      */
    void _checkCollectionUUIDMismatch(OperationContext* opCtx) const;
 
+    void _acquireCriticalSections(OperationContext* opCtx) const;
+
+    void _promoteCriticalSectionsToBlockReads(OperationContext* opCtx) const;
+
+    void _releaseCriticalSections(OperationContext* opCtx) const;
+
     /**
      * Ensures the collection is created locally and has the appropriate shard index.
     */
@@ -130,11 +137,9 @@ private:
     */
    void _logEndCreateCollection(OperationContext* opCtx);
 
-    const BSONObj _critSecReason;
+    CreateCollectionCoordinatorParams _request;
 
-    // The shard key of the collection, static for the duration of the coordinator and reflects the
-    // original command
-    boost::optional<ShardKeyPattern> _shardKeyPattern;
+    const BSONObj _critSecReason;
 
    // Set on successful completion of the coordinator
    boost::optional<CreateCollectionResponse> _result;
@@ -142,7 +147,6 @@ private:
 
     // The fields below are only populated if the coordinator enters in the branch where the
     // collection is not already sharded (i.e., they will not be present on early return)
-    boost::optional<BSONObj> _collationBSON;
     boost::optional<UUID> _collectionUUID;
 
     std::unique_ptr<InitialSplitPolicy> _splitPolicy;
diff --git a/src/mongo/db/s/create_collection_coordinator_params.cpp b/src/mongo/db/s/create_collection_coordinator_params.cpp
new file mode 100644
index 00000000000..fe07526175c
--- /dev/null
+++ b/src/mongo/db/s/create_collection_coordinator_params.cpp
@@ -0,0 +1,206 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/s/create_collection_coordinator_params.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/catalog_raii.h"
+#include "mongo/s/request_types/sharded_ddl_commands_gen.h"
+
+namespace mongo {
+
+namespace {
+BSONObj resolveCollationForUserQueries(OperationContext* opCtx,
+                                       const NamespaceString& nss,
+                                       const boost::optional<BSONObj>& collationInRequest) {
+    // Ensure the collation is valid. Currently we only allow the simple collation.
+    std::unique_ptr<CollatorInterface> requestedCollator = nullptr;
+    if (collationInRequest) {
+        requestedCollator =
+            uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
+                                ->makeFromBSON(collationInRequest.value()));
+        uassert(ErrorCodes::BadValue,
+                str::stream() << "The collation for shardCollection must be {locale: 'simple'}, "
+                              << "but found: " << collationInRequest.value(),
+                !requestedCollator);
+    }
+
+    AutoGetCollection autoColl(opCtx, nss, MODE_IS, AutoGetCollectionViewMode::kViewsForbidden);
+
+    const auto actualCollator = [&]() -> const CollatorInterface* {
+        const auto& coll = autoColl.getCollection();
+        if (coll) {
+            uassert(
+                ErrorCodes::InvalidOptions, "can't shard a capped collection", !coll->isCapped());
+            return coll->getDefaultCollator();
+        }
+
+        return nullptr;
+    }();
+
+    if (!requestedCollator && !actualCollator)
+        return BSONObj();
+
+    auto actualCollation = actualCollator->getSpec();
+    auto actualCollatorBSON = actualCollation.toBSON();
+
+    if (!collationInRequest) {
+        auto actualCollatorFilter =
+            uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
+                                ->makeFromBSON(actualCollatorBSON));
+        uassert(ErrorCodes::BadValue,
+                str::stream() << "If no collation was specified, the collection collation must be "
+                                 "{locale: 'simple'}, "
+                              << "but found: " << actualCollatorBSON,
+                !actualCollatorFilter);
+    }
+
+    return actualCollatorBSON;
+}
+}  // namespace
+
+CreateCollectionCoordinatorParams::CreateCollectionCoordinatorParams(
+    const CreateCollectionRequest& request, const NamespaceString& targetedNamespace)
+    : CreateCollectionRequest(request),
+      _resolutionPerformed(false),
+      _originalNamespace(targetedNamespace),
+      _resolvedNamespace() {}
+
+void CreateCollectionCoordinatorParams::resolveAgainstLocalCatalog(OperationContext* opCtx) {
+    invariant(!_resolutionPerformed, "The resolution should only be performed once");
+
+    auto bucketsNs = _originalNamespace.makeTimeseriesBucketsNamespace();
+    auto existingBucketsColl =
+        CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForRead(opCtx, bucketsNs);
+
+    auto& timeseriesOptionsInRequest = CreateCollectionRequest::getTimeseries();
+
+    if (!timeseriesOptionsInRequest && !existingBucketsColl) {
+        // The request is targeting a new or existing standard collection.
+        _resolvedNamespace = _originalNamespace;
+
+        uassert(ErrorCodes::InvalidNamespace,
+                str::stream() << "Namespace too long. Namespace: " << _resolvedNamespace
+                              << " Max: " << NamespaceString::MaxNsShardedCollectionLen,
+                _resolvedNamespace.size() <= NamespaceString::MaxNsShardedCollectionLen);
+
+        auto targetCollection = CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForRead(
+            opCtx, _resolvedNamespace);
+
+        _resolvedCollation = resolveCollationForUserQueries(
+            opCtx, _resolvedNamespace, CreateCollectionRequest::getCollation());
+        _shardKeyPattern = ShardKeyPattern(*getShardKey());
+        _resolutionPerformed = true;
+        return;
+    }
+
+    // The request is targeting a new or existing Timeseries collection.
+    _resolvedNamespace = bucketsNs;
+    uassert(ErrorCodes::IllegalOperation,
+            "Sharding a timeseries collection feature is not enabled",
+            feature_flags::gFeatureFlagShardedTimeSeries.isEnabled(
+                serverGlobalParams.featureCompatibility));
+
+    uassert(ErrorCodes::InvalidNamespace,
+            str::stream() << "Namespace too long. Namespace: " << _resolvedNamespace
+                          << " Max: " << NamespaceString::MaxNsShardedCollectionLen,
+            _resolvedNamespace.size() <= NamespaceString::MaxNsShardedCollectionLen);
+
+    // Consolidate the related request parameters...
+    auto existingTimeseriesOptions = [&bucketsNs, &existingBucketsColl] {
+        if (!existingBucketsColl) {
+            return boost::optional<TimeseriesOptions>();
+        }
+
+        uassert(6159000,
+                str::stream() << "the collection '" << bucketsNs
+                              << "' does not have 'timeseries' options",
+                existingBucketsColl->getTimeseriesOptions());
+        return existingBucketsColl->getTimeseriesOptions();
+    }();
+
+    if (timeseriesOptionsInRequest && existingTimeseriesOptions) {
+        uassert(5731500,
+                str::stream() << "the 'timeseries' spec provided must match that of the existing '"
+                              << _originalNamespace << "' collection",
+                timeseries::optionsAreEqual(*timeseriesOptionsInRequest,
+                                            *existingTimeseriesOptions));
+    } else if (!timeseriesOptionsInRequest) {
+        setTimeseries(existingTimeseriesOptions);
+    }
+
+    // ... check that they are consistent with the requested shard key...
+    auto timeFieldName = timeseriesOptionsInRequest->getTimeField();
+    auto metaFieldName = timeseriesOptionsInRequest->getMetaField();
+    BSONObjIterator shardKeyElems{*getShardKey()};
+    while (auto elem = shardKeyElems.next()) {
+        if (elem.fieldNameStringData() == timeFieldName) {
+            uassert(5914000,
+                    str::stream() << "the time field '" << timeFieldName
+                                  << "' can be only at the end of the shard key pattern",
+                    !shardKeyElems.more());
+        } else {
+            uassert(5914001,
+                    str::stream() << "only the time field or meta field can be "
+                                     "part of shard key pattern",
+                    metaFieldName &&
+                        (elem.fieldNameStringData() == *metaFieldName ||
+                         elem.fieldNameStringData().startsWith(*metaFieldName + ".")));
+        }
+    }
+
+    // ... and create the shard key pattern object.
+    _shardKeyPattern.emplace(
+        uassertStatusOK(timeseries::createBucketsShardKeySpecFromTimeseriesShardKeySpec(
+            *timeseriesOptionsInRequest, *getShardKey())));
+
+    _resolvedCollation = resolveCollationForUserQueries(
+        opCtx, _resolvedNamespace, CreateCollectionRequest::getCollation());
+    _resolutionPerformed = true;
+}
+
+BSONObj CreateCollectionCoordinatorParams::getResolvedCollation() const {
+    invariant(_resolutionPerformed);
+    return _resolvedCollation;
+}
+
+const NamespaceString& CreateCollectionCoordinatorParams::getNameSpaceToShard() const {
+    invariant(_resolutionPerformed);
+    return _resolvedNamespace;
+}
+
+const ShardKeyPattern& CreateCollectionCoordinatorParams::getShardKeyPattern() const {
+    invariant(_resolutionPerformed);
+    return *_shardKeyPattern;
+}
+
+const boost::optional<TimeseriesOptions>& CreateCollectionCoordinatorParams::getTimeseries() const {
+    invariant(_resolutionPerformed);
+    return CreateCollectionRequest::getTimeseries();
+}
+
+boost::optional<TimeseriesOptions>& CreateCollectionCoordinatorParams::getTimeseries() {
+    invariant(_resolutionPerformed);
+    return CreateCollectionRequest::getTimeseries();
+}
+
+}  // namespace mongo
diff --git a/src/mongo/db/s/create_collection_coordinator_params.h b/src/mongo/db/s/create_collection_coordinator_params.h
new file mode 100644
index 00000000000..8f94043584f
--- /dev/null
+++ b/src/mongo/db/s/create_collection_coordinator_params.h
@@ -0,0 +1,91 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/catalog/collection_catalog.h"
+#include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h"
+#include "mongo/s/request_types/sharded_ddl_commands_gen.h"
+#include "mongo/s/shard_key_pattern.h"
+#include "mongo/util/assert_util.h"
+
+
+namespace mongo {
+
+/**
+ * An extension of the CreateCollectionRequest parameters received by the Coordinator, including
+ * methods to resolve the user request against the current state of the DB catalog and to safely
+ * access the outcome.
+ */
+class CreateCollectionCoordinatorParams : public CreateCollectionRequest {
+public:
+    CreateCollectionCoordinatorParams(const CreateCollectionRequest& request,
+                                      const NamespaceString& targetedNamespace);
+
+    /*
+     * Resolution method to be invoked before accessing any of the request fields. It assumes that
+     * the caller has already acquired the needed resources to ensure that the catalog can be
+     * safely accessed.
+     */
+    void resolveAgainstLocalCatalog(OperationContext* opCtx);
+
+    const NamespaceString& getNameSpaceToShard() const;
+
+    const ShardKeyPattern& getShardKeyPattern() const;
+
+    const boost::optional<TimeseriesOptions>& getTimeseries() const;
+
+    BSONObj getResolvedCollation() const;
+
+    boost::optional<TimeseriesOptions>& getTimeseries();
+
+private:
+    bool _resolutionPerformed;
+    NamespaceString _originalNamespace;
+    NamespaceString _resolvedNamespace;
+    boost::optional<ShardKeyPattern> _shardKeyPattern;
+    BSONObj _resolvedCollation;
+
+    // Hide harmful non-virtual methods defined by the parent class
+
+    void setShardKey(boost::optional<BSONObj> value) {
+        MONGO_UNREACHABLE;
+    }
+
+    const boost::optional<BSONObj>& getCollation() const {
+        MONGO_UNREACHABLE;
+    }
+
+    void setCollation(boost::optional<BSONObj> value) {
+        MONGO_UNREACHABLE;
+    }
+};
+
+}  // namespace mongo
diff --git a/src/mongo/db/s/recoverable_critical_section_service.cpp b/src/mongo/db/s/recoverable_critical_section_service.cpp
index 0ea15b6e594..dbf5b492f71 100644
--- a/src/mongo/db/s/recoverable_critical_section_service.cpp
+++ b/src/mongo/db/s/recoverable_critical_section_service.cpp
@@ -98,6 +98,8 @@ void RecoverableCriticalSectionService::acquireRecoverableCriticalSectionBlockWr
     {
         Lock::GlobalLock lk(opCtx, MODE_IX);
+        // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to
+        // construct cCollLock.
         AutoGetCollection cCollLock(opCtx, nss, MODE_S);
 
         DBDirectClient dbClient(opCtx);
@@ -184,6 +186,8 @@ void RecoverableCriticalSectionService::promoteRecoverableCriticalSectionToBlock
     invariant(!opCtx->lockState()->isLocked());
 
     {
+        // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to
+        // construct cCollLock.
         AutoGetCollection cCollLock(opCtx, nss, MODE_X);
 
         DBDirectClient dbClient(opCtx);
@@ -284,6 +288,8 @@ void RecoverableCriticalSectionService::releaseRecoverableCriticalSection(
     invariant(!opCtx->lockState()->isLocked());
 
     {
+        // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to
+        // construct cCollLock.
         AutoGetCollection collLock(opCtx, nss, MODE_X);
 
         DBDirectClient dbClient(opCtx);
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.h b/src/mongo/db/s/sharding_ddl_coordinator.h
index f89861078c8..9119d44583d 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.h
+++ b/src/mongo/db/s/sharding_ddl_coordinator.h
@@ -87,17 +87,6 @@ public:
         return _completionPromise.getFuture();
     }
 
-    const NamespaceString& originalNss() const {
-        return _coordId.getNss();
-    }
-
-    const NamespaceString& nss() const {
-        if (const auto& bucketNss = metadata().getBucketNss()) {
-            return bucketNss.get();
-        }
-        return originalNss();
-    }
-
     DDLCoordinatorTypeEnum operationType() const {
         return _coordId.getOperationType();
     }
@@ -112,6 +101,17 @@ public:
     }
 
 protected:
+    const NamespaceString& originalNss() const {
+        return _coordId.getNss();
+    }
+
+    virtual const NamespaceString& nss() const {
+        if (const auto& bucketNss = metadata().getBucketNss()) {
+            return bucketNss.get();
+        }
+        return originalNss();
+    }
+
     virtual std::vector<StringData> _acquireAdditionalLocks(OperationContext* opCtx) {
         return {};
     };
diff --git a/src/mongo/db/s/shardsvr_create_collection_command.cpp b/src/mongo/db/s/shardsvr_create_collection_command.cpp
index 3769e253b7b..f0405eb8636 100644
--- a/src/mongo/db/s/shardsvr_create_collection_command.cpp
+++ b/src/mongo/db/s/shardsvr_create_collection_command.cpp
@@ -29,14 +29,11 @@
 
 #include "mongo/db/auth/authorization_session.h"
-#include "mongo/db/catalog/collection_catalog.h"
 #include "mongo/db/commands.h"
 #include "mongo/db/commands/feature_compatibility_version.h"
 #include "mongo/db/s/create_collection_coordinator.h"
 #include "mongo/db/s/sharding_ddl_coordinator_service.h"
 #include "mongo/db/s/sharding_state.h"
-#include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h"
-#include "mongo/db/timeseries/timeseries_options.h"
 #include "mongo/logv2/log.h"
 
 #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
@@ -83,71 +80,14 @@ public:
                     "Create Collection path has not been implemented",
                     request().getShardKey());
 
-            auto nss = ns();
-            auto bucketsNs = nss.makeTimeseriesBucketsNamespace();
-            auto bucketsColl =
-                CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForRead(opCtx, bucketsNs);
-            CreateCollectionRequest createCmdRequest = request().getCreateCollectionRequest();
-
-            // If the 'system.buckets' exists or 'timeseries' parameters are passed in, we know that
-            // we are trying shard a timeseries collection.
-            if (bucketsColl || createCmdRequest.getTimeseries()) {
-                uassert(5731502,
-                        "Sharding a timeseries collection feature is not enabled",
-                        feature_flags::gFeatureFlagShardedTimeSeries.isEnabled(
-                            serverGlobalParams.featureCompatibility));
-
-                if (bucketsColl) {
-                    uassert(6159000,
-                            str::stream() << "the collection '" << bucketsNs
-                                          << "' does not have 'timeseries' options",
-                            bucketsColl->getTimeseriesOptions());
-
-                    if (createCmdRequest.getTimeseries()) {
-                        uassert(5731500,
-                                str::stream()
-                                    << "the 'timeseries' spec provided must match that of exists '"
-                                    << nss << "' collection",
-                                timeseries::optionsAreEqual(*createCmdRequest.getTimeseries(),
-                                                            *bucketsColl->getTimeseriesOptions()));
-                    } else {
-                        createCmdRequest.setTimeseries(bucketsColl->getTimeseriesOptions());
-                    }
-                }
-
-                auto timeField = createCmdRequest.getTimeseries()->getTimeField();
-                auto metaField = createCmdRequest.getTimeseries()->getMetaField();
-                BSONObjIterator iter{*createCmdRequest.getShardKey()};
-                while (auto elem = iter.next()) {
-                    if (elem.fieldNameStringData() == timeField) {
-                        uassert(5914000,
-                                str::stream()
-                                    << "the time field '" << timeField
-                                    << "' can be only at the end of the shard key pattern",
-                                !iter.more());
-                    } else {
-                        uassert(5914001,
-                                str::stream() << "only the time field or meta field can be "
-                                                 "part of shard key pattern",
-                                metaField &&
-                                    (elem.fieldNameStringData() == *metaField ||
-                                     elem.fieldNameStringData().startsWith(*metaField + ".")));
-                    }
-                }
-
-                nss = bucketsNs;
-                createCmdRequest.setShardKey(
-                    uassertStatusOK(timeseries::createBucketsShardKeySpecFromTimeseriesShardKeySpec(
-                        *createCmdRequest.getTimeseries(), *createCmdRequest.getShardKey())));
-            }
-
             const auto createCollectionCoordinator = [&] {
                 FixedFCVRegion fixedFcvRegion(opCtx);
                 auto coordinatorDoc = [&] {
                     auto doc = CreateCollectionCoordinatorDocument();
                     doc.setShardingDDLCoordinatorMetadata(
-                        {{std::move(nss), DDLCoordinatorTypeEnum::kCreateCollection}});
-                    doc.setCreateCollectionRequest(std::move(createCmdRequest));
+                        {{ns(), DDLCoordinatorTypeEnum::kCreateCollection}});
+                    doc.setCreateCollectionRequest(request().getCreateCollectionRequest());
                     return doc.toBSON();
                 }();
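
A note on the contract introduced by create_collection_coordinator_params.h/.cpp: every accessor is gated behind a one-shot resolveAgainstLocalCatalog() step that must run under the critical sections acquired above, and a timeseries request (or an existing 'system.buckets' collection) redirects the target namespace to the buckets namespace. A minimal standalone C++ sketch of that resolve-then-access pattern; all names here are invented stand-ins for the real CreateCollectionCoordinatorParams, not the server's types.

    #include <cassert>
    #include <stdexcept>
    #include <string>

    // Toy model: getters are only valid after a resolution step that runs exactly once.
    class ResolvedParams {
    public:
        explicit ResolvedParams(std::string requestedNs) : _originalNs(std::move(requestedNs)) {}

        void resolveAgainstLocalCatalog(bool bucketsCollectionExists, bool timeseriesRequested) {
            assert(!_resolved && "the resolution should only be performed once");
            // A request targeting (or implying) a timeseries collection is redirected to
            // the 'system.buckets' namespace; everything else keeps the original one.
            _resolvedNs = (bucketsCollectionExists || timeseriesRequested)
                ? bucketsNamespaceOf(_originalNs)
                : _originalNs;
            _resolved = true;
        }

        const std::string& nameSpaceToShard() const {
            if (!_resolved)
                throw std::logic_error("accessed before resolveAgainstLocalCatalog()");
            return _resolvedNs;
        }

    private:
        static std::string bucketsNamespaceOf(const std::string& ns) {
            auto dot = ns.find('.');
            return ns.substr(0, dot) + ".system.buckets" + ns.substr(dot);
        }

        bool _resolved = false;
        std::string _originalNs;
        std::string _resolvedNs;
    };

    int main() {
        ResolvedParams params("testDB.weather");
        params.resolveAgainstLocalCatalog(/*bucketsCollectionExists=*/false,
                                          /*timeseriesRequested=*/true);
        assert(params.nameSpaceToShard() == "testDB.system.buckets.weather");
        return 0;
    }

The real class additionally validates the shard key against the timeseries options (the time field may only appear last, and every other field must be the meta field or one of its subpaths) and hides the parent's non-virtual setters that would bypass the resolution.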