summary refs log tree commit diff
diff options
context:
space:
mode:
authorTommaso Tocci <tommaso.tocci@mongodb.com>2022-12-04 23:41:16 +0100
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-12-07 20:53:04 +0000
commit1372bfc84c1913fa4c6c2d882ac8bbc7763af123 (patch)
tree5c0b2637e616d958f73d4d0dc479a6943d1dc01a
parente3a200c3245d5b9c61b4cc70644cf426618a8d00 (diff)
download mongo-1372bfc84c1913fa4c6c2d882ac8bbc7763af123.tar.gz
SERVER-70237 Chunks merge commit must not create a BSON object too large
Non trivial backport of SERVER-70237 and part of SERVER-65838
-rw-r--r--src/mongo/db/query/get_executor.cpp6
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager.h12
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp194
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp25
4 files changed, 141 insertions, 96 deletions
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index db04d6a276a..4f5acb56c19 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -1662,9 +1662,9 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorDele
deleteStageParams->canonicalQuery = cq.get();
const bool batchDelete =
- (deleteStageParams->isMulti && !deleteStageParams->fromMigrate &&
- !deleteStageParams->returnDeleted && deleteStageParams->sort.isEmpty() &&
- !deleteStageParams->numStatsForDoc) &&
+ (deleteStageParams->isMulti && !opCtx->inMultiDocumentTransaction() &&
+ !deleteStageParams->fromMigrate && !deleteStageParams->returnDeleted &&
+ deleteStageParams->sort.isEmpty() && !deleteStageParams->numStatsForDoc) &&
((gInternalBatchUserMultiDeletesForTest.load() &&
nss.ns() == "__internalBatchedDeletesTesting.Collection0") ||
(batchDeletesByDefault.shouldFail()));
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h
index 02f4ee3024a..44fefe25496 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.h
+++ b/src/mongo/db/s/config/sharding_catalog_manager.h
@@ -673,6 +673,18 @@ private:
*/
void _standardizeClusterParameters(OperationContext* opCtx, RemoteCommandTargeter* targeter);
+ /**
+ * Execute the merge chunk updates using the internal transaction API.
+ */
+ void _mergeChunksInTransaction(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& collectionUUID,
+ const ChunkVersion& mergeVersion,
+ const boost::optional<Timestamp>& validAfter,
+ const ChunkRange& chunkRange,
+ const ShardId& shardId,
+ std::shared_ptr<std::vector<ChunkType>> chunksToMerge);
+
// The owning service context
ServiceContext* const _serviceContext;
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index 5b4d1dcc59d..76af2ce840a 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -89,76 +89,6 @@ void appendShortVersion(BufBuilder* out, const ChunkType& chunk) {
bb.done();
}
-BSONArray buildMergeChunksTransactionUpdates(const std::vector<ChunkType>& chunksToMerge,
- const ChunkVersion& mergeVersion,
- const boost::optional<Timestamp>& validAfter) {
- BSONArrayBuilder updates;
-
- // Build an update operation to expand the first chunk into the newly merged chunk
- {
- BSONObjBuilder op;
- op.append("op", "u");
- op.appendBool("b", false); // no upsert
- op.append("ns", ChunkType::ConfigNS.ns());
-
- // expand first chunk into newly merged chunk
- ChunkType mergedChunk(chunksToMerge.front());
- mergedChunk.setMax(chunksToMerge.back().getMax());
-
- // fill in additional details for sending through transaction
- mergedChunk.setVersion(mergeVersion);
- mergedChunk.setEstimatedSizeBytes(boost::none);
-
- invariant(validAfter);
- mergedChunk.setHistory({ChunkHistory(validAfter.get(), mergedChunk.getShard())});
-
- // add the new chunk information as the update object
- op.append("o", mergedChunk.toConfigBSON());
-
- // query object
- op.append("o2", BSON(ChunkType::name(mergedChunk.getName())));
-
- updates.append(op.obj());
- }
-
- // Build update operations to delete the rest of the chunks to be merged. Remember not
- // to delete the first chunk we're expanding
- for (size_t i = 1; i < chunksToMerge.size(); ++i) {
- BSONObjBuilder op;
- op.append("op", "d");
- op.append("ns", ChunkType::ConfigNS.ns());
-
- op.append("o", BSON(ChunkType::name(chunksToMerge[i].getName())));
-
- updates.append(op.obj());
- }
-
- return updates.arr();
-}
-
-BSONArray buildMergeChunksTransactionPrecond(const std::vector<ChunkType>& chunksToMerge,
- const ChunkVersion& collVersion) {
- BSONArrayBuilder preCond;
-
- for (const auto& chunk : chunksToMerge) {
- BSONObj query = BSON(ChunkType::min(chunk.getMin())
- << ChunkType::max(chunk.getMax()) << ChunkType::collectionUUID()
- << chunk.getCollectionUUID());
-
- const auto collectionIdentityMatchCondition =
- BSON(ChunkType::collectionUUID()
- << chunk.getCollectionUUID() << ChunkType::shard(chunk.getShard().toString()));
-
- BSONObjBuilder b;
- b.append("ns", ChunkType::ConfigNS.ns());
- b.append("q", BSON("query" << query << "orderby" << BSON(ChunkType::lastmod() << -1)));
- b.append("res", collectionIdentityMatchCondition);
-
- preCond.append(b.obj());
- }
- return preCond.arr();
-}
-
/**
* Check that the chunk still exists and return its metadata.
*/
@@ -792,6 +722,83 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkSplit(
return response.obj();
}
+void ShardingCatalogManager::_mergeChunksInTransaction(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& collectionUUID,
+ const ChunkVersion& mergeVersion,
+ const boost::optional<Timestamp>& validAfter,
+ const ChunkRange& chunkRange,
+ const ShardId& shardId,
+ std::shared_ptr<std::vector<ChunkType>> chunksToMerge) {
+ dassert(validAfter);
+ withTransaction(
+ opCtx, ChunkType::ConfigNS, [&, this](OperationContext* opCtx, TxnNumber txnNumber) {
+ // Construct the new chunk by taking `min` from the first merged chunk and `max`
+ // from the last.
+ write_ops::UpdateCommandRequest updateOp(ChunkType::ConfigNS);
+ updateOp.setUpdates({[&] {
+ write_ops::UpdateOpEntry entry;
+
+ ChunkType mergedChunk(chunksToMerge->front());
+ entry.setQ(BSON(ChunkType::name(mergedChunk.getName())));
+ mergedChunk.setMax(chunksToMerge->back().getMax());
+
+ // Fill in additional details for sending through transaction.
+ mergedChunk.setVersion(mergeVersion);
+ mergedChunk.setEstimatedSizeBytes(boost::none);
+
+ mergedChunk.setHistory({ChunkHistory(validAfter.value(), mergedChunk.getShard())});
+
+ entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(
+ mergedChunk.toConfigBSON()));
+ entry.setMulti(false);
+
+ return entry;
+ }()});
+
+ const auto updateRes =
+ writeToConfigDocumentInTxn(opCtx, ChunkType::ConfigNS, updateOp, txnNumber);
+
+ const auto numDocsModified = UpdateOp::parseResponse(updateRes).getN();
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Unexpected number of modified documents during chunks merge "
+ "commit. Modified "
+ << numDocsModified << " documents instead of 1",
+ numDocsModified == 1);
+
+ // Delete the rest of the chunks to be merged.
+ // Remember not to delete the first chunk we're expanding.
+ BSONObjBuilder queryBuilder;
+ queryBuilder << ChunkType::collectionUUID << collectionUUID;
+ queryBuilder << ChunkType::shard(shardId.toString());
+ queryBuilder << ChunkType::min(BSON("$gte" << chunksToMerge->front().getMax()));
+ queryBuilder << ChunkType::min(BSON("$lt" << chunksToMerge->back().getMax()));
+
+ write_ops::DeleteCommandRequest deleteOp(ChunkType::ConfigNS);
+ deleteOp.setDeletes({[&] {
+ write_ops::DeleteOpEntry entry;
+ entry.setQ(queryBuilder.obj());
+ entry.setMulti(true);
+ return entry;
+ }()});
+
+ const auto deleteRes =
+ writeToConfigDocumentInTxn(opCtx, ChunkType::ConfigNS, deleteOp, txnNumber);
+
+ const auto numDocsDeleted = DeleteOp::parseResponse(deleteRes).getN();
+ const int expectedNumDocsDeleted = chunksToMerge->size() - 1;
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Unexpected number of deleted documents during chunks merge "
+ "commit. Deleted "
+ << numDocsDeleted << " documents instead of "
+ << expectedNumDocsDeleted,
+ numDocsDeleted == expectedNumDocsDeleted);
+
+ LOGV2_DEBUG(6583805, 1, "Finished all transaction operations in merge chunk command");
+ });
+}
+
StatusWith<BSONObj> ShardingCatalogManager::commitChunksMerge(
OperationContext* opCtx,
const NamespaceString& nss,
@@ -883,11 +890,12 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunksMerge(
// 3. Prepare the data for the merge
// and ensure that the retrieved list of chunks covers the whole range.
- std::vector<ChunkType> chunksToMerge;
+ auto chunksToMerge = std::make_shared<std::vector<ChunkType>>();
+ chunksToMerge->reserve(shardChunksInRangeResponse.docs.size());
for (const auto& chunkDoc : shardChunksInRangeResponse.docs) {
auto chunk = uassertStatusOK(
ChunkType::parseFromConfigBSON(chunkDoc, coll.getEpoch(), coll.getTimestamp()));
- if (chunksToMerge.empty()) {
+ if (chunksToMerge->empty()) {
uassert(ErrorCodes::IllegalOperation,
str::stream()
<< "could not merge chunks, shard " << shardId
@@ -900,46 +908,46 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunksMerge(
<< "could not merge chunks, shard " << shardId
<< " does not contain a sequence of chunks that exactly fills the range "
<< chunkRange.toString(),
- chunk.getMin().woCompare(chunksToMerge.back().getMax()) == 0);
+ chunk.getMin().woCompare(chunksToMerge->back().getMax()) == 0);
}
- chunksToMerge.push_back(std::move(chunk));
+ chunksToMerge->push_back(std::move(chunk));
}
uassert(ErrorCodes::IllegalOperation,
str::stream() << "could not merge chunks, shard " << shardId
<< " does not contain a sequence of chunks that exactly fills the range "
<< chunkRange.toString(),
- !chunksToMerge.empty() &&
- chunksToMerge.back().getMax().woCompare(chunkRange.getMax()) == 0);
+ !chunksToMerge->empty() &&
+ chunksToMerge->back().getMax().woCompare(chunkRange.getMax()) == 0);
ChunkVersion initialVersion = collVersion;
ChunkVersion mergeVersion = initialVersion;
mergeVersion.incMinor();
- auto updates = buildMergeChunksTransactionUpdates(chunksToMerge, mergeVersion, validAfter);
- auto preCond = buildMergeChunksTransactionPrecond(chunksToMerge, initialVersion);
-
// 4. apply the batch of updates to local metadata
- uassertStatusOK(Grid::get(opCtx)->catalogClient()->applyChunkOpsDeprecated(
- opCtx,
- updates,
- preCond,
- coll.getUuid(),
- nss,
- mergeVersion,
- WriteConcernOptions(),
- repl::ReadConcernLevel::kLocalReadConcern));
+ _mergeChunksInTransaction(
+ opCtx, nss, coll.getUuid(), mergeVersion, validAfter, chunkRange, shardId, chunksToMerge);
// 5. log changes
BSONObjBuilder logDetail;
{
+ initialVersion.appendLegacyWithField(&logDetail, "prevShardVersion");
+ mergeVersion.appendLegacyWithField(&logDetail, "mergedVersion");
+ logDetail.append("owningShard", shardId);
BSONArrayBuilder b(logDetail.subarrayStart("merged"));
- for (const auto& chunkToMerge : chunksToMerge) {
- b.append(chunkToMerge.toConfigBSON());
+
+ // Pad some slack to avoid exceeding max BSON size
+ const auto kBSONObjMaxLogDetailSize = BSONObjMaxUserSize - 3 * 1024;
+ for (const auto& chunkToMerge : *chunksToMerge) {
+ auto chunkBSON = chunkToMerge.toConfigBSON();
+
+ // Truncate the log if BSON log size exceeds BSONObjMaxUserSize
+ if (logDetail.len() + chunkBSON.objsize() >= kBSONObjMaxLogDetailSize) {
+ logDetail.append("mergedChunksArrayTruncatedToDontExceedMaxBSONSize", true);
+ break;
+ }
+ b.append(chunkBSON);
}
}
- initialVersion.appendLegacyWithField(&logDetail, "prevShardVersion");
- mergeVersion.appendLegacyWithField(&logDetail, "mergedVersion");
- logDetail.append("owningShard", shardId);
ShardingLogging::get(opCtx)->logChange(
opCtx, "merge", nss.ns(), logDetail.obj(), WriteConcernOptions());
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp
index d8a006c2286..d4a82abfb97 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp
@@ -30,9 +30,15 @@
#include "mongo/platform/basic.h"
#include "mongo/client/read_preference.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/logical_session_cache_noop.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/read_write_concern_defaults.h"
+#include "mongo/db/read_write_concern_defaults_cache_lookup_mock.h"
#include "mongo/db/s/config/config_server_test_fixture.h"
#include "mongo/db/s/config/sharding_catalog_manager.h"
+#include "mongo/db/s/transaction_coordinator_service.h"
+#include "mongo/db/session_catalog_mongod.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_chunk.h"
@@ -44,18 +50,37 @@ using unittest::assertGet;
class MergeChunkTest : public ConfigServerTestFixture {
protected:
std::string _shardName = "shard0000";
+
void setUp() override {
ConfigServerTestFixture::setUp();
+
ShardType shard;
shard.setName(_shardName);
shard.setHost(_shardName + ":12");
setupShards({shard});
+
+ DBDirectClient client(operationContext());
+ client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns());
+ client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()});
+ client.createCollection(CollectionType::ConfigNS.ns());
+
+ ReadWriteConcernDefaults::create(getServiceContext(), _lookupMock.getFetchDefaultsFn());
+ LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>());
+ TransactionCoordinatorService::get(operationContext())
+ ->onShardingInitialization(operationContext(), true);
+ }
+
+ void tearDown() override {
+ TransactionCoordinatorService::get(operationContext())->onStepDown();
+ ConfigServerTestFixture::tearDown();
}
const ShardId _shardId{_shardName};
const NamespaceString _nss1{"TestDB.TestColl1"};
const NamespaceString _nss2{"TestDB.TestColl2"};
const KeyPattern _keyPattern{BSON("x" << 1)};
+ ReadWriteConcernDefaultsLookupMock _lookupMock;
};
TEST_F(MergeChunkTest, MergeExistingChunksCorrectlyShouldSucceed) {