Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--  src/mongo/db/s/SConscript                                 |   1
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.cpp          | 449
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.h            |  60
-rw-r--r--  src/mongo/db/s/create_collection_coordinator_document.idl |  97
-rw-r--r--  src/mongo/db/s/shard_key_util.cpp                         |  26
-rw-r--r--  src/mongo/db/s/shard_key_util.h                           |  15
-rw-r--r--  src/mongo/db/s/sharding_ddl_coordinator.cpp               |   3
-rw-r--r--  src/mongo/db/s/sharding_ddl_coordinator.h                 |   1
-rw-r--r--  src/mongo/db/s/sharding_ddl_coordinator.idl               |   1
-rw-r--r--  src/mongo/db/s/sharding_ddl_coordinator_service.cpp       |   3
-rw-r--r--  src/mongo/db/s/shardsvr_create_collection_command.cpp     |  27
11 files changed, 542 insertions(+), 141 deletions(-)
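The change below makes CreateCollectionCoordinator resilient to stepdowns by deriving it from the document-backed ShardingDDLCoordinator: every phase transition is persisted with majority write concern before the phase body runs, so a coordinator rebuilt after failover skips phases that already completed and re-executes the interrupted one. The following is a minimal, self-contained sketch of that phase-gating idiom; the PhaseGate class, persist() stub, and console output are illustrative stand-ins, not the actual MongoDB types.

    #include <functional>
    #include <iostream>

    // Hypothetical stand-ins for the coordinator document machinery.
    enum class Phase { kUnset = 0, kCheck = 1, kCommit = 2 };

    class PhaseGate {
    public:
        // Mirrors CreateCollectionCoordinator::_executePhase: wraps 'func' so
        // it only runs when the persisted phase allows it.
        std::function<void()> executePhase(Phase newPhase, std::function<void()> func) {
            return [this, newPhase, func] {
                if (_phase > newPhase) {
                    // A previous attempt already completed this phase: skip it.
                    return;
                }
                if (_phase < newPhase) {
                    // First time entering this phase: persist it before running,
                    // so a stepdown mid-phase resumes here rather than earlier.
                    persist(newPhase);
                }
                func();
            };
        }

    private:
        void persist(Phase newPhase) {
            // The real coordinator majority-writes its state document here;
            // a plain assignment stands in for that in this sketch.
            _phase = newPhase;
            std::cout << "persisted phase " << static_cast<int>(newPhase) << "\n";
        }

        Phase _phase = Phase::kUnset;
    };

    int main() {
        PhaseGate gate;
        auto check = gate.executePhase(Phase::kCheck, [] { std::cout << "check\n"; });
        auto commit = gate.executePhase(Phase::kCommit, [] { std::cout << "commit\n"; });

        check();   // persists kCheck, runs the check work
        commit();  // persists kCommit, runs the commit work
        check();   // no-op: the persisted phase is already past kCheck
        return 0;
    }

The real _executePhase additionally distinguishes inserting the coordinator document on the first phase from updating it on later ones, as shown in _enterPhase() in the patch below.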
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 1595ea721e2..6667851b1d0 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -334,6 +334,7 @@ env.Library( 'config/configsvr_split_chunk_command.cpp', 'config/configsvr_update_zone_key_range_command.cpp', 'create_collection_coordinator.cpp', + 'create_collection_coordinator_document.idl', 'drop_collection_coordinator.cpp', 'drop_collection_coordinator_document.idl', 'drop_database_coordinator.cpp', diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp index a98d17eca81..16629312351 100644 --- a/src/mongo/db/s/create_collection_coordinator.cpp +++ b/src/mongo/db/s/create_collection_coordinator.cpp @@ -34,6 +34,7 @@ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/commands/feature_compatibility_version.h" +#include "mongo/db/persistent_task_store.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/create_collection_coordinator.h" @@ -245,6 +246,25 @@ BSONObj getCollation(OperationContext* opCtx, return actualCollatorBSON; } +void removeChunks(OperationContext* opCtx, const UUID& uuid) { + BatchWriteExecStats stats; + BatchedCommandResponse response; + BatchedCommandRequest deleteRequest([&]() { + write_ops::Delete deleteOp(ChunkType::ConfigNS); + deleteOp.setWriteCommandBase([] { + write_ops::WriteCommandBase writeCommandBase; + writeCommandBase.setOrdered(false); + return writeCommandBase; + }()); + deleteOp.setDeletes( + std::vector{write_ops::DeleteOpEntry(BSON(ChunkType::collectionUUID << uuid), false)}); + return deleteOp; + }()); + deleteRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON()); + cluster::write(opCtx, deleteRequest, &stats, &response); + uassertStatusOK(response.toStatus()); +} + void upsertChunks(OperationContext* opCtx, std::vector<ChunkType>& chunks) { BatchWriteExecStats stats; BatchedCommandResponse response; @@ -303,103 +323,264 @@ void updateCatalogEntry(OperationContext* opCtx, const NamespaceString& nss, Col } } +void removeShardIndex(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& keyPattern) { + DBDirectClient localClient(opCtx); + + localClient.dropIndex(nss.ns(), keyPattern, WriteConcernOptions::Majority); +} + +void broadcastDropCollection(OperationContext* opCtx, + const NamespaceString& nss, + const std::shared_ptr<executor::TaskExecutor>& executor) { + const auto primaryShardId = ShardingState::get(opCtx)->shardId(); + const ShardsvrDropCollectionParticipant dropCollectionParticipant(nss); + + auto participants = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx); + // Remove the primary shard from the participants + participants.erase(std::remove(participants.begin(), participants.end(), primaryShardId), + participants.end()); + + sharding_ddl_util::sendAuthenticatedCommandToShards( + opCtx, + nss.db(), + CommandHelpers::appendMajorityWriteConcern(dropCollectionParticipant.toBSON({})), + participants, + executor); +} + } // namespace -CreateCollectionCoordinator::CreateCollectionCoordinator( - OperationContext* opCtx, const ShardsvrCreateCollection& createCollParams) - : ShardingDDLCoordinator_NORESILIENT(opCtx, createCollParams.getNamespace()), - _serviceContext(opCtx->getServiceContext()), - _request(createCollParams), - _nss(_request.getNamespace()) { - invariant(createCollParams.getShardKey()); - _shardKeyPattern 
= ShardKeyPattern(createCollParams.getShardKey()->getOwned()); +CreateCollectionCoordinator::CreateCollectionCoordinator(const BSONObj& initialState) + : ShardingDDLCoordinator(initialState), + _doc(CreateCollectionCoordinatorDocument::parse( + IDLParserErrorContext("CreateCollectionCoordinatorDocument"), initialState)) {} + +boost::optional<BSONObj> CreateCollectionCoordinator::reportForCurrentOp( + MongoProcessInterface::CurrentOpConnectionsMode connMode, + MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept { + // TODO (SERVER-55485): Add request parameters. + BSONObjBuilder cmdBob; + if (const auto& optComment = getForwardableOpMetadata().getComment()) { + cmdBob.append(optComment.get().firstElement()); + } + BSONObjBuilder bob; + bob.append("type", "op"); + bob.append("desc", "CreateCollectionCoordinator"); + bob.append("op", "command"); + bob.append("ns", nss().toString()); + bob.append("command", cmdBob.obj()); + bob.append("currentPhase", _doc.getPhase()); + bob.append("active", true); + return bob.obj(); } -SemiFuture<void> CreateCollectionCoordinator::runImpl( - std::shared_ptr<executor::TaskExecutor> executor) { - return ExecutorFuture<void>(executor, Status::OK()) - .then([this, anchor = shared_from_this()]() { - ThreadClient tc("CreateCollectionCoordinator", _serviceContext); - auto opCtxHolder = tc->makeOperationContext(); - auto* opCtx = opCtxHolder.get(); - _forwardableOpMetadata.setOn(opCtx); - - _checkCommandArguments(opCtx); - if (_result) { - // Early return before holding the critical section, the collection was already - // created. - return; - } - { - // From this point on all writes are blocked on the collection. - ScopedShardVersionCriticalSection critSec(opCtx, _nss); - _createCollectionAndIndexes(opCtx); - if (_result) { - // Early return, the collection was already created. 
+void CreateCollectionCoordinator::checkIfOptionsConflict(const BSONObj& doc) const { + const auto errorMessage = "Another operation with different arguments is already running"; + auto opUnique = _doc.getUnique().is_initialized(); + auto opCollation = _doc.getCollation().is_initialized(); + auto opNumInitialChunks = _doc.getNumInitialChunks().is_initialized(); + auto opPresplit = _doc.getPresplitHashedZones().is_initialized(); + auto opSplitPoints = _doc.getInitialSplitPoints().is_initialized(); + + uassert(ErrorCodes::ConflictingOperationInProgress, + errorMessage, + opUnique == doc.hasField(CoordDoc::kUniqueFieldName) && + opCollation == doc.hasField(CoordDoc::kCollationFieldName) && + opNumInitialChunks == doc.hasField(CoordDoc::kNumInitialChunksFieldName) && + opPresplit == doc.hasField(CoordDoc::kPresplitHashedZonesFieldName) && + opSplitPoints == doc.hasField(CoordDoc::kInitialSplitPointsFieldName)); + + bool sameArguments; + const auto& shardKey = doc.getObjectField(CoordDoc::kShardKeyFieldName); + sameArguments = SimpleBSONObjComparator::kInstance.evaluate(shardKey == *_doc.getShardKey()); + + if (sameArguments && opUnique) { + sameArguments = doc.getBoolField(CoordDoc::kUniqueFieldName) == *_doc.getUnique(); + } + + if (sameArguments && opPresplit) { + sameArguments = doc.getBoolField(CoordDoc::kPresplitHashedZonesFieldName) == + *_doc.getPresplitHashedZones(); + } + + if (sameArguments && opNumInitialChunks) { + sameArguments = + doc.getIntField(CoordDoc::kNumInitialChunksFieldName) == *_doc.getNumInitialChunks(); + } + + if (sameArguments && opCollation) { + const auto& collation = doc.getObjectField(CoordDoc::kCollationFieldName); + sameArguments = + SimpleBSONObjComparator::kInstance.evaluate(collation == *_doc.getCollation()); + } + + if (sameArguments && opSplitPoints) { + const auto& initialSplitPoints = doc.getObjectField(CoordDoc::kInitialSplitPointsFieldName); + BSONArrayBuilder builder; + builder.append(*_doc.getInitialSplitPoints()); + const auto& currentSplitPoints = builder.obj(); + sameArguments = + SimpleBSONObjComparator::kInstance.evaluate(initialSplitPoints == currentSplitPoints); + } + + // If there are two create collection operations on the same namespace, their arguments must + // be the same. + uassert(ErrorCodes::ConflictingOperationInProgress, errorMessage, sameArguments); +} + +ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( + std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& token) noexcept { + return ExecutorFuture<void>(**executor) + .then(_executePhase(Phase::kCheck, + [this, anchor = shared_from_this()] { + _shardKeyPattern = ShardKeyPattern(*_doc.getShardKey()); + auto opCtxHolder = cc().makeOperationContext(); + auto* opCtx = opCtxHolder.get(); + getForwardableOpMetadata().setOn(opCtx); + + _checkCommandArguments(opCtx); + + // Check whether a shard key index already exists and can be used + // to shard the collection. If it does, record that fact so that a + // rollback does not drop an index that existed before the + // collection was sharded. 
+ _doc.setShardKeyAlreadyCreated( + shardkeyutil::validShardKeyIndexExists( + opCtx, + nss(), + *_shardKeyPattern, + _collation, + _doc.getUnique().get_value_or(false), + shardkeyutil::ValidationBehaviorsShardCollection(opCtx))); + })) + .then(_executePhase( + Phase::kCommit, + [this, executor = executor, anchor = shared_from_this()] { + if (!_shardKeyPattern) { + _shardKeyPattern = ShardKeyPattern(*_doc.getShardKey()); + } + auto opCtxHolder = cc().makeOperationContext(); + auto* opCtx = opCtxHolder.get(); + getForwardableOpMetadata().setOn(opCtx); + + if (auto createCollectionResponseOpt = + sharding_ddl_util::checkIfCollectionAlreadySharded( + opCtx, + nss(), + _shardKeyPattern->getKeyPattern().toBSON(), + getCollation(opCtx, nss(), _doc.getCollation()), + _doc.getUnique().value_or(false))) { + _result = createCollectionResponseOpt; + // Early return before holding the critical section, the + // collection was already created and committed but there was a + // stepdown before removing the coordinator document. return; } - _createChunks(opCtx); - if (_splitPolicy->isOptimized()) { - // Block reads/writes from here on if we need to create the collection on other - // shards, this way we prevent reads/writes that should be redirected to another - // shard. - critSec.enterCommitPhase(); - _createCollectionOnNonPrimaryShards(opCtx); + { + // Entering the critical section. From this point on, the writes + // are blocked. + ScopedShardVersionCriticalSection critSec(opCtx, nss()); + + if (_recoveredFromDisk) { + LOGV2_DEBUG(5458704, + 1, + "Removing partial changes from previous run", + "namespace"_attr = nss(), + "dropShardKey"_attr = _doc.getShardKeyAlreadyCreated()); + if (!_doc.getShardKeyAlreadyCreated()) { + // TODO SERVER-55551: remove this to prevent continuous failover on huge + // collections. + removeShardIndex(opCtx, nss(), _shardKeyPattern->toBSON()); + } + removeChunks(opCtx, *getUUIDFromPrimaryShard(opCtx, nss())); + broadcastDropCollection(opCtx, nss(), **executor); + } + + _createCollectionAndIndexes(opCtx); + + _createPolicyAndChunks(opCtx); + + if (_splitPolicy->isOptimized()) { + // Block reads/writes from here on if we need to create + // the collection on other shards, this way we prevent + // reads/writes that should be redirected to another + // shard. + critSec.enterCommitPhase(); + _createCollectionOnNonPrimaryShards(opCtx); + + _commit(opCtx); + } + } + if (!_splitPolicy->isOptimized()) { _commit(opCtx); } - } - if (!_splitPolicy->isOptimized()) { - _commit(opCtx); + _finalize(opCtx); + })) + .onCompletion([this, anchor = shared_from_this()](const Status& status) { + try { + _removeCoordinatorDocument(); + uassertStatusOK(status); + LOGV2(5458702, "Collection created", "namespace"_attr = nss()); + } catch (const ExceptionForCat<ErrorCategory::NotPrimaryError>& ex) { + LOGV2_ERROR(5458701, + "Create collection interrupted with a NotPrimaryError exception " + "category, it will continue on the next primary", + "namespace"_attr = nss(), + "error"_attr = redact(ex)); + } catch (const ExceptionForCat<ErrorCategory::ShutdownError>& ex) { + LOGV2_ERROR(5458707, + "Create collection interrupted with a ShutdownError exception " + "category, it will continue on the next primary", + "namespace"_attr = nss(), + "error"_attr = redact(ex)); + } catch (const DBException& ex) { + LOGV2_ERROR(5458703, + "Error running create collection", + "namespace"_attr = nss(), + "error"_attr = redact(ex)); + // TODO SERVER-55396: retry operation until it succeeds. 
+ throw; } - - _cleanup(opCtx); - }) - .onError([this, anchor = shared_from_this()](const Status& status) { - LOGV2_ERROR(5277908, - "Error running create collection", - "namespace"_attr = _nss, - "error"_attr = redact(status)); - return status; - }) - .semi(); + }); } void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx) { - LOGV2_DEBUG(5277902, 2, "Create collection _checkCommandArguments", "namespace"_attr = _nss); + LOGV2_DEBUG(5277902, 2, "Create collection _checkCommandArguments", "namespace"_attr = nss()); - const auto dbInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabaseWithRefresh(opCtx, _nss.db())); + const auto dbInfo = uassertStatusOK( + Grid::get(opCtx)->catalogCache()->getDatabaseWithRefresh(opCtx, nss().db())); uassert(ErrorCodes::IllegalOperation, - str::stream() << "sharding not enabled for db " << _nss.db(), + str::stream() << "sharding not enabled for db " << nss().db(), dbInfo.shardingEnabled()); - if (_nss.db() == NamespaceString::kConfigDb) { + if (nss().db() == NamespaceString::kConfigDb) { // Only whitelisted collections in config may be sharded (unless we are in test mode) uassert(ErrorCodes::IllegalOperation, "only special collections in the config db may be sharded", - _nss == NamespaceString::kLogicalSessionsNamespace); + nss() == NamespaceString::kLogicalSessionsNamespace); } // Ensure that hashed and unique are not both set. uassert(ErrorCodes::InvalidOptions, "Hashed shard keys cannot be declared unique. It's possible to ensure uniqueness on " "the hashed field by declaring an additional (non-hashed) unique index on the field.", - !_shardKeyPattern.value().isHashedPattern() || - !(_request.getUnique() && _request.getUnique().value())); + !_shardKeyPattern->isHashedPattern() || !_doc.getUnique().value_or(false)); // Ensure the namespace is valid. uassert(ErrorCodes::IllegalOperation, "can't shard system namespaces", - !_nss.isSystem() || _nss == NamespaceString::kLogicalSessionsNamespace || - _nss.isTemporaryReshardingCollection() || _nss.isTimeseriesBucketsCollection()); + !nss().isSystem() || nss() == NamespaceString::kLogicalSessionsNamespace || + nss().isTemporaryReshardingCollection() || nss().isTimeseriesBucketsCollection()); - if (_request.getNumInitialChunks()) { + if (_doc.getNumInitialChunks()) { // Ensure numInitialChunks is within valid bounds. // Cannot have more than 8192 initial chunks per shard. 
Setting a maximum of 1,000,000 // chunks in total to limit the amount of memory this command consumes so there is less @@ -408,7 +589,7 @@ void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx const int maxNumInitialChunksForShards = Grid::get(opCtx)->shardRegistry()->getNumShardsNoReload() * 8192; const int maxNumInitialChunksTotal = 1000 * 1000; // Arbitrary limit to memory consumption - int numChunks = _request.getNumInitialChunks().value(); + int numChunks = _doc.getNumInitialChunks().value(); uassert(ErrorCodes::InvalidOptions, str::stream() << "numInitialChunks cannot be more than either: " << maxNumInitialChunksForShards << ", 8192 * number of shards; or " @@ -417,14 +598,14 @@ void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx numChunks <= maxNumInitialChunksTotal); } - if (_nss.db() == NamespaceString::kConfigDb) { + if (nss().db() == NamespaceString::kConfigDb) { auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); auto findReponse = uassertStatusOK( configShard->exhaustiveFindOnConfig(opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, repl::ReadConcernLevel::kMajorityReadConcern, - _nss, + nss(), BSONObj(), BSONObj(), 1)); @@ -436,60 +617,55 @@ void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx "collections in the config db must be empty to be sharded", numDocs == 0); } - - auto unique = _request.getUnique() ? *_request.getUnique() : false; - if (auto createCollectionResponseOpt = - mongo::sharding_ddl_util::checkIfCollectionAlreadySharded( - opCtx, - _nss, - _shardKeyPattern->getKeyPattern().toBSON(), - getCollation(opCtx, _nss, _request.getCollation()), - unique)) { - _result = createCollectionResponseOpt; - } } void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext* opCtx) { LOGV2_DEBUG( - 5277903, 2, "Create collection _createCollectionAndIndexes", "namespace"_attr = _nss); + 5277903, 2, "Create collection _createCollectionAndIndexes", "namespace"_attr = nss()); - auto unique = _request.getUnique() ? *_request.getUnique() : false; - _collation = getCollation(opCtx, _nss, _request.getCollation()); + auto unique = _doc.getUnique().value_or(false); + _collation = getCollation(opCtx, nss(), _doc.getCollation()); if (auto createCollectionResponseOpt = mongo::sharding_ddl_util::checkIfCollectionAlreadySharded( - opCtx, _nss, _shardKeyPattern->getKeyPattern().toBSON(), *_collation, unique)) { + opCtx, nss(), _shardKeyPattern->getKeyPattern().toBSON(), *_collation, unique)) { _result = createCollectionResponseOpt; return; } - // Internally creates the collection if it doesn't exist. shardkeyutil::validateShardKeyIndexExistsOrCreateIfPossible( opCtx, - _nss, + nss(), *_shardKeyPattern, _collation, - _request.getUnique() ? *_request.getUnique() : false, + _doc.getUnique().value_or(false), shardkeyutil::ValidationBehaviorsShardCollection(opCtx)); - _collectionUUID = *getUUIDFromPrimaryShard(opCtx, _nss); + // Wait until the index is majority written, to prevent having the collection committed to the + // config server while the index creation is rolled back on stepdowns. 
+ WriteConcernResult ignoreResult; + auto latestOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + uassertStatusOK(waitForWriteConcern( + opCtx, latestOpTime, ShardingCatalogClient::kMajorityWriteConcern, &ignoreResult)); + + _collectionUUID = *getUUIDFromPrimaryShard(opCtx, nss()); } -void CreateCollectionCoordinator::_createChunks(OperationContext* opCtx) { - LOGV2_DEBUG(5277904, 2, "Create collection _createChunks", "namespace"_attr = _nss); +void CreateCollectionCoordinator::_createPolicyAndChunks(OperationContext* opCtx) { + LOGV2_DEBUG(5277904, 2, "Create collection _createPolicyAndChunks", "namespace"_attr = nss()); _splitPolicy = InitialSplitPolicy::calculateOptimizationStrategy( opCtx, *_shardKeyPattern, - _request.getNumInitialChunks() ? *_request.getNumInitialChunks() : 0, - _request.getPresplitHashedZones() ? *_request.getPresplitHashedZones() : false, - _request.getInitialSplitPoints(), - getTagsAndValidate(opCtx, _nss, _shardKeyPattern->toBSON(), *_shardKeyPattern), + _doc.getNumInitialChunks() ? *_doc.getNumInitialChunks() : 0, + _doc.getPresplitHashedZones() ? *_doc.getPresplitHashedZones() : false, + _doc.getInitialSplitPoints(), + getTagsAndValidate(opCtx, nss(), _shardKeyPattern->toBSON(), *_shardKeyPattern), getNumShards(opCtx), - checkIfCollectionIsEmpty(opCtx, _nss)); + checkIfCollectionIsEmpty(opCtx, nss())); _initialChunks = _splitPolicy->createFirstChunks( - opCtx, *_shardKeyPattern, {_nss, *_collectionUUID, ShardingState::get(opCtx)->shardId()}); + opCtx, *_shardKeyPattern, {nss(), *_collectionUUID, ShardingState::get(opCtx)->shardId()}); // There must be at least one chunk. invariant(!_initialChunks.chunks.empty()); @@ -499,13 +675,13 @@ void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(OperationC LOGV2_DEBUG(5277905, 2, "Create collection _createCollectionOnNonPrimaryShards", - "namespace"_attr = _nss); + "namespace"_attr = nss()); std::vector<AsyncRequestsSender::Request> requests; std::set<ShardId> initializedShards; auto dbPrimaryShardId = ShardingState::get(opCtx)->shardId(); - NamespaceStringOrUUID nssOrUUID{_nss.db().toString(), *_collectionUUID}; + NamespaceStringOrUUID nssOrUUID{nss().db().toString(), *_collectionUUID}; auto [collOptions, indexes, idIndex] = getCollectionOptionsAndIndexes(opCtx, nssOrUUID); for (const auto& chunk : _initialChunks.chunks) { @@ -515,7 +691,7 @@ void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(OperationC continue; } - ShardsvrCreateCollectionParticipant createCollectionParticipantRequest(_nss); + ShardsvrCreateCollectionParticipant createCollectionParticipantRequest(nss()); createCollectionParticipantRequest.setCollectionUUID(*_collectionUUID); createCollectionParticipantRequest.setOptions(collOptions); @@ -532,7 +708,7 @@ void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(OperationC if (!requests.empty()) { auto responses = gatherResponses(opCtx, - _nss.db(), + nss().db(), ReadPreferenceSetting(ReadPreference::PrimaryOnly), Shard::RetryPolicy::kIdempotent, requests); @@ -542,57 +718,57 @@ void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(OperationC for (const auto& response : responses) { auto shardResponse = uassertStatusOKWithContext( std::move(response.swResponse), - str::stream() << "Unable to create collection " << _nss.ns() << " on " + str::stream() << "Unable to create collection " << nss().ns() << " on " << response.shardId); auto status = getStatusFromCommandResult(shardResponse.data); 
uassertStatusOK(status.withContext(str::stream() - << "Unable to create collection " << _nss.ns() + << "Unable to create collection " << nss().ns() << " on " << response.shardId)); auto wcStatus = getWriteConcernStatusFromCommandResult(shardResponse.data); uassertStatusOK(wcStatus.withContext(str::stream() - << "Unable to create collection " << _nss.ns() + << "Unable to create collection " << nss().ns() << " on " << response.shardId)); } } } void CreateCollectionCoordinator::_commit(OperationContext* opCtx) { - LOGV2_DEBUG(5277906, 2, "Create collection _commit", "namespace"_attr = _nss); + LOGV2_DEBUG(5277906, 2, "Create collection _commit", "namespace"_attr = nss()); // Upsert Chunks. upsertChunks(opCtx, _initialChunks.chunks); - CollectionType coll(_nss, + CollectionType coll(nss(), _initialChunks.collVersion().epoch(), _initialChunks.creationTime, Date_t::now(), *_collectionUUID); - coll.setKeyPattern(_shardKeyPattern->toBSON()); + coll.setKeyPattern(_shardKeyPattern->getKeyPattern()); if (_collation) { coll.setDefaultCollation(_collation.value()); } - if (_request.getUnique()) { - coll.setUnique(*_request.getUnique()); + if (_doc.getUnique()) { + coll.setUnique(*_doc.getUnique()); } - updateCatalogEntry(opCtx, _nss, coll); + updateCatalogEntry(opCtx, nss(), coll); } -void CreateCollectionCoordinator::_cleanup(OperationContext* opCtx) { - LOGV2_DEBUG(5277907, 2, "Create collection _cleanup", "namespace"_attr = _nss); +void CreateCollectionCoordinator::_finalize(OperationContext* opCtx) noexcept { + LOGV2_DEBUG(5277907, 2, "Create collection _finalize", "namespace"_attr = nss()); try { - forceShardFilteringMetadataRefresh(opCtx, _nss); + forceShardFilteringMetadataRefresh(opCtx, nss()); } catch (const DBException&) { // If the refresh fails, then set the shard version to UNKNOWN and allow a future operation to // refresh the metadata. UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - AutoGetCollection autoColl(opCtx, _nss, MODE_IX); - CollectionShardingRuntime::get(opCtx, _nss)->clearFilteringMetadata(opCtx); + AutoGetCollection autoColl(opCtx, nss(), MODE_IX); + CollectionShardingRuntime::get(opCtx, nss())->clearFilteringMetadata(opCtx); } // Is it really necessary to refresh all shards? or can I assume that the shard version will be @@ -614,7 +790,7 @@ void CreateCollectionCoordinator::_cleanup(OperationContext* opCtx) { opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, "admin", - BSON("_flushRoutingTableCacheUpdates" << _nss.ns()), + BSON("_flushRoutingTableCacheUpdates" << nss().ns()), Seconds{30}, Shard::RetryPolicy::kIdempotent)); @@ -630,7 +806,7 @@ void CreateCollectionCoordinator::_cleanup(OperationContext* opCtx) { LOGV2(5277901, "Created initial chunk(s)", - "namespace"_attr = _nss, + "namespace"_attr = nss(), "numInitialChunks"_attr = _initialChunks.chunks.size(), "initialCollectionVersion"_attr = _initialChunks.collVersion()); @@ -638,7 +814,7 @@ void CreateCollectionCoordinator::_cleanup(OperationContext* opCtx) { ShardingLogging::get(opCtx)->logChange( opCtx, "shardCollection.end", - _nss.ns(), + nss().ns(), BSON("version" << _initialChunks.collVersion().toString() << "numChunks" << static_cast<int>(_initialChunks.chunks.size())), ShardingCatalogClient::kMajorityWriteConcern); @@ -649,4 +825,53 @@ void CreateCollectionCoordinator::_cleanup(OperationContext* opCtx) { _result = result; } +// Phase change and document handling API. 
+void CreateCollectionCoordinator::_insertCoordinatorDocument(CoordDoc&& doc) { + auto docBSON = _doc.toBSON(); + auto coorMetadata = doc.getShardingDDLCoordinatorMetadata(); + coorMetadata.setRecoveredFromDisk(true); + doc.setShardingDDLCoordinatorMetadata(coorMetadata); + + auto opCtx = cc().makeOperationContext(); + PersistentTaskStore<CoordDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace); + store.add(opCtx.get(), doc, WriteConcerns::kMajorityWriteConcern); + _doc = std::move(doc); +} + +void CreateCollectionCoordinator::_updateCoordinatorDocument(CoordDoc&& newDoc) { + auto opCtx = cc().makeOperationContext(); + PersistentTaskStore<CoordDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace); + store.update(opCtx.get(), + BSON(CoordDoc::kIdFieldName << _doc.getId().toBSON()), + newDoc.toBSON(), + WriteConcerns::kMajorityWriteConcern); + + _doc = std::move(newDoc); +} + +void CreateCollectionCoordinator::_removeCoordinatorDocument() { + auto opCtx = cc().makeOperationContext(); + PersistentTaskStore<CoordDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace); + LOGV2_DEBUG(5458705, + 1, + "Removing state document for create collection coordinator", + "namespace"_attr = nss()); + store.remove(opCtx.get(), + BSON(CoordDoc::kIdFieldName << _doc.getId().toBSON()), + WriteConcerns::kMajorityWriteConcern); + + _doc = {}; +} + +void CreateCollectionCoordinator::_enterPhase(Phase newPhase) { + CoordDoc newDoc(_doc); + newDoc.setPhase(newPhase); + + if (_doc.getPhase() == Phase::kUnset) { + _insertCoordinatorDocument(std::move(newDoc)); + return; + } + _updateCoordinatorDocument(std::move(newDoc)); +} + } // namespace mongo diff --git a/src/mongo/db/s/create_collection_coordinator.h b/src/mongo/db/s/create_collection_coordinator.h index a9177f276ee..d69b43252fb 100644 --- a/src/mongo/db/s/create_collection_coordinator.h +++ b/src/mongo/db/s/create_collection_coordinator.h @@ -31,6 +31,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/s/config/initial_split_policy.h" +#include "mongo/db/s/create_collection_coordinator_document_gen.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/s/sharding_ddl_coordinator.h" #include "mongo/s/request_types/sharded_ddl_commands_gen.h" @@ -38,22 +39,56 @@ namespace mongo { -class CreateCollectionCoordinator final - : public ShardingDDLCoordinator_NORESILIENT, - public std::enable_shared_from_this<CreateCollectionCoordinator> { +class CreateCollectionCoordinator final : public ShardingDDLCoordinator { public: - CreateCollectionCoordinator(OperationContext* opCtx, const ShardsvrCreateCollection& request); + using CoordDoc = CreateCollectionCoordinatorDocument; + using Phase = CreateCollectionCoordinatorPhaseEnum; + + CreateCollectionCoordinator(const BSONObj& initialState); + ~CreateCollectionCoordinator() = default; + + + void checkIfOptionsConflict(const BSONObj& coorDoc) const override; + + boost::optional<BSONObj> reportForCurrentOp( + MongoProcessInterface::CurrentOpConnectionsMode connMode, + MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept override; /** - * Returns the information of the newly created collection, or the already existing one. It must - * be called after a successfull execution of run. 
+ * Waits for the termination of the parent DDLCoordinator (so all the resources are released) + * and then returns the result of the operation. */ - const CreateCollectionResponse& getResultOnSuccess() { + CreateCollectionResponse getResult(OperationContext* opCtx) { + getCompletionFuture().get(opCtx); + invariant(_result.is_initialized()); return *_result; } private: - SemiFuture<void> runImpl(std::shared_ptr<executor::TaskExecutor> executor) override; + ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& token) noexcept override; + + template <typename Func> + auto _executePhase(const Phase& newPhase, Func&& func) { + return [=] { + const auto& currPhase = _doc.getPhase(); + + if (currPhase > newPhase) { + // Do not execute this phase if we already reached a subsequent one. + return; + } + if (currPhase < newPhase) { + // Persist the new phase if this is the first time we are executing it. + _enterPhase(newPhase); + } + return func(); + }; + }; + + void _insertCoordinatorDocument(CoordDoc&& doc); + void _updateCoordinatorDocument(CoordDoc&& newStateDoc); + void _removeCoordinatorDocument(); + void _enterPhase(Phase newState); /** * Performs all required checks before holding the critical sections. @@ -68,7 +103,7 @@ private: /** * Given the appropriate split policy, create the initial chunks. */ - void _createChunks(OperationContext* opCtx); + void _createPolicyAndChunks(OperationContext* opCtx); /** * If the optimized path can be taken, ensure the collection is already created in all the @@ -86,12 +121,11 @@ private: /** * Refresh all participant shards and log creation. */ - void _cleanup(OperationContext* opCtx); + void _finalize(OperationContext* opCtx) noexcept; - ServiceContext* _serviceContext; - const ShardsvrCreateCollection _request; - const NamespaceString& _nss; + CreateCollectionCoordinatorDocument _doc; + // Objects generated on each execution. boost::optional<ShardKeyPattern> _shardKeyPattern; boost::optional<BSONObj> _collation; boost::optional<UUID> _collectionUUID; diff --git a/src/mongo/db/s/create_collection_coordinator_document.idl b/src/mongo/db/s/create_collection_coordinator_document.idl new file mode 100644 index 00000000000..c8c9aa05762 --- /dev/null +++ b/src/mongo/db/s/create_collection_coordinator_document.idl @@ -0,0 +1,97 @@ +# Copyright (C) 2021-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. 
If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +# This file defines the format of the documents persisted by the create collection coordinator to +# guarantee resilience in the event of stepdowns while creating collections. + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + - "mongo/db/s/sharding_ddl_coordinator.idl" + - "mongo/s/request_types/sharded_ddl_commands.idl" + +enums: + CreateCollectionCoordinatorPhase: + description: "Current create collection coordinator's operation state." + type: string + values: + kUnset: "unset" + kCheck: "check" + kCommit: "commit" + +structs: + CreateCollectionCoordinatorDocument: + description: "Object with the necessary fields to create a collection" + generate_comparison_operators: false + strict: true + chained_structs: + ShardingDDLCoordinatorMetadata: ShardingDDLCoordinatorMetadata + fields: + phase: + type: CreateCollectionCoordinatorPhase + description: "Coordinator phase." + default: kUnset + shardKeyAlreadyCreated: + type: bool + description: >- + If set then the collection already had the shard key index created when the + command was first invoked. If a step down happens, then the shard index will + not be dropped on the next attempt at startup. + default: false + shardKey: + type: object_owned + description: "The index specification document to use as the shard key." + optional: true + unique: + type: bool + description: "Whether the shard key index should enforce a unique constraint." + optional: true + numInitialChunks: + type: safeInt64 + description: >- + The number of chunks to create initially when sharding an empty collection with + a hashed shard key. + optional: true + presplitHashedZones: + type: bool + description: >- + True if the chunks should be pre-split based on the existing zones when + sharding a collection with hashed shard key. + optional: true + initialSplitPoints: + type: array<object_owned> + description: >- + A specific set of points to create initial splits at, currently used only by + mapReduce. + optional: true + collation: + type: object_owned + description: "The collation to use for the shard key index." + optional: true
\ No newline at end of file diff --git a/src/mongo/db/s/shard_key_util.cpp b/src/mongo/db/s/shard_key_util.cpp index cb11f76831a..018164565ba 100644 --- a/src/mongo/db/s/shard_key_util.cpp +++ b/src/mongo/db/s/shard_key_util.cpp @@ -100,12 +100,12 @@ BSONObj makeCreateIndexesCmd(const NamespaceString& nss, } // namespace -void validateShardKeyIndexExistsOrCreateIfPossible(OperationContext* opCtx, - const NamespaceString& nss, - const ShardKeyPattern& shardKeyPattern, - const boost::optional<BSONObj>& defaultCollation, - bool unique, - const ShardKeyValidationBehaviors& behaviors) { +bool validShardKeyIndexExists(OperationContext* opCtx, + const NamespaceString& nss, + const ShardKeyPattern& shardKeyPattern, + const boost::optional<BSONObj>& defaultCollation, + bool unique, + const ShardKeyValidationBehaviors& behaviors) { auto indexes = behaviors.loadIndexes(nss); // 1. Verify consistency with existing unique indexes @@ -174,9 +174,23 @@ void validateShardKeyIndexExistsOrCreateIfPossible(OperationContext* opCtx, if (hasUsefulIndexForKey) { // Check 2.iii Make sure that there is a useful, non-multikey index available. behaviors.verifyUsefulNonMultiKeyIndex(nss, shardKeyPattern.toBSON()); + } + + return hasUsefulIndexForKey; +} + +void validateShardKeyIndexExistsOrCreateIfPossible(OperationContext* opCtx, + const NamespaceString& nss, + const ShardKeyPattern& shardKeyPattern, + const boost::optional<BSONObj>& defaultCollation, + bool unique, + const ShardKeyValidationBehaviors& behaviors) { + if (validShardKeyIndexExists( + opCtx, nss, shardKeyPattern, defaultCollation, unique, behaviors)) { return; } + // 4. If no useful index, verify we can create one. behaviors.verifyCanCreateShardKeyIndex(nss); diff --git a/src/mongo/db/s/shard_key_util.h b/src/mongo/db/s/shard_key_util.h index 619ced60915..6d7a41db58e 100644 --- a/src/mongo/db/s/shard_key_util.h +++ b/src/mongo/db/s/shard_key_util.h @@ -149,6 +149,19 @@ void validateShardKeyIndexExistsOrCreateIfPossible(OperationContext* opCtx, const boost::optional<BSONObj>& defaultCollation, bool unique, const ShardKeyValidationBehaviors& behaviors); - +/** + * Compares the proposed shard key with the collection's existing indexes to ensure they are a legal + * combination. + * + * Returns true if a valid shard key index already exists; it performs steps 1, 2 and 3 of + * validateShardKeyIndexExistsOrCreateIfPossible above. 
+ * + */ +bool validShardKeyIndexExists(OperationContext* opCtx, + const NamespaceString& nss, + const ShardKeyPattern& shardKeyPattern, + const boost::optional<BSONObj>& defaultCollation, + bool unique, + const ShardKeyValidationBehaviors& behaviors); } // namespace shardkeyutil } // namespace mongo diff --git a/src/mongo/db/s/sharding_ddl_coordinator.cpp b/src/mongo/db/s/sharding_ddl_coordinator.cpp index 02d006aea47..4d18a4c836f 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator.cpp +++ b/src/mongo/db/s/sharding_ddl_coordinator.cpp @@ -47,7 +47,8 @@ ShardingDDLCoordinatorMetadata extractShardingDDLCoordinatorMetadata(const BSONO } ShardingDDLCoordinator::ShardingDDLCoordinator(const BSONObj& coorDoc) - : _coorMetadata(extractShardingDDLCoordinatorMetadata(coorDoc)) {} + : _coorMetadata(extractShardingDDLCoordinatorMetadata(coorDoc)), + _recoveredFromDisk(_coorMetadata.getRecoveredFromDisk()) {} ShardingDDLCoordinator::~ShardingDDLCoordinator() { invariant(_constructionCompletionPromise.getFuture().isReady()); diff --git a/src/mongo/db/s/sharding_ddl_coordinator.h b/src/mongo/db/s/sharding_ddl_coordinator.h index d6e040ab33c..3ec6e42aab0 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator.h +++ b/src/mongo/db/s/sharding_ddl_coordinator.h @@ -96,6 +96,7 @@ protected: }; ShardingDDLCoordinatorMetadata _coorMetadata; + bool _recoveredFromDisk; private: SemiFuture<void> run(std::shared_ptr<executor::ScopedTaskExecutor> executor, diff --git a/src/mongo/db/s/sharding_ddl_coordinator.idl b/src/mongo/db/s/sharding_ddl_coordinator.idl index e8c602ed370..3a28d9f2e67 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator.idl +++ b/src/mongo/db/s/sharding_ddl_coordinator.idl @@ -45,6 +45,7 @@ enums: kDropDatabase: "dropDatabase" kDropCollection: "dropCollection" kRenameCollection: "renameCollection" + kCreateCollection: "createCollection" types: ForwardableOperationMetadata: diff --git a/src/mongo/db/s/sharding_ddl_coordinator_service.cpp b/src/mongo/db/s/sharding_ddl_coordinator_service.cpp index b7ec805aeb9..259e0db4a4e 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator_service.cpp +++ b/src/mongo/db/s/sharding_ddl_coordinator_service.cpp @@ -34,6 +34,7 @@ #include "mongo/db/s/sharding_ddl_coordinator_service.h" #include "mongo/base/checked_cast.h" +#include "mongo/db/s/create_collection_coordinator.h" #include "mongo/db/s/database_sharding_state.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharding_ddl_coordinator.h" @@ -67,6 +68,8 @@ ShardingDDLCoordinatorService::constructInstance(BSONObj initialState) const { break; case DDLCoordinatorTypeEnum::kRenameCollection: return std::make_shared<RenameCollectionCoordinator>(std::move(initialState)); + case DDLCoordinatorTypeEnum::kCreateCollection: + return std::make_shared<CreateCollectionCoordinator>(std::move(initialState)); break; default: uasserted(ErrorCodes::BadValue, diff --git a/src/mongo/db/s/shardsvr_create_collection_command.cpp b/src/mongo/db/s/shardsvr_create_collection_command.cpp index ce2a381d2f5..b8d8d739747 100644 --- a/src/mongo/db/s/shardsvr_create_collection_command.cpp +++ b/src/mongo/db/s/shardsvr_create_collection_command.cpp @@ -38,6 +38,7 @@ #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/s/create_collection_coordinator.h" #include "mongo/db/s/shard_collection_legacy.h" +#include "mongo/db/s/sharding_ddl_coordinator_service.h" #include "mongo/db/s/sharding_state.h" #include "mongo/logv2/log.h" #include "mongo/s/grid.h" @@ -167,15 +168,25 @@ 
CreateCollectionResponse createCollection(OperationContext* opCtx, uassert( ErrorCodes::NotImplemented, "create collection not implemented yet", request.getShardKey()); - DistLockManager::ScopedDistLock dbDistLock(uassertStatusOK(DistLockManager::get(opCtx)->lock( - opCtx, nss.db(), "shardCollection", DistLockManager::kDefaultLockTimeout))); - DistLockManager::ScopedDistLock collDistLock(uassertStatusOK(DistLockManager::get(opCtx)->lock( - opCtx, nss.ns(), "shardCollection", DistLockManager::kDefaultLockTimeout))); + auto coordinatorDoc = CreateCollectionCoordinatorDocument(); + coordinatorDoc.setShardingDDLCoordinatorMetadata( + {{nss, DDLCoordinatorTypeEnum::kCreateCollection}}); + coordinatorDoc.setShardKey(request.getShardKey()); + if (request.getCollation()) + coordinatorDoc.setCollation(request.getCollation()); + if (request.getInitialSplitPoints()) + coordinatorDoc.setInitialSplitPoints(request.getInitialSplitPoints()); + if (request.getNumInitialChunks()) + coordinatorDoc.setNumInitialChunks(request.getNumInitialChunks()); + if (request.getPresplitHashedZones()) + coordinatorDoc.setPresplitHashedZones(request.getPresplitHashedZones()); + if (request.getUnique()) + coordinatorDoc.setUnique(request.getUnique()); - auto createCollectionCoordinator = - std::make_shared<CreateCollectionCoordinator>(opCtx, request); - createCollectionCoordinator->run(opCtx).get(opCtx); - return createCollectionCoordinator->getResultOnSuccess(); + auto service = ShardingDDLCoordinatorService::getService(opCtx); + auto createCollectionCoordinator = checked_pointer_cast<CreateCollectionCoordinator>( + service->getOrCreateInstance(opCtx, coordinatorDoc.toBSON())); + return createCollectionCoordinator->getResult(opCtx); } class ShardsvrCreateCollectionCommand final : public TypedCommand<ShardsvrCreateCollectionCommand> {
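For context on the new command entry point above: createCollection() no longer takes the database and collection distributed locks directly; it serializes the request into a CreateCollectionCoordinatorDocument and hands it to ShardingDDLCoordinatorService::getOrCreateInstance(), so a retried or concurrent identical request joins the coordinator that is already running, while a request with different arguments fails with ConflictingOperationInProgress via checkIfOptionsConflict(). A rough sketch of that join-or-conflict contract, using a plain map and strings as hypothetical stand-ins for the PrimaryOnlyService instance registry and the coordinator document:

    #include <map>
    #include <memory>
    #include <stdexcept>
    #include <string>

    // Hypothetical stand-in for a DDL coordinator instance keyed by namespace.
    struct Coordinator {
        std::string args;  // serialized request options (the coordinator document)

        // Mirrors checkIfOptionsConflict(): a second request may only join a
        // running coordinator if its arguments are identical.
        void checkIfOptionsConflict(const std::string& otherArgs) const {
            if (otherArgs != args) {
                throw std::runtime_error(
                    "ConflictingOperationInProgress: another operation with "
                    "different arguments is already running");
            }
        }
    };

    class Registry {
    public:
        // Sketch of the getOrCreateInstance() contract: identical requests share
        // one in-flight instance, conflicting ones are rejected.
        std::shared_ptr<Coordinator> getOrCreate(const std::string& ns,
                                                 const std::string& args) {
            auto it = _instances.find(ns);
            if (it != _instances.end()) {
                it->second->checkIfOptionsConflict(args);
                return it->second;  // join the coordinator already in flight
            }
            auto coordinator = std::make_shared<Coordinator>(Coordinator{args});
            _instances.emplace(ns, coordinator);
            return coordinator;
        }

    private:
        std::map<std::string, std::shared_ptr<Coordinator>> _instances;
    };

In the real service the instance is keyed by the ShardingDDLCoordinatorId embedded in the document's metadata rather than a bare namespace string, and getResult() blocks on the coordinator's completion future before returning the CreateCollectionResponse.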