summary refs log tree commit diff
diff options
context:
space:
mode:
author Silvia Surroca <silvia.surroca@mongodb.com> 2022-10-13 13:55:42 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-10-13 14:38:22 +0000
commit ac9c95d0f90fb882aee1043e9aae6ff2491a7fe4 (patch)
tree 721e97d46b986a4193dbe7138caeff02774e185d
parent e30779f2b5bdcc9a5fff58515ec88dfd2545815a (diff)
download mongo-ac9c95d0f90fb882aee1043e9aae6ff2491a7fe4.tar.gz
SERVER-70237 Chunks merge commit must not create a BSON object too large
-rw-r--r-- src/mongo/db/s/config/sharding_catalog_manager.h 3
-rw-r--r-- src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp 225
2 files changed, 126 insertions, 102 deletions
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h
index 8cf63f2da69..ba379a0c75d 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.h
+++ b/src/mongo/db/s/config/sharding_catalog_manager.h
@@ -708,9 +708,10 @@ private:
void _mergeChunksInTransaction(OperationContext* opCtx,
const NamespaceString& nss,
const UUID& collectionUUID,
- const ChunkVersion& initialVersion,
const ChunkVersion& mergeVersion,
const boost::optional<Timestamp>& validAfter,
+ const ChunkRange& chunkRange,
+ const ShardId& shardId,
std::shared_ptr<std::vector<ChunkType>> chunksToMerge);
struct SplitChunkInTransactionResult {
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index 55381ae4041..60fec373f51 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -128,22 +128,37 @@ StatusWith<ChunkType> findChunkContainingRange(OperationContext* opCtx,
ChunkType::parseFromConfigBSON(findResponseWith.getValue().docs.front(), epoch, timestamp));
}
-BSONObj buildCountChunksCommand(const std::vector<ChunkType>& chunks) {
+BSONObj buildCountChunksInRangeCommand(const UUID& collectionUUID,
+ const ShardId& shardId,
+ const ChunkRange& chunkRange) {
AggregateCommandRequest countRequest(ChunkType::ConfigNS);
BSONObjBuilder builder;
builder.append("aggregate", ChunkType::ConfigNS.ns());
- BSONArrayBuilder arrayBuilder;
- for (const auto& chunk : chunks) {
- auto query =
- BSON(ChunkType::min(chunk.getMin())
- << ChunkType::max(chunk.getMax()) << ChunkType::collectionUUID()
- << chunk.getCollectionUUID() << ChunkType::shard() << chunk.getShard().toString());
- arrayBuilder.append(query);
- }
+ BSONObjBuilder queryBuilder;
+ queryBuilder << ChunkType::collectionUUID << collectionUUID;
+ queryBuilder << ChunkType::shard(shardId.toString());
+ queryBuilder << ChunkType::min(BSON("$gte" << chunkRange.getMin()));
+ queryBuilder << ChunkType::min(BSON("$lt" << chunkRange.getMax()));
+
+ std::vector<BSONObj> pipeline;
+ pipeline.push_back(BSON("$match" << queryBuilder.obj()));
+ pipeline.push_back(BSON("$count" << ChunkType::collectionUUID.name()));
+ countRequest.setPipeline(pipeline);
+
+ return countRequest.toBSON({});
+}
+
+BSONObj buildCountSingleChunkCommand(const ChunkType& chunk) {
+ AggregateCommandRequest countRequest(ChunkType::ConfigNS);
+
+ auto query =
+ BSON(ChunkType::min(chunk.getMin())
+ << ChunkType::max(chunk.getMax()) << ChunkType::collectionUUID()
+ << chunk.getCollectionUUID() << ChunkType::shard() << chunk.getShard().toString());
std::vector<BSONObj> pipeline;
- pipeline.push_back(BSON("$match" << BSON("$or" << arrayBuilder.arr())));
+ pipeline.push_back(BSON("$match" << query));
pipeline.push_back(BSON("$count" << ChunkType::collectionUUID.name()));
countRequest.setPipeline(pipeline);
@@ -485,8 +500,10 @@ ShardingCatalogManager::_splitChunkInTransaction(OperationContext* opCtx,
sharedBlock->range,
sharedBlock->currentMaxVersion,
sharedBlock->shardName);
- std::vector<ChunkType> chunks{chunk};
- auto countRequest = buildCountChunksCommand(chunks);
+
+ // Verify that the range matches exactly a single chunk
+ auto countRequest = buildCountSingleChunkCommand(chunk);
+
return txnClient.runCommand(ChunkType::ConfigNS.db(), countRequest)
.thenRunOn(txnExec)
.then([&txnClient, sharedBlock](auto countResponse) {
@@ -724,101 +741,97 @@ void ShardingCatalogManager::_mergeChunksInTransaction(
OperationContext* opCtx,
const NamespaceString& nss,
const UUID& collectionUUID,
- const ChunkVersion& initialVersion,
const ChunkVersion& mergeVersion,
const boost::optional<Timestamp>& validAfter,
+ const ChunkRange& chunkRange,
+ const ShardId& shardId,
std::shared_ptr<std::vector<ChunkType>> chunksToMerge) {
dassert(validAfter);
- auto updateChunksFn = [chunksToMerge, mergeVersion, validAfter](
- const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
- // Check the merge chunk precondition, chunks must not have moved.
- auto countRequest = buildCountChunksCommand(*chunksToMerge);
- auto countBSONObjSize = countRequest.objsize();
- uassert(
- ErrorCodes::InvalidOptions,
- str::stream() << "Cannot merge such large range, the final BSON count operation size "
- << countBSONObjSize << " bytes, would exceed the maximum BSON size: "
- << BSONObjMaxInternalSize << " bytes",
- countBSONObjSize < BSONObjMaxInternalSize);
- return txnClient.runCommand(ChunkType::ConfigNS.db(), countRequest)
- .thenRunOn(txnExec)
- .then([&txnClient, chunksToMerge, mergeVersion, validAfter](auto commandResponse) {
- auto countResponse =
- uassertStatusOK(CursorResponse::parseFromBSON(commandResponse));
- uint64_t docCount = 0;
- auto firstBatch = countResponse.getBatch();
- if (!firstBatch.empty()) {
- auto countObj = firstBatch.front();
- docCount = countObj.getIntField(ChunkType::collectionUUID.name());
- uassert(
- ErrorCodes::BadValue, "Unexpected negative document count", docCount >= 0);
- }
- uassert(ErrorCodes::BadValue,
- str::stream() << "Could not meet precondition to execute merge, expected "
- << chunksToMerge->size() << " chunks, but found " << docCount,
- docCount == chunksToMerge->size());
-
- // Expand the first chunk into the newly merged chunks.
- write_ops::UpdateCommandRequest updateOp(ChunkType::ConfigNS);
- updateOp.setUpdates({[&] {
- write_ops::UpdateOpEntry entry;
-
- ChunkType mergedChunk(chunksToMerge->front());
- entry.setQ(BSON(ChunkType::name(mergedChunk.getName())));
- mergedChunk.setMax(chunksToMerge->back().getMax());
+ auto updateChunksFn =
+ [chunksToMerge, collectionUUID, mergeVersion, validAfter, chunkRange, shardId](
+ const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
+ // Check the merge chunk precondition, chunks must not have moved.
+ auto countRequest = buildCountChunksInRangeCommand(collectionUUID, shardId, chunkRange);
+ return txnClient.runCommand(ChunkType::ConfigNS.db(), countRequest)
+ .thenRunOn(txnExec)
+ .then([&txnClient, chunksToMerge, mergeVersion, validAfter](auto commandResponse) {
+ auto countResponse =
+ uassertStatusOK(CursorResponse::parseFromBSON(commandResponse));
+ uint64_t docCount = 0;
+ auto firstBatch = countResponse.getBatch();
+ if (!firstBatch.empty()) {
+ auto countObj = firstBatch.front();
+ docCount = countObj.getIntField(ChunkType::collectionUUID.name());
+ }
+ uassert(ErrorCodes::BadValue,
+ str::stream()
+ << "Could not meet precondition to execute merge, expected "
+ << chunksToMerge->size() << " chunks, but found " << docCount,
+ docCount == chunksToMerge->size());
- // Fill in additional details for sending through transaction.
- mergedChunk.setVersion(mergeVersion);
- mergedChunk.setEstimatedSizeBytes(boost::none);
+ // Construct the new chunk by taking `min` from the first merged chunk and `max`
+ // from the last.
+ write_ops::UpdateCommandRequest updateOp(ChunkType::ConfigNS);
+ updateOp.setUpdates({[&] {
+ write_ops::UpdateOpEntry entry;
- mergedChunk.setHistory(
- {ChunkHistory(validAfter.value(), mergedChunk.getShard())});
+ ChunkType mergedChunk(chunksToMerge->front());
+ entry.setQ(BSON(ChunkType::name(mergedChunk.getName())));
+ mergedChunk.setMax(chunksToMerge->back().getMax());
- entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(
- mergedChunk.toConfigBSON()));
- entry.setMulti(false);
+ // Fill in additional details for sending through transaction.
+ mergedChunk.setVersion(mergeVersion);
+ mergedChunk.setEstimatedSizeBytes(boost::none);
- return entry;
- }()});
+ mergedChunk.setHistory(
+ {ChunkHistory(validAfter.value(), mergedChunk.getShard())});
- return txnClient.runCRUDOp(updateOp, {});
- })
- .thenRunOn(txnExec)
- .then([&txnClient, chunksToMerge](auto chunkUpdateResponse) {
- uassertStatusOK(chunkUpdateResponse.toStatus());
-
- // Delete the rest of the chunks to be merged. Remember not to delete the first
- // chunk we're expanding.
- write_ops::DeleteCommandRequest deleteOp(ChunkType::ConfigNS);
- deleteOp.setDeletes([&] {
- std::vector<write_ops::DeleteOpEntry> deletes;
- for (size_t i = 1; i < chunksToMerge->size(); ++i) {
- write_ops::DeleteOpEntry entry;
- entry.setQ(BSON(ChunkType::name(chunksToMerge->at(i).getName())));
+ entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(
+ mergedChunk.toConfigBSON()));
entry.setMulti(false);
- deletes.push_back(entry);
- }
- return deletes;
- }());
- auto deleteBSONObjSize = deleteOp.toBSON({}).objsize();
- uassert(ErrorCodes::InvalidOptions,
- str::stream()
- << "Cannot merge such large range, the final delete request size "
- << deleteBSONObjSize << " would exceed the maximum BSON size: "
- << BSONObjMaxInternalSize << " bytes",
- deleteBSONObjSize < BSONObjMaxInternalSize);
- return txnClient.runCRUDOp(deleteOp, {});
- })
- .thenRunOn(txnExec)
- .then([](auto removeChunkResponse) {
- uassertStatusOK(removeChunkResponse.toStatus());
-
- LOGV2_DEBUG(
- 6583805, 1, "Finished all transaction operations in merge chunk command");
- })
- .semi();
- };
+ return entry;
+ }()});
+
+ return txnClient.runCRUDOp(updateOp, {});
+ })
+ .thenRunOn(txnExec)
+ .then([&txnClient,
+ collectionUUID,
+ shardId,
+ // Delete the rest of the chunks to be merged. Remember not to delete the
+ // first chunk we're expanding.
+ chunkRangeToDelete =
+ ChunkRange(chunksToMerge->front().getMax(),
+ chunksToMerge->back().getMax())](auto chunkUpdateResponse) {
+ uassertStatusOK(chunkUpdateResponse.toStatus());
+
+ BSONObjBuilder queryBuilder;
+ queryBuilder << ChunkType::collectionUUID << collectionUUID;
+ queryBuilder << ChunkType::shard(shardId.toString());
+ queryBuilder << ChunkType::min(BSON("$gte" << chunkRangeToDelete.getMin()));
+ queryBuilder << ChunkType::min(BSON("$lt" << chunkRangeToDelete.getMax()));
+
+ write_ops::DeleteCommandRequest deleteOp(ChunkType::ConfigNS);
+ deleteOp.setDeletes([&] {
+ std::vector<write_ops::DeleteOpEntry> deletes;
+ write_ops::DeleteOpEntry entry;
+ entry.setQ(queryBuilder.obj());
+ entry.setMulti(true);
+ return std::vector<write_ops::DeleteOpEntry>{entry};
+ }());
+
+ return txnClient.runCRUDOp(deleteOp, {});
+ })
+ .thenRunOn(txnExec)
+ .then([](auto removeChunkResponse) {
+ uassertStatusOK(removeChunkResponse.toStatus());
+
+ LOGV2_DEBUG(
+ 6583805, 1, "Finished all transaction operations in merge chunk command");
+ })
+ .semi();
+ };
txn_api::SyncTransactionWithRetries txn(
opCtx, Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), nullptr);
@@ -950,19 +963,29 @@ ShardingCatalogManager::commitChunksMerge(OperationContext* opCtx,
// 4. apply the batch of updates to local metadata
_mergeChunksInTransaction(
- opCtx, nss, coll.getUuid(), initialVersion, mergeVersion, validAfter, chunksToMerge);
+ opCtx, nss, coll.getUuid(), mergeVersion, validAfter, chunkRange, shardId, chunksToMerge);
// 5. log changes
BSONObjBuilder logDetail;
{
+ initialVersion.serialize("prevShardVersion", &logDetail);
+ mergeVersion.serialize("mergedVersion", &logDetail);
+ logDetail.append("owningShard", shardId);
BSONArrayBuilder b(logDetail.subarrayStart("merged"));
+
+ // Pad some slack to avoid exceeding max BSON size
+ const auto kBSONObjMaxLogDetailSize = BSONObjMaxUserSize - 3 * 1024;
for (const auto& chunkToMerge : *chunksToMerge) {
- b.append(chunkToMerge.toConfigBSON());
+ auto chunkBSON = chunkToMerge.toConfigBSON();
+
+ // Truncate the log if BSON log size exceeds BSONObjMaxUserSize
+ if (logDetail.len() + chunkBSON.objsize() >= kBSONObjMaxLogDetailSize) {
+ logDetail.append("mergedChunksArrayTruncatedToDontExceedMaxBSONSize", true);
+ break;
+ }
+ b.append(chunkBSON);
}
}
- initialVersion.serialize("prevShardVersion", &logDetail);
- mergeVersion.serialize("mergedVersion", &logDetail);
- logDetail.append("owningShard", shardId);
ShardingLogging::get(opCtx)->logChange(
opCtx, "merge", nss.ns(), logDetail.obj(), WriteConcernOptions());