diff options
author | Tommaso Tocci <tommaso.tocci@mongodb.com> | 2022-12-04 23:41:16 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-12-07 20:53:04 +0000 |
commit | 1372bfc84c1913fa4c6c2d882ac8bbc7763af123 (patch) | |
tree | 5c0b2637e616d958f73d4d0dc479a6943d1dc01a | |
parent | e3a200c3245d5b9c61b4cc70644cf426618a8d00 (diff) | |
download | mongo-1372bfc84c1913fa4c6c2d882ac8bbc7763af123.tar.gz |
SERVER-70237 Chunks merge commit must not create a BSON object too large
Non-trivial backport of SERVER-70237 and part of SERVER-65838
4 files changed, 141 insertions, 96 deletions
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index db04d6a276a..4f5acb56c19 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -1662,9 +1662,9 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorDele deleteStageParams->canonicalQuery = cq.get(); const bool batchDelete = - (deleteStageParams->isMulti && !deleteStageParams->fromMigrate && - !deleteStageParams->returnDeleted && deleteStageParams->sort.isEmpty() && - !deleteStageParams->numStatsForDoc) && + (deleteStageParams->isMulti && !opCtx->inMultiDocumentTransaction() && + !deleteStageParams->fromMigrate && !deleteStageParams->returnDeleted && + deleteStageParams->sort.isEmpty() && !deleteStageParams->numStatsForDoc) && ((gInternalBatchUserMultiDeletesForTest.load() && nss.ns() == "__internalBatchedDeletesTesting.Collection0") || (batchDeletesByDefault.shouldFail())); diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h index 02f4ee3024a..44fefe25496 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager.h +++ b/src/mongo/db/s/config/sharding_catalog_manager.h @@ -673,6 +673,18 @@ private: */ void _standardizeClusterParameters(OperationContext* opCtx, RemoteCommandTargeter* targeter); + /** + * Execute the merge chunk updates using the internal transaction API. 
+ */ + void _mergeChunksInTransaction(OperationContext* opCtx, + const NamespaceString& nss, + const UUID& collectionUUID, + const ChunkVersion& mergeVersion, + const boost::optional<Timestamp>& validAfter, + const ChunkRange& chunkRange, + const ShardId& shardId, + std::shared_ptr<std::vector<ChunkType>> chunksToMerge); + // The owning service context ServiceContext* const _serviceContext; diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp index 5b4d1dcc59d..76af2ce840a 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp @@ -89,76 +89,6 @@ void appendShortVersion(BufBuilder* out, const ChunkType& chunk) { bb.done(); } -BSONArray buildMergeChunksTransactionUpdates(const std::vector<ChunkType>& chunksToMerge, - const ChunkVersion& mergeVersion, - const boost::optional<Timestamp>& validAfter) { - BSONArrayBuilder updates; - - // Build an update operation to expand the first chunk into the newly merged chunk - { - BSONObjBuilder op; - op.append("op", "u"); - op.appendBool("b", false); // no upsert - op.append("ns", ChunkType::ConfigNS.ns()); - - // expand first chunk into newly merged chunk - ChunkType mergedChunk(chunksToMerge.front()); - mergedChunk.setMax(chunksToMerge.back().getMax()); - - // fill in additional details for sending through transaction - mergedChunk.setVersion(mergeVersion); - mergedChunk.setEstimatedSizeBytes(boost::none); - - invariant(validAfter); - mergedChunk.setHistory({ChunkHistory(validAfter.get(), mergedChunk.getShard())}); - - // add the new chunk information as the update object - op.append("o", mergedChunk.toConfigBSON()); - - // query object - op.append("o2", BSON(ChunkType::name(mergedChunk.getName()))); - - updates.append(op.obj()); - } - - // Build update operations to delete the rest of the chunks to be merged. 
Remember not - // to delete the first chunk we're expanding - for (size_t i = 1; i < chunksToMerge.size(); ++i) { - BSONObjBuilder op; - op.append("op", "d"); - op.append("ns", ChunkType::ConfigNS.ns()); - - op.append("o", BSON(ChunkType::name(chunksToMerge[i].getName()))); - - updates.append(op.obj()); - } - - return updates.arr(); -} - -BSONArray buildMergeChunksTransactionPrecond(const std::vector<ChunkType>& chunksToMerge, - const ChunkVersion& collVersion) { - BSONArrayBuilder preCond; - - for (const auto& chunk : chunksToMerge) { - BSONObj query = BSON(ChunkType::min(chunk.getMin()) - << ChunkType::max(chunk.getMax()) << ChunkType::collectionUUID() - << chunk.getCollectionUUID()); - - const auto collectionIdentityMatchCondition = - BSON(ChunkType::collectionUUID() - << chunk.getCollectionUUID() << ChunkType::shard(chunk.getShard().toString())); - - BSONObjBuilder b; - b.append("ns", ChunkType::ConfigNS.ns()); - b.append("q", BSON("query" << query << "orderby" << BSON(ChunkType::lastmod() << -1))); - b.append("res", collectionIdentityMatchCondition); - - preCond.append(b.obj()); - } - return preCond.arr(); -} - /** * Check that the chunk still exists and return its metadata. */ @@ -792,6 +722,83 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkSplit( return response.obj(); } +void ShardingCatalogManager::_mergeChunksInTransaction( + OperationContext* opCtx, + const NamespaceString& nss, + const UUID& collectionUUID, + const ChunkVersion& mergeVersion, + const boost::optional<Timestamp>& validAfter, + const ChunkRange& chunkRange, + const ShardId& shardId, + std::shared_ptr<std::vector<ChunkType>> chunksToMerge) { + dassert(validAfter); + withTransaction( + opCtx, ChunkType::ConfigNS, [&, this](OperationContext* opCtx, TxnNumber txnNumber) { + // Construct the new chunk by taking `min` from the first merged chunk and `max` + // from the last. 
+ write_ops::UpdateCommandRequest updateOp(ChunkType::ConfigNS); + updateOp.setUpdates({[&] { + write_ops::UpdateOpEntry entry; + + ChunkType mergedChunk(chunksToMerge->front()); + entry.setQ(BSON(ChunkType::name(mergedChunk.getName()))); + mergedChunk.setMax(chunksToMerge->back().getMax()); + + // Fill in additional details for sending through transaction. + mergedChunk.setVersion(mergeVersion); + mergedChunk.setEstimatedSizeBytes(boost::none); + + mergedChunk.setHistory({ChunkHistory(validAfter.value(), mergedChunk.getShard())}); + + entry.setU(write_ops::UpdateModification::parseFromClassicUpdate( + mergedChunk.toConfigBSON())); + entry.setMulti(false); + + return entry; + }()}); + + const auto updateRes = + writeToConfigDocumentInTxn(opCtx, ChunkType::ConfigNS, updateOp, txnNumber); + + const auto numDocsModified = UpdateOp::parseResponse(updateRes).getN(); + uassert(ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Unexpected number of modified documents during chunks merge " + "commit. Modified " + << numDocsModified << " documents instead of 1", + numDocsModified == 1); + + // Delete the rest of the chunks to be merged. + // Remember not to delete the first chunk we're expanding. 
+ BSONObjBuilder queryBuilder; + queryBuilder << ChunkType::collectionUUID << collectionUUID; + queryBuilder << ChunkType::shard(shardId.toString()); + queryBuilder << ChunkType::min(BSON("$gte" << chunksToMerge->front().getMax())); + queryBuilder << ChunkType::min(BSON("$lt" << chunksToMerge->back().getMax())); + + write_ops::DeleteCommandRequest deleteOp(ChunkType::ConfigNS); + deleteOp.setDeletes({[&] { + write_ops::DeleteOpEntry entry; + entry.setQ(queryBuilder.obj()); + entry.setMulti(true); + return entry; + }()}); + + const auto deleteRes = + writeToConfigDocumentInTxn(opCtx, ChunkType::ConfigNS, deleteOp, txnNumber); + + const auto numDocsDeleted = DeleteOp::parseResponse(deleteRes).getN(); + const int expectedNumDocsDeleted = chunksToMerge->size() - 1; + uassert(ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Unexpected number of deleted documents during chunks merge " + "commit. Deleted " + << numDocsDeleted << " documents instead of " + << expectedNumDocsDeleted, + numDocsDeleted == expectedNumDocsDeleted); + + LOGV2_DEBUG(6583805, 1, "Finished all transaction operations in merge chunk command"); + }); +} + StatusWith<BSONObj> ShardingCatalogManager::commitChunksMerge( OperationContext* opCtx, const NamespaceString& nss, @@ -883,11 +890,12 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunksMerge( // 3. Prepare the data for the merge // and ensure that the retrieved list of chunks covers the whole range. 
- std::vector<ChunkType> chunksToMerge; + auto chunksToMerge = std::make_shared<std::vector<ChunkType>>(); + chunksToMerge->reserve(shardChunksInRangeResponse.docs.size()); for (const auto& chunkDoc : shardChunksInRangeResponse.docs) { auto chunk = uassertStatusOK( ChunkType::parseFromConfigBSON(chunkDoc, coll.getEpoch(), coll.getTimestamp())); - if (chunksToMerge.empty()) { + if (chunksToMerge->empty()) { uassert(ErrorCodes::IllegalOperation, str::stream() << "could not merge chunks, shard " << shardId @@ -900,46 +908,46 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunksMerge( << "could not merge chunks, shard " << shardId << " does not contain a sequence of chunks that exactly fills the range " << chunkRange.toString(), - chunk.getMin().woCompare(chunksToMerge.back().getMax()) == 0); + chunk.getMin().woCompare(chunksToMerge->back().getMax()) == 0); } - chunksToMerge.push_back(std::move(chunk)); + chunksToMerge->push_back(std::move(chunk)); } uassert(ErrorCodes::IllegalOperation, str::stream() << "could not merge chunks, shard " << shardId << " does not contain a sequence of chunks that exactly fills the range " << chunkRange.toString(), - !chunksToMerge.empty() && - chunksToMerge.back().getMax().woCompare(chunkRange.getMax()) == 0); + !chunksToMerge->empty() && + chunksToMerge->back().getMax().woCompare(chunkRange.getMax()) == 0); ChunkVersion initialVersion = collVersion; ChunkVersion mergeVersion = initialVersion; mergeVersion.incMinor(); - auto updates = buildMergeChunksTransactionUpdates(chunksToMerge, mergeVersion, validAfter); - auto preCond = buildMergeChunksTransactionPrecond(chunksToMerge, initialVersion); - // 4. 
apply the batch of updates to local metadata - uassertStatusOK(Grid::get(opCtx)->catalogClient()->applyChunkOpsDeprecated( - opCtx, - updates, - preCond, - coll.getUuid(), - nss, - mergeVersion, - WriteConcernOptions(), - repl::ReadConcernLevel::kLocalReadConcern)); + _mergeChunksInTransaction( + opCtx, nss, coll.getUuid(), mergeVersion, validAfter, chunkRange, shardId, chunksToMerge); // 5. log changes BSONObjBuilder logDetail; { + initialVersion.appendLegacyWithField(&logDetail, "prevShardVersion"); + mergeVersion.appendLegacyWithField(&logDetail, "mergedVersion"); + logDetail.append("owningShard", shardId); BSONArrayBuilder b(logDetail.subarrayStart("merged")); - for (const auto& chunkToMerge : chunksToMerge) { - b.append(chunkToMerge.toConfigBSON()); + + // Pad some slack to avoid exceeding max BSON size + const auto kBSONObjMaxLogDetailSize = BSONObjMaxUserSize - 3 * 1024; + for (const auto& chunkToMerge : *chunksToMerge) { + auto chunkBSON = chunkToMerge.toConfigBSON(); + + // Truncate the log if BSON log size exceeds BSONObjMaxUserSize + if (logDetail.len() + chunkBSON.objsize() >= kBSONObjMaxLogDetailSize) { + logDetail.append("mergedChunksArrayTruncatedToDontExceedMaxBSONSize", true); + break; + } + b.append(chunkBSON); } } - initialVersion.appendLegacyWithField(&logDetail, "prevShardVersion"); - mergeVersion.appendLegacyWithField(&logDetail, "mergedVersion"); - logDetail.append("owningShard", shardId); ShardingLogging::get(opCtx)->logChange( opCtx, "merge", nss.ns(), logDetail.obj(), WriteConcernOptions()); diff --git a/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp index d8a006c2286..d4a82abfb97 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp @@ -30,9 +30,15 @@ #include "mongo/platform/basic.h" #include "mongo/client/read_preference.h" +#include 
"mongo/db/dbdirectclient.h" +#include "mongo/db/logical_session_cache_noop.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/read_write_concern_defaults.h" +#include "mongo/db/read_write_concern_defaults_cache_lookup_mock.h" #include "mongo/db/s/config/config_server_test_fixture.h" #include "mongo/db/s/config/sharding_catalog_manager.h" +#include "mongo/db/s/transaction_coordinator_service.h" +#include "mongo/db/session_catalog_mongod.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_chunk.h" @@ -44,18 +50,37 @@ using unittest::assertGet; class MergeChunkTest : public ConfigServerTestFixture { protected: std::string _shardName = "shard0000"; + void setUp() override { ConfigServerTestFixture::setUp(); + ShardType shard; shard.setName(_shardName); shard.setHost(_shardName + ":12"); setupShards({shard}); + + DBDirectClient client(operationContext()); + client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns()); + client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(), + {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()}); + client.createCollection(CollectionType::ConfigNS.ns()); + + ReadWriteConcernDefaults::create(getServiceContext(), _lookupMock.getFetchDefaultsFn()); + LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>()); + TransactionCoordinatorService::get(operationContext()) + ->onShardingInitialization(operationContext(), true); + } + + void tearDown() override { + TransactionCoordinatorService::get(operationContext())->onStepDown(); + ConfigServerTestFixture::tearDown(); } const ShardId _shardId{_shardName}; const NamespaceString _nss1{"TestDB.TestColl1"}; const NamespaceString _nss2{"TestDB.TestColl2"}; const KeyPattern _keyPattern{BSON("x" << 1)}; + ReadWriteConcernDefaultsLookupMock _lookupMock; }; TEST_F(MergeChunkTest, MergeExistingChunksCorrectlyShouldSucceed) { |