From ac9c95d0f90fb882aee1043e9aae6ff2491a7fe4 Mon Sep 17 00:00:00 2001 From: Silvia Surroca Date: Thu, 13 Oct 2022 13:55:42 +0000 Subject: SERVER-70237 Chunks merge commit must not create a BSON object too large --- src/mongo/db/s/config/sharding_catalog_manager.h | 3 +- .../sharding_catalog_manager_chunk_operations.cpp | 225 ++++++++++++--------- 2 files changed, 126 insertions(+), 102 deletions(-) diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h index 8cf63f2da69..ba379a0c75d 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager.h +++ b/src/mongo/db/s/config/sharding_catalog_manager.h @@ -708,9 +708,10 @@ private: void _mergeChunksInTransaction(OperationContext* opCtx, const NamespaceString& nss, const UUID& collectionUUID, - const ChunkVersion& initialVersion, const ChunkVersion& mergeVersion, const boost::optional& validAfter, + const ChunkRange& chunkRange, + const ShardId& shardId, std::shared_ptr> chunksToMerge); struct SplitChunkInTransactionResult { diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp index 55381ae4041..60fec373f51 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp @@ -128,22 +128,37 @@ StatusWith findChunkContainingRange(OperationContext* opCtx, ChunkType::parseFromConfigBSON(findResponseWith.getValue().docs.front(), epoch, timestamp)); } -BSONObj buildCountChunksCommand(const std::vector& chunks) { +BSONObj buildCountChunksInRangeCommand(const UUID& collectionUUID, + const ShardId& shardId, + const ChunkRange& chunkRange) { AggregateCommandRequest countRequest(ChunkType::ConfigNS); BSONObjBuilder builder; builder.append("aggregate", ChunkType::ConfigNS.ns()); - BSONArrayBuilder arrayBuilder; - for (const auto& chunk : chunks) { - auto query = - BSON(ChunkType::min(chunk.getMin()) - << ChunkType::max(chunk.getMax()) << ChunkType::collectionUUID() - << chunk.getCollectionUUID() << ChunkType::shard() << chunk.getShard().toString()); - arrayBuilder.append(query); - } + BSONObjBuilder queryBuilder; + queryBuilder << ChunkType::collectionUUID << collectionUUID; + queryBuilder << ChunkType::shard(shardId.toString()); + queryBuilder << ChunkType::min(BSON("$gte" << chunkRange.getMin())); + queryBuilder << ChunkType::min(BSON("$lt" << chunkRange.getMax())); + + std::vector pipeline; + pipeline.push_back(BSON("$match" << queryBuilder.obj())); + pipeline.push_back(BSON("$count" << ChunkType::collectionUUID.name())); + countRequest.setPipeline(pipeline); + + return countRequest.toBSON({}); +} + +BSONObj buildCountSingleChunkCommand(const ChunkType& chunk) { + AggregateCommandRequest countRequest(ChunkType::ConfigNS); + + auto query = + BSON(ChunkType::min(chunk.getMin()) + << ChunkType::max(chunk.getMax()) << ChunkType::collectionUUID() + << chunk.getCollectionUUID() << ChunkType::shard() << chunk.getShard().toString()); std::vector pipeline; - pipeline.push_back(BSON("$match" << BSON("$or" << arrayBuilder.arr()))); + pipeline.push_back(BSON("$match" << query)); pipeline.push_back(BSON("$count" << ChunkType::collectionUUID.name())); countRequest.setPipeline(pipeline); @@ -485,8 +500,10 @@ ShardingCatalogManager::_splitChunkInTransaction(OperationContext* opCtx, sharedBlock->range, sharedBlock->currentMaxVersion, sharedBlock->shardName); - std::vector chunks{chunk}; - auto countRequest = buildCountChunksCommand(chunks); + + // Verify that the range matches exactly a single chunk + auto countRequest = buildCountSingleChunkCommand(chunk); + return txnClient.runCommand(ChunkType::ConfigNS.db(), countRequest) .thenRunOn(txnExec) .then([&txnClient, sharedBlock](auto countResponse) { @@ -724,101 +741,97 @@ void ShardingCatalogManager::_mergeChunksInTransaction( OperationContext* opCtx, const NamespaceString& nss, const UUID& collectionUUID, - const ChunkVersion& initialVersion, const ChunkVersion& mergeVersion, const boost::optional& validAfter, + const ChunkRange& chunkRange, + const ShardId& shardId, std::shared_ptr> chunksToMerge) { dassert(validAfter); - auto updateChunksFn = [chunksToMerge, mergeVersion, validAfter]( - const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { - // Check the merge chunk precondition, chunks must not have moved. - auto countRequest = buildCountChunksCommand(*chunksToMerge); - auto countBSONObjSize = countRequest.objsize(); - uassert( - ErrorCodes::InvalidOptions, - str::stream() << "Cannot merge such large range, the final BSON count operation size " - << countBSONObjSize << " bytes, would exceed the maximum BSON size: " - << BSONObjMaxInternalSize << " bytes", - countBSONObjSize < BSONObjMaxInternalSize); - return txnClient.runCommand(ChunkType::ConfigNS.db(), countRequest) - .thenRunOn(txnExec) - .then([&txnClient, chunksToMerge, mergeVersion, validAfter](auto commandResponse) { - auto countResponse = - uassertStatusOK(CursorResponse::parseFromBSON(commandResponse)); - uint64_t docCount = 0; - auto firstBatch = countResponse.getBatch(); - if (!firstBatch.empty()) { - auto countObj = firstBatch.front(); - docCount = countObj.getIntField(ChunkType::collectionUUID.name()); - uassert( - ErrorCodes::BadValue, "Unexpected negative document count", docCount >= 0); - } - uassert(ErrorCodes::BadValue, - str::stream() << "Could not meet precondition to execute merge, expected " - << chunksToMerge->size() << " chunks, but found " << docCount, - docCount == chunksToMerge->size()); - - // Expand the first chunk into the newly merged chunks. - write_ops::UpdateCommandRequest updateOp(ChunkType::ConfigNS); - updateOp.setUpdates({[&] { - write_ops::UpdateOpEntry entry; - - ChunkType mergedChunk(chunksToMerge->front()); - entry.setQ(BSON(ChunkType::name(mergedChunk.getName()))); - mergedChunk.setMax(chunksToMerge->back().getMax()); + auto updateChunksFn = + [chunksToMerge, collectionUUID, mergeVersion, validAfter, chunkRange, shardId]( + const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + // Check the merge chunk precondition, chunks must not have moved. + auto countRequest = buildCountChunksInRangeCommand(collectionUUID, shardId, chunkRange); + return txnClient.runCommand(ChunkType::ConfigNS.db(), countRequest) + .thenRunOn(txnExec) + .then([&txnClient, chunksToMerge, mergeVersion, validAfter](auto commandResponse) { + auto countResponse = + uassertStatusOK(CursorResponse::parseFromBSON(commandResponse)); + uint64_t docCount = 0; + auto firstBatch = countResponse.getBatch(); + if (!firstBatch.empty()) { + auto countObj = firstBatch.front(); + docCount = countObj.getIntField(ChunkType::collectionUUID.name()); + } + uassert(ErrorCodes::BadValue, + str::stream() + << "Could not meet precondition to execute merge, expected " + << chunksToMerge->size() << " chunks, but found " << docCount, + docCount == chunksToMerge->size()); - // Fill in additional details for sending through transaction. - mergedChunk.setVersion(mergeVersion); - mergedChunk.setEstimatedSizeBytes(boost::none); + // Construct the new chunk by taking `min` from the first merged chunk and `max` + // from the last. + write_ops::UpdateCommandRequest updateOp(ChunkType::ConfigNS); + updateOp.setUpdates({[&] { + write_ops::UpdateOpEntry entry; - mergedChunk.setHistory( - {ChunkHistory(validAfter.value(), mergedChunk.getShard())}); + ChunkType mergedChunk(chunksToMerge->front()); + entry.setQ(BSON(ChunkType::name(mergedChunk.getName()))); + mergedChunk.setMax(chunksToMerge->back().getMax()); - entry.setU(write_ops::UpdateModification::parseFromClassicUpdate( - mergedChunk.toConfigBSON())); - entry.setMulti(false); + // Fill in additional details for sending through transaction. + mergedChunk.setVersion(mergeVersion); + mergedChunk.setEstimatedSizeBytes(boost::none); - return entry; - }()}); + mergedChunk.setHistory( + {ChunkHistory(validAfter.value(), mergedChunk.getShard())}); - return txnClient.runCRUDOp(updateOp, {}); - }) - .thenRunOn(txnExec) - .then([&txnClient, chunksToMerge](auto chunkUpdateResponse) { - uassertStatusOK(chunkUpdateResponse.toStatus()); - - // Delete the rest of the chunks to be merged. Remember not to delete the first - // chunk we're expanding. - write_ops::DeleteCommandRequest deleteOp(ChunkType::ConfigNS); - deleteOp.setDeletes([&] { - std::vector deletes; - for (size_t i = 1; i < chunksToMerge->size(); ++i) { - write_ops::DeleteOpEntry entry; - entry.setQ(BSON(ChunkType::name(chunksToMerge->at(i).getName()))); + entry.setU(write_ops::UpdateModification::parseFromClassicUpdate( + mergedChunk.toConfigBSON())); entry.setMulti(false); - deletes.push_back(entry); - } - return deletes; - }()); - auto deleteBSONObjSize = deleteOp.toBSON({}).objsize(); - uassert(ErrorCodes::InvalidOptions, - str::stream() - << "Cannot merge such large range, the final delete request size " - << deleteBSONObjSize << " would exceed the maximum BSON size: " - << BSONObjMaxInternalSize << " bytes", - deleteBSONObjSize < BSONObjMaxInternalSize); - return txnClient.runCRUDOp(deleteOp, {}); - }) - .thenRunOn(txnExec) - .then([](auto removeChunkResponse) { - uassertStatusOK(removeChunkResponse.toStatus()); - - LOGV2_DEBUG( - 6583805, 1, "Finished all transaction operations in merge chunk command"); - }) - .semi(); - }; + return entry; + }()}); + + return txnClient.runCRUDOp(updateOp, {}); + }) + .thenRunOn(txnExec) + .then([&txnClient, + collectionUUID, + shardId, + // Delete the rest of the chunks to be merged. Remember not to delete the + // first chunk we're expanding. + chunkRangeToDelete = + ChunkRange(chunksToMerge->front().getMax(), + chunksToMerge->back().getMax())](auto chunkUpdateResponse) { + uassertStatusOK(chunkUpdateResponse.toStatus()); + + BSONObjBuilder queryBuilder; + queryBuilder << ChunkType::collectionUUID << collectionUUID; + queryBuilder << ChunkType::shard(shardId.toString()); + queryBuilder << ChunkType::min(BSON("$gte" << chunkRangeToDelete.getMin())); + queryBuilder << ChunkType::min(BSON("$lt" << chunkRangeToDelete.getMax())); + + write_ops::DeleteCommandRequest deleteOp(ChunkType::ConfigNS); + deleteOp.setDeletes([&] { + std::vector deletes; + write_ops::DeleteOpEntry entry; + entry.setQ(queryBuilder.obj()); + entry.setMulti(true); + return std::vector{entry}; + }()); + + return txnClient.runCRUDOp(deleteOp, {}); + }) + .thenRunOn(txnExec) + .then([](auto removeChunkResponse) { + uassertStatusOK(removeChunkResponse.toStatus()); + + LOGV2_DEBUG( + 6583805, 1, "Finished all transaction operations in merge chunk command"); + }) + .semi(); + }; txn_api::SyncTransactionWithRetries txn( opCtx, Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), nullptr); @@ -950,19 +963,29 @@ ShardingCatalogManager::commitChunksMerge(OperationContext* opCtx, // 4. apply the batch of updates to local metadata _mergeChunksInTransaction( - opCtx, nss, coll.getUuid(), initialVersion, mergeVersion, validAfter, chunksToMerge); + opCtx, nss, coll.getUuid(), mergeVersion, validAfter, chunkRange, shardId, chunksToMerge); // 5. log changes BSONObjBuilder logDetail; { + initialVersion.serialize("prevShardVersion", &logDetail); + mergeVersion.serialize("mergedVersion", &logDetail); + logDetail.append("owningShard", shardId); BSONArrayBuilder b(logDetail.subarrayStart("merged")); + + // Pad some slack to avoid exceeding max BSON size + const auto kBSONObjMaxLogDetailSize = BSONObjMaxUserSize - 3 * 1024; for (const auto& chunkToMerge : *chunksToMerge) { - b.append(chunkToMerge.toConfigBSON()); + auto chunkBSON = chunkToMerge.toConfigBSON(); + + // Truncate the log if BSON log size exceeds BSONObjMaxUserSize + if (logDetail.len() + chunkBSON.objsize() >= kBSONObjMaxLogDetailSize) { + logDetail.append("mergedChunksArrayTruncatedToDontExceedMaxBSONSize", true); + break; + } + b.append(chunkBSON); } } - initialVersion.serialize("prevShardVersion", &logDetail); - mergeVersion.serialize("mergedVersion", &logDetail); - logDetail.append("owningShard", shardId); ShardingLogging::get(opCtx)->logChange( opCtx, "merge", nss.ns(), logDetail.obj(), WriteConcernOptions()); -- cgit v1.2.1