diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2022-04-07 07:11:45 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-07 08:03:37 +0000 |
commit | c257704b483fb3938465f5eb4f2a01e1d1e7c119 (patch) | |
tree | 75bd1d24a5a819d2d53b8c372d547e9200c01484 /src/mongo/db/s/create_collection_coordinator.cpp | |
parent | 487babfa93b6f6f2fb500ecb5411045be89e881c (diff) | |
download | mongo-c257704b483fb3938465f5eb4f2a01e1d1e7c119.tar.gz |
SERVER-64822 Clear filtering metadata if createCollection commit fails
Diffstat (limited to 'src/mongo/db/s/create_collection_coordinator.cpp')
-rw-r--r-- | src/mongo/db/s/create_collection_coordinator.cpp | 134 |
1 files changed, 57 insertions, 77 deletions
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp index b190f458c47..d3ec9d03230 100644 --- a/src/mongo/db/s/create_collection_coordinator.cpp +++ b/src/mongo/db/s/create_collection_coordinator.cpp @@ -315,52 +315,27 @@ void insertChunks(OperationContext* opCtx, } } -void updateCatalogEntry(OperationContext* opCtx, - const NamespaceString& nss, - CollectionType& coll, - const OperationSessionInfo& osi) { +void insertCollectionEntry(OperationContext* opCtx, + const NamespaceString& nss, + CollectionType& coll, + const OperationSessionInfo& osi) { const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); - BatchedCommandRequest updateRequest([&]() { - write_ops::UpdateCommandRequest updateOp(CollectionType::ConfigNS); - updateOp.setUpdates({[&] { - write_ops::UpdateOpEntry entry; - entry.setQ(BSON(CollectionType::kNssFieldName << nss.ns())); - entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(coll.toBSON())); - entry.setUpsert(true); - entry.setMulti(false); - return entry; - }()}); - return updateOp; - }()); - - updateRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON()); - const BSONObj cmdObj = updateRequest.toBSON().addFields(osi.toBSON()); + BatchedCommandRequest insertRequest( + write_ops::InsertCommandRequest(CollectionType::ConfigNS, {coll.toBSON()})); + insertRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON()); - try { - BatchedCommandResponse batchResponse; - const auto response = - configShard->runCommand(opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - CollectionType::ConfigNS.db().toString(), - cmdObj, - Shard::kDefaultConfigCommandTimeout, - Shard::RetryPolicy::kIdempotent); - - const auto writeStatus = - Shard::CommandResponse::processBatchWriteResponse(response, &batchResponse); - - uassertStatusOK(batchResponse.toStatus()); - uassertStatusOK(writeStatus); - } catch (const DBException&) { - // If an error happens when contacting the config server, we don't know if the update - // succeeded or not, which might cause the local shard version to differ from the config - // server, so we clear the metadata to allow another operation to refresh it. - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - AutoGetCollection autoColl(opCtx, nss, MODE_IX); - CollectionShardingRuntime::get(opCtx, nss)->clearFilteringMetadata(opCtx); - throw; - } + const BSONObj cmdObj = insertRequest.toBSON().addFields(osi.toBSON()); + + BatchedCommandResponse unusedResponse; + uassertStatusOK(Shard::CommandResponse::processBatchWriteResponse( + configShard->runCommand(opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + CollectionType::ConfigNS.db().toString(), + cmdObj, + Shard::kDefaultConfigCommandTimeout, + Shard::RetryPolicy::kIdempotent), + &unusedResponse)); } void broadcastDropCollection(OperationContext* opCtx, @@ -379,7 +354,6 @@ void broadcastDropCollection(OperationContext* opCtx, opCtx, nss, participants, executor, osi); } - /** * This function writes a no-op oplog entry on shardCollection event. */ @@ -387,8 +361,6 @@ void _writeOplogMessage(OperationContext* opCtx, const NamespaceString& nss, const UUID& uuid, const BSONObj cmd) { - - BSONObjBuilder cmdBuilder; cmdBuilder.append("shardCollection", nss.ns()); cmdBuilder.appendElements(cmd); @@ -417,6 +389,7 @@ void _writeOplogMessage(OperationContext* opCtx, wunit.commit(); }); } + } // namespace CreateCollectionCoordinator::CreateCollectionCoordinator(ShardingDDLCoordinatorService* service, @@ -507,16 +480,19 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( _shardKeyPattern->getKeyPattern().toBSON(), getCollation(opCtx, nss(), _doc.getCollation()).second, _doc.getUnique().value_or(false))) { - _result = createCollectionResponseOpt; _checkCollectionUUIDMismatch(opCtx); - // The collection was already created and commited but there was a - // stepdown after the commit. + + // The critical section can still be held here if the node committed the + // sharding of the collection but then it stepped down before it managed to + // delete the coordinator document RecoverableCriticalSectionService::get(opCtx) ->releaseRecoverableCriticalSection( opCtx, nss(), _getCriticalSectionReason(), ShardingCatalogClient::kMajorityWriteConcern); + + _result = createCollectionResponseOpt; return; } @@ -561,10 +537,10 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( if (_splitPolicy->isOptimized()) { _createChunks(opCtx); - // Block reads/writes from here on if we need to create - // the collection on other shards, this way we prevent - // reads/writes that should be redirected to another - // shard. + + // Block reads/writes from here on if we need to create the collection on other + // shards, this way we prevent reads/writes that should be redirected to another + // shard RecoverableCriticalSectionService::get(opCtx) ->promoteRecoverableCriticalSectionToBlockAlsoReads( opCtx, @@ -597,11 +573,8 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( // collections. if (!_splitPolicy->isOptimized()) { _createChunks(opCtx); - _commit(opCtx); } - - _finalize(opCtx); })) .then([this] { auto opCtxHolder = cc().makeOperationContext(); @@ -824,9 +797,8 @@ void CreateCollectionCoordinator::_createChunks(OperationContext* opCtx) { opCtx, *_shardKeyPattern, {*_collectionUUID, ShardingState::get(opCtx)->shardId()}); // There must be at least one chunk. - invariant(!_initialChunks.chunks.empty()); - - _numChunks = _initialChunks.chunks.size(); + invariant(_initialChunks); + invariant(!_initialChunks->chunks.empty()); } void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards( @@ -843,7 +815,7 @@ void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards( NamespaceStringOrUUID nssOrUUID{nss().db().toString(), *_collectionUUID}; auto [collOptions, indexes, idIndex] = getCollectionOptionsAndIndexes(opCtx, nssOrUUID); - for (const auto& chunk : _initialChunks.chunks) { + for (const auto& chunk : _initialChunks->chunks) { const auto& chunkShardId = chunk.getShard(); if (chunkShardId == dbPrimaryShardId || initializedShards.find(chunkShardId) != initializedShards.end()) { @@ -897,11 +869,11 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) { // Upsert Chunks. _doc = _updateSession(opCtx, _doc); - insertChunks(opCtx, _initialChunks.chunks, getCurrentSession(_doc)); + insertChunks(opCtx, _initialChunks->chunks, getCurrentSession(_doc)); CollectionType coll(nss(), - _initialChunks.collVersion().epoch(), - _initialChunks.collVersion().getTimestamp(), + _initialChunks->collVersion().epoch(), + _initialChunks->collVersion().getTimestamp(), Date_t::now(), *_collectionUUID, _shardKeyPattern->getKeyPattern()); @@ -921,21 +893,28 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) { } _doc = _updateSession(opCtx, _doc); - updateCatalogEntry(opCtx, nss(), coll, getCurrentSession(_doc)); - _writeOplogMessage(opCtx, nss(), *_collectionUUID, _doc.getCreateCollectionRequest().toBSON()); -} + try { + insertCollectionEntry(opCtx, nss(), coll, getCurrentSession(_doc)); -void CreateCollectionCoordinator::_finalize(OperationContext* opCtx) { - LOGV2_DEBUG(5277907, 2, "Create collection _finalize", "namespace"_attr = nss()); + _writeOplogMessage( + opCtx, nss(), *_collectionUUID, _doc.getCreateCollectionRequest().toBSON()); + + LOGV2_DEBUG(5277907, 2, "Collection successfully committed", "namespace"_attr = nss()); - try { forceShardFilteringMetadataRefresh(opCtx, nss()); - } catch (const DBException&) { + } catch (const DBException& ex) { + LOGV2(5277908, + "Failed to obtain collection's shard version, so it will be recovered", + "namespace"_attr = nss(), + "error"_attr = redact(ex)); + // If the refresh fails, then set the shard version to UNKNOWN and let a future operation to // refresh the metadata. UninterruptibleLockGuard noInterrupt(opCtx->lockState()); AutoGetCollection autoColl(opCtx, nss(), MODE_IX); CollectionShardingRuntime::get(opCtx, nss())->clearFilteringMetadata(opCtx); + + throw; } // Best effort refresh to warm up cache of all involved shards so we can have a cluster ready to @@ -944,8 +923,9 @@ void CreateCollectionCoordinator::_finalize(OperationContext* opCtx) { auto dbPrimaryShardId = ShardingState::get(opCtx)->shardId(); std::set<ShardId> shardsRefreshed; - for (const auto& chunk : _initialChunks.chunks) { + for (const auto& chunk : _initialChunks->chunks) { const auto& chunkShardId = chunk.getShard(); + if (chunkShardId == dbPrimaryShardId || shardsRefreshed.find(chunkShardId) != shardsRefreshed.end()) { continue; @@ -963,11 +943,10 @@ void CreateCollectionCoordinator::_finalize(OperationContext* opCtx) { LOGV2(5277901, "Created initial chunk(s)", "namespace"_attr = nss(), - "numInitialChunks"_attr = _initialChunks.chunks.size(), - "initialCollectionVersion"_attr = _initialChunks.collVersion()); + "numInitialChunks"_attr = _initialChunks->chunks.size(), + "initialCollectionVersion"_attr = _initialChunks->collVersion()); - auto result = CreateCollectionResponse( - _initialChunks.chunks[_initialChunks.chunks.size() - 1].getVersion()); + auto result = CreateCollectionResponse(_initialChunks->chunks.back().getVersion()); result.setCollectionUUID(_collectionUUID); _result = std::move(result); @@ -993,8 +972,9 @@ void CreateCollectionCoordinator::_logEndCreateCollection(OperationContext* opCt collectionDetail.append("version", _result->getCollectionVersion().toString()); if (_collectionEmpty) collectionDetail.append("empty", *_collectionEmpty); - if (_numChunks) - collectionDetail.appendNumber("numChunks", static_cast<long long>(*_numChunks)); + if (_initialChunks) + collectionDetail.appendNumber("numChunks", + static_cast<long long>(_initialChunks->chunks.size())); ShardingLogging::get(opCtx)->logChange( opCtx, "shardCollection.end", nss().ns(), collectionDetail.obj()); } |