diff options
-rw-r--r-- | jstests/sharding/remove_shard_near_doc_size_limit.js | 92 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 35 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.h | 8 |
3 files changed, 108 insertions, 27 deletions
diff --git a/jstests/sharding/remove_shard_near_doc_size_limit.js b/jstests/sharding/remove_shard_near_doc_size_limit.js new file mode 100644 index 00000000000..c918f7000db --- /dev/null +++ b/jstests/sharding/remove_shard_near_doc_size_limit.js @@ -0,0 +1,92 @@ +/* + * This test reproduces the error reported in HELP-22995. It creates a jumbo chunk with documents + * that are close to the 16MB document size limit to force the batching code in move chunk to + * consider adding them together in a batch. It ensures that the proper document size is considered + * and that we can still migrate when calling removeShard. + * + * @tags: [requires_fcv_44, multiversion_incompatible] + */ + +(function() { +'use strict'; +const dbName = "test"; +const collName = "user"; +const ns = dbName + "." + collName; +const shardKeys = [-1, 1]; + +// This number is chosen so that the chunks are considered 'large' as defined by +// the MigrationChunkClonerSourceLegacy class. Currently, that class considers chunks containing +// more than the following number of documents as 'large': +// (13/10) * MaxChunkSize / avgRecSize (MaxChunkSize is 64MB by default) +const numDocs = 10; + +// Size is slightly under the 16MB document size limit. This ensures that any two documents must +// be in separate batches when cloning. 
+const bigDocSize = 16 * 1024 * 1024 - 4096; +const bigDocPayload = "x".repeat(bigDocSize); + +let st = new ShardingTest({shards: 2}); +assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); +st.ensurePrimaryShard(dbName, st.shard0.shardName); + +jsTest.log("Sharding collection with one chunk on each shard."); +assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {x: 1}})); +assert.commandWorked(st.s.adminCommand({split: ns, middle: {x: 0}})); +assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {x: 1}, to: st.shard1.shardName})); + +function removeShardAndWait(shardName) { + const removeShardCmd = {removeShard: shardName}; + const res = st.s.adminCommand(removeShardCmd); + + assert.commandWorked(res); + assert(res.state === "started"); + + assert.soon(function() { + let res = st.s.adminCommand(removeShardCmd); + if (res.state === "completed") { + return true; + } else { + jsTest.log("Still waiting for shard removal to complete:"); + printjson(res); + assert.commandWorked(st.s.adminCommand({clearJumboFlag: ns, find: {"x": 1}})); + return false; + } + }); + + jsTest.log("Shard removal complete."); +} + +function assertDocsExist(shardKeys, numDocs, payloadSize) { + shardKeys.forEach(key => { + for (let i = 0; i < numDocs; i++) { + let db = st.rs0.getPrimary().getDB(dbName); + let query = {x: key, seq: i}; + let doc = db.getCollection(collName).findOne(query); + assert(doc); + let payload = doc.data; + assert.eq(payload.length, + payloadSize, + tojson(query) + " does not have the expected payload length of " + + payloadSize + " bytes"); + } + }); +} + +jsTest.log("Insert " + numDocs + " documents with " + bigDocSize + " bytes each."); +shardKeys.forEach(key => { + for (let i = 0; i < numDocs; i++) { + let doc = {x: key, seq: i, data: bigDocPayload}; + assert.commandWorked(st.s.getCollection(ns).insert(doc)); + } +}); + +// Start balancer to migrate chunks from the removed shard. 
+assert.commandWorked(st.s.getDB("config").settings.update( + {_id: "balancer"}, {$set: {attemptToBalanceJumboChunks: true}}, true)); +st.startBalancer(); + +removeShardAndWait(st.shard1.shardName); +assertDocsExist(shardKeys, numDocs, bigDocSize); + +st.stop(); +})(); diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index ec4588624f6..28377ec41a0 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -677,7 +677,8 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromIndexScan(OperationCon Milliseconds(internalQueryExecYieldPeriodMS.load())); if (!_jumboChunkCloneState->clonerExec) { - auto exec = uassertStatusOK(_getIndexScanExecutor(opCtx, collection)); + auto exec = uassertStatusOK(_getIndexScanExecutor( + opCtx, collection, InternalPlanner::IndexScanOptions::IXSCAN_FETCH)); _jumboChunkCloneState->clonerExec = std::move(exec); } else { _jumboChunkCloneState->clonerExec->reattachToOperationContext(opCtx); @@ -685,39 +686,24 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromIndexScan(OperationCon } BSONObj obj; - RecordId recordId; PlanExecutor::ExecState execState; - while (PlanExecutor::ADVANCED == - (execState = _jumboChunkCloneState->clonerExec->getNext( - &obj, _jumboChunkCloneState->stashedRecordId ? nullptr : &recordId))) { + (execState = _jumboChunkCloneState->clonerExec->getNext(&obj, nullptr))) { stdx::unique_lock<Latch> lk(_mutex); _jumboChunkCloneState->clonerState = execState; lk.unlock(); opCtx->checkForInterrupt(); - // Use the builder size instead of accumulating the document sizes directly so // that we take into consideration the overhead of BSONArray indices. if (arrBuilder->arrSize() && (arrBuilder->len() + obj.objsize() + 1024) > BSONObjMaxUserSize) { _jumboChunkCloneState->clonerExec->enqueue(obj); - - // Stash the recordId we just read to add to the next batch. 
- if (!recordId.isNull()) { - invariant(!_jumboChunkCloneState->stashedRecordId); - _jumboChunkCloneState->stashedRecordId = std::move(recordId); - } - break; } - Snapshotted<BSONObj> doc; - invariant(collection->findDoc( - opCtx, _jumboChunkCloneState->stashedRecordId.value_or(recordId), &doc)); - arrBuilder->append(doc.value()); - _jumboChunkCloneState->stashedRecordId = boost::none; + arrBuilder->append(obj); lk.lock(); _jumboChunkCloneState->docsCloned++; @@ -885,8 +871,10 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(const BSONO } StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> -MigrationChunkClonerSourceLegacy::_getIndexScanExecutor(OperationContext* opCtx, - Collection* const collection) { +MigrationChunkClonerSourceLegacy::_getIndexScanExecutor( + OperationContext* opCtx, + Collection* const collection, + InternalPlanner::IndexScanOptions scanOption) { // Allow multiKey based on the invariant that shard keys must be single-valued. Therefore, any // multi-key index prefixed by shard key cannot be multikey over the shard key fields. 
const IndexDescriptor* idx = @@ -913,7 +901,9 @@ MigrationChunkClonerSourceLegacy::_getIndexScanExecutor(OperationContext* opCtx, min, max, BoundInclusion::kIncludeStartKeyOnly, - PlanExecutor::YIELD_AUTO); + PlanExecutor::YIELD_AUTO, + InternalPlanner::Direction::FORWARD, + scanOption); } Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opCtx) { @@ -925,7 +915,8 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC str::stream() << "Collection " << _args.getNss().ns() << " does not exist."}; } - auto swExec = _getIndexScanExecutor(opCtx, collection); + auto swExec = + _getIndexScanExecutor(opCtx, collection, InternalPlanner::IndexScanOptions::IXSCAN_DEFAULT); if (!swExec.isOK()) { return swExec.getStatus(); } diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index 7990615e38f..9d438d20eb0 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -224,7 +224,9 @@ private: StatusWith<BSONObj> _callRecipient(const BSONObj& cmdObj); StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> _getIndexScanExecutor( - OperationContext* opCtx, Collection* const collection); + OperationContext* opCtx, + Collection* const collection, + InternalPlanner::IndexScanOptions scanOption); void _nextCloneBatchFromIndexScan(OperationContext* opCtx, Collection* collection, @@ -387,10 +389,6 @@ private: // The current state of 'clonerExec'. PlanExecutor::ExecState clonerState; - // RecordId of the last doc read in by 'clonerExec' if collection scan yields during - // cloning. - boost::optional<RecordId> stashedRecordId; - // Number docs in jumbo chunk cloned so far int docsCloned = 0; }; |