| author | Allison Easton <allison.easton@mongodb.com> | 2021-07-06 08:05:59 +0000 |
|---|---|---|
| committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-07-22 07:53:38 +0000 |
| commit | 57c5085d7524540fed7aa83d5e3f149d27c355fd (patch) | |
| tree | 935b5bacaf060a74f0011c364df42d122167af88 | |
| parent | cd16c8abae3e6afc7bd4a8257a4359df5b4fdd64 (diff) | |
| download | mongo-57c5085d7524540fed7aa83d5e3f149d27c355fd.tar.gz | |
SERVER-58242 Increase the batch size for '_xferMods'
3 files changed, 161 insertions, 12 deletions
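In short: `nextModsBatch` now fills the `reload` (update) array only once `deleteList` is empty, so deletes and updates no longer share a single batch's budget, and `xferMods` raises its cap from a hardcoded 1 MB to `BSONObjMaxUserSize` (16 MB), reserving a fixed `kFixedCommandOverhead` (32 KB) for the rest of the command. The size check also moves to the builder's actual length (`arr->len()`) rather than a running estimate, and is skipped for the first document, so a single document near the 16 MB limit can still be transferred. Below is a minimal sketch of that loop; the `Doc` and `ArrayBuilder` stand-ins and `xferModsSketch` are illustrative assumptions in place of MongoDB's `BSONObj`/`BSONArrayBuilder`, not MongoDB source:

```cpp
// Illustrative stand-ins (not MongoDB source) for BSONObj / BSONArrayBuilder.
#include <functional>
#include <list>
#include <string>
#include <vector>

struct Doc {
    std::string bytes;
    long long objsize() const { return static_cast<long long>(bytes.size()); }
};

struct ArrayBuilder {
    std::vector<Doc> docs;
    long long bytes = 0;
    int arrSize() const { return static_cast<int>(docs.size()); }  // element count
    long long len() const { return bytes; }                        // serialized length
    void append(const Doc& d) { docs.push_back(d); bytes += d.objsize(); }
};

const long long kMaxUserSize = 16 * 1024 * 1024;    // stands in for BSONObjMaxUserSize
const long long kFixedCommandOverhead = 32 * 1024;  // mirrors the constant added in the .h

// Size-capped transfer loop, shaped like the patched xferMods.
long long xferModsSketch(ArrayBuilder* arr,
                         std::list<Doc>* modsList,
                         long long initialSize,
                         std::function<bool(const Doc&, Doc*)> extractDocToAppendFn) {
    const long long maxSize = kMaxUserSize;
    if (modsList->empty() || initialSize > maxSize) {
        return initialSize;
    }

    auto iter = modsList->begin();
    for (; iter != modsList->end(); ++iter) {
        Doc fullDoc;
        if (extractDocToAppendFn(*iter, &fullDoc)) {
            // Stop before overflowing the command budget, but never refuse the
            // first document: one oversized entry must not stall the batch forever.
            if (arr->arrSize() &&
                (arr->len() + fullDoc.objsize() + kFixedCommandOverhead) > maxSize) {
                break;
            }
            arr->append(fullDoc);
        }
    }

    // The batch size is whatever the builder actually holds, not a running estimate.
    long long totalSize = arr->len();
    modsList->erase(modsList->begin(), iter);  // consumed entries leave the queue
    return totalSize;
}

int main() {
    std::list<Doc> mods{{std::string(10, 'a')}, {std::string(20, 'b')}};
    ArrayBuilder arr;
    auto identity = [](const Doc& in, Doc* out) { *out = in; return true; };
    long long size = xferModsSketch(&arr, &mods, 0, identity);
    return size == 30 ? 0 : 1;  // both small docs fit in one batch
}
```

Entries left on the list after the early break stay queued for the next `_transferMods` round, which the erase-up-to-`iter` call makes explicit.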
```diff
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 83273aa3d2a..489988cc458 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -750,13 +750,14 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
     long long totalDocSize = xferMods(&arrDel, &deleteList, 0, noopFn);
     arrDel.done();
 
-    BSONArrayBuilder arrUpd(builder->subarrayStart("reload"));
-    auto findByIdWrapper = [opCtx, db, ns](BSONObj idDoc, BSONObj* fullDoc) {
-        return Helpers::findById(opCtx, db, ns, idDoc, *fullDoc);
-    };
-    totalDocSize = xferMods(&arrUpd, &updateList, totalDocSize, findByIdWrapper);
-    arrUpd.done();
-
+    if (deleteList.empty()) {
+        BSONArrayBuilder arrUpd(builder->subarrayStart("reload"));
+        auto findByIdWrapper = [opCtx, db, ns](BSONObj idDoc, BSONObj* fullDoc) {
+            return Helpers::findById(opCtx, db, ns, idDoc, *fullDoc);
+        };
+        totalDocSize = xferMods(&arrUpd, &updateList, totalDocSize, findByIdWrapper);
+        arrUpd.done();
+    }
 
     builder->append("size", totalDocSize);
 
@@ -967,25 +968,27 @@ long long xferMods(BSONArrayBuilder* arr,
                    std::list<BSONObj>* modsList,
                    long long initialSize,
                    std::function<bool(BSONObj, BSONObj*)> extractDocToAppendFn) {
-    const long long maxSize = 1024 * 1024;
+    const long long maxSize = BSONObjMaxUserSize;
 
     if (modsList->empty() || initialSize > maxSize) {
         return initialSize;
     }
 
-    long long totalSize = initialSize;
     auto iter = modsList->begin();
-    for (; iter != modsList->end() && totalSize < maxSize; ++iter) {
+    for (; iter != modsList->end(); ++iter) {
         auto idDoc = *iter;
         BSONObj fullDoc;
         if (extractDocToAppendFn(idDoc, &fullDoc)) {
+            if (arr->arrSize() &&
+                (arr->len() + fullDoc.objsize() + kFixedCommandOverhead) > maxSize) {
+                break;
+            }
             arr->append(fullDoc);
-            totalSize += fullDoc.objsize();
         }
     }
 
+    long long totalSize = arr->len();
     modsList->erase(modsList->begin(), iter);
 
     return totalSize;
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 85b96b98c3c..016ad590e58 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -56,6 +56,9 @@ class CollectionPtr;
 class Database;
 class RecordId;
 
+// Overhead to prevent mods buffers from being too large
+const long long kFixedCommandOverhead = 32 * 1024;
+
 /**
  * Used to commit work for LogOpForSharding. Used to keep track of changes in documents that are
  * part of a chunk being migrated.
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
index 2bc379b624a..5cfb136c843 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
@@ -218,6 +218,14 @@ protected:
         return BSON("_id" << value << "X" << value);
     }
 
+    /**
+     * Instantiates a BSON object with objsize close to size.
+     */
+    static BSONObj createSizedCollectionDocument(int id, long long size) {
+        std::string value(size, 'x');
+        return BSON("_id" << id << "X" << id << "Y" << value);
+    }
+
 protected:
     LogicalSessionId _lsid;
     TxnNumber _txnNumber{0};
@@ -359,6 +367,141 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, CorrectDocumentsFetched) {
     futureCommit.default_timed_get();
 }
 
+TEST_F(MigrationChunkClonerSourceLegacyTest, OneLargeDocumentTransferMods) {
+    const std::vector<BSONObj> contents = {createCollectionDocument(1)};
+
+    createShardedCollection(contents);
+
+    MigrationChunkClonerSourceLegacy cloner(
+        createMoveChunkRequest(ChunkRange(BSON("X" << 1), BSON("X" << 100))),
+        kShardKeyPattern,
+        kDonorConnStr,
+        kRecipientConnStr.getServers()[0]);
+
+    {
+        auto futureStartClone = launchAsync([&]() {
+            onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); });
+        });
+
+        ASSERT_OK(cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber));
+        futureStartClone.default_timed_get();
+    }
+
+    {
+        AutoGetCollection autoColl(operationContext(), kNss, MODE_IS);
+
+        {
+            BSONArrayBuilder arrBuilder;
+            ASSERT_OK(
+                cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
+            ASSERT_EQ(1, arrBuilder.arrSize());
+        }
+    }
+
+    {
+        AutoGetCollection autoColl(operationContext(), kNss, MODE_IX);
+        BSONObj insertDoc =
+            createSizedCollectionDocument(2, BSONObjMaxUserSize - kFixedCommandOverhead + 2 * 1024);
+        insertDocsInShardedCollection({insertDoc});
+        WriteUnitOfWork wuow(operationContext());
+        cloner.onInsertOp(operationContext(), insertDoc, {});
+        wuow.commit();
+    }
+
+    {
+        AutoGetCollection autoColl(operationContext(), kNss, MODE_IS);
+        {
+            BSONObjBuilder modsBuilder;
+            ASSERT_OK(cloner.nextModsBatch(operationContext(), autoColl.getDb(), &modsBuilder));
+
+            const auto modsObj = modsBuilder.obj();
+            ASSERT_EQ(1, modsObj["reload"].Array().size());
+        }
+    }
+
+    auto futureCommit = launchAsync([&]() {
+        onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); });
+    });
+
+    ASSERT_OK(cloner.commitClone(operationContext()));
+    futureCommit.default_timed_get();
+}
+
+TEST_F(MigrationChunkClonerSourceLegacyTest, ManySmallDocumentsTransferMods) {
+    const std::vector<BSONObj> contents = {createCollectionDocument(1)};
+
+    createShardedCollection(contents);
+
+    MigrationChunkClonerSourceLegacy cloner(
+        createMoveChunkRequest(ChunkRange(BSON("X" << 1), BSON("X" << 1000000))),
+        kShardKeyPattern,
+        kDonorConnStr,
+        kRecipientConnStr.getServers()[0]);
+
+    {
+        auto futureStartClone = launchAsync([&]() {
+            onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); });
+        });
+
+        ASSERT_OK(cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber));
+        futureStartClone.default_timed_get();
+    }
+
+    {
+        AutoGetCollection autoColl(operationContext(), kNss, MODE_IS);
+
+        {
+            BSONArrayBuilder arrBuilder;
+            ASSERT_OK(
+                cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
+            ASSERT_EQ(1, arrBuilder.arrSize());
+        }
+    }
+
+    long long numDocuments = 0;
+    {
+        AutoGetCollection autoColl(operationContext(), kNss, MODE_IX);
+
+        std::vector<BSONObj> insertDocs;
+        long long totalSize = 0;
+        long long id = 2;
+        while (true) {
+            BSONObj add = createSizedCollectionDocument(id++, 4 * 1024);
+            // The overhead for a BSONObjBuilder with 4KB documents is ~ 22 * 1024, so this is the
+            // max documents to fit in one batch
+            if (totalSize + add.objsize() > BSONObjMaxUserSize - kFixedCommandOverhead - 22 * 1024)
+                break;
+            insertDocs.push_back(add);
+            totalSize += add.objsize();
+            numDocuments++;
+            insertDocsInShardedCollection({add});
+        }
+
+        WriteUnitOfWork wuow(operationContext());
+        for (BSONObj add : insertDocs) {
+            cloner.onInsertOp(operationContext(), add, {});
+        }
+        wuow.commit();
+    }
+
+    {
+        AutoGetCollection autoColl(operationContext(), kNss, MODE_IS);
+        {
+            BSONObjBuilder modsBuilder;
+            ASSERT_OK(cloner.nextModsBatch(operationContext(), autoColl.getDb(), &modsBuilder));
+            const auto modsObj = modsBuilder.obj();
+            ASSERT_EQ(modsObj["reload"].Array().size(), numDocuments);
+        }
+    }
+
+    auto futureCommit = launchAsync([&]() {
+        onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); });
+    });
+
+    ASSERT_OK(cloner.commitClone(operationContext()));
+    futureCommit.default_timed_get();
+}
+
 TEST_F(MigrationChunkClonerSourceLegacyTest, CollectionNotFound) {
     MigrationChunkClonerSourceLegacy cloner(
         createMoveChunkRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))),
```
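The two new tests pin down both edges of the cap. `OneLargeDocumentTransferMods` exercises the first-document exemption: its single update is deliberately sized past the cutoff (`BSONObjMaxUserSize - kFixedCommandOverhead + 2 * 1024`) yet must still arrive in a one-element `reload` array. For `ManySmallDocumentsTransferMods`, a rough back-of-envelope shows the batch size the test expects; the 64-byte per-document slack below is an assumption, since the exact `objsize` of `createSizedCollectionDocument(id, 4 * 1024)` depends on BSON field-name and type overhead:

```cpp
#include <cstdio>

int main() {
    const long long kBSONObjMaxUserSize = 16 * 1024 * 1024;  // 16 MB user BSON limit
    const long long kFixedCommandOverhead = 32 * 1024;       // constant from the .h hunk
    const long long kBuilderOverhead = 22 * 1024;            // per the comment in the test
    const long long kApproxDocSize = 4 * 1024 + 64;          // assumed objsize of a "4 KB" doc

    const long long budget = kBSONObjMaxUserSize - kFixedCommandOverhead - kBuilderOverhead;
    std::printf("~%lld documents per reload batch\n", budget / kApproxDocSize);
    return 0;
}
```

With those figures the budget works out to roughly four thousand documents, all of which the test expects back in a single `reload` array.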