From b34244749ab66e8aa761dbc715dad736e3186894 Mon Sep 17 00:00:00 2001
From: Cheahuychou Mao
Date: Tue, 16 May 2023 21:18:23 +0000
Subject: SERVER-76988 Abort the reshardCollection operation when the zone
 information is too large

---
 etc/backports_required_for_multiversion_tests.yml  |   4 +
 .../sharding/resharding_update_tag_zones_large.js  | 134 +++++++++++++++++++++
 .../resharding/resharding_coordinator_service.cpp  |  93 +++++++++-----
 3 files changed, 198 insertions(+), 33 deletions(-)
 create mode 100644 jstests/sharding/resharding_update_tag_zones_large.js

diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index 8d8cdad86af..4b1e0609069 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -385,6 +385,8 @@ last-continuous:
     ticket: SERVER-77097
   - test_file: jstests/sharding/move_primary_donor_cleaned_up_if_coordinator_steps_up_aborted.js
     ticket: SERVER-76872
+  - test_file: jstests/sharding/resharding_update_tag_zones_large.js
+    ticket: SERVER-76988
   suites: null
 last-lts:
   all:
@@ -854,4 +856,6 @@ last-lts:
     ticket: SERVER-77097
   - test_file: jstests/sharding/move_primary_donor_cleaned_up_if_coordinator_steps_up_aborted.js
     ticket: SERVER-76872
+  - test_file: jstests/sharding/resharding_update_tag_zones_large.js
+    ticket: SERVER-76988
   suites: null
diff --git a/jstests/sharding/resharding_update_tag_zones_large.js b/jstests/sharding/resharding_update_tag_zones_large.js
new file mode 100644
index 00000000000..38421fbe1fd
--- /dev/null
+++ b/jstests/sharding/resharding_update_tag_zones_large.js
@@ -0,0 +1,134 @@
+/**
+ * Testing that the reshardCollection command aborts correctly when the transaction for updating
+ * the persistent state (e.g. config.collections and config.tags) in the resharding commit phase
+ * fails with a TransactionTooLargeForCache error.
+ */
+
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+
+function assertEqualObj(lhs, rhs, keysToIgnore) {
+    assert.eq(Object.keys(lhs).length, Object.keys(rhs).length, {lhs, rhs});
+    for (let key in rhs) {
+        if (keysToIgnore && keysToIgnore.has(key)) {
+            continue;
+        }
+
+        const value = rhs[key];
+        if (typeof value === 'object') {
+            assertEqualObj(lhs[key], rhs[key], keysToIgnore);
+        } else {
+            assert.eq(lhs[key], rhs[key], {key, actual: lhs, expected: rhs});
+        }
+    }
+}
+
+const st = new ShardingTest({
+    shards: 2,
+    // This test uses a fail point to force the commitTransaction command in the resharding commit
+    // phase to fail with a TransactionTooLargeForCache error. To make the test setup work reliably,
+    // disable the cluster parameter refresher since it periodically runs internal transactions
+    // against the config server.
+    mongosOptions: {setParameter: {'failpoint.skipClusterParameterRefresh': "{'mode':'alwaysOn'}"}}
+});
+const configRSPrimary = st.configRS.getPrimary();
+
+const dbName = "testDb";
+const collName = "testColl";
+const ns = dbName + "." + collName;
+
+const configDB = st.s.getDB("config");
+const collectionsColl = configDB.getCollection("collections");
+const chunksColl = configDB.getCollection("chunks");
+const tagsColl = configDB.getCollection("tags");
+
+assert.commandWorked(st.s.adminCommand({enablesharding: dbName}));
+st.ensurePrimaryShard(dbName, st.shard0.shardName);
+assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {skey: "hashed"}}));
+
+const zoneName = "testZone";
+assert.commandWorked(st.s.adminCommand({addShardToZone: st.shard0.shardName, zone: zoneName}));
+
+const oldZone = {
+    tag: zoneName,
+    min: {skey: NumberLong("4470791281878691347")},
+    max: {skey: NumberLong("7766103514953448109")}
+};
+assert.commandWorked(st.s.adminCommand(
+    {updateZoneKeyRange: ns, min: oldZone.min, max: oldZone.max, zone: oldZone.tag}));
+
+const collBefore = collectionsColl.findOne({_id: ns});
+assert.neq(collBefore, null);
+const chunksBefore = chunksColl.find({uuid: collBefore.uuid}).sort({lastmod: -1}).toArray();
+assert.gte(chunksBefore.length, 1, chunksBefore);
+const tagsBefore = tagsColl.find({ns}).toArray();
+assert.gte(tagsBefore.length, 1, tagsBefore);
+
+const reshardingFunc = (mongosHost, ns, zoneName) => {
+    const mongos = new Mongo(mongosHost);
+    const newZone = {
+        tag: zoneName,
+        min: {skey: NumberLong("4470791281878691346")},
+        max: {skey: NumberLong("7766103514953448108")}
+    };
+    jsTest.log("Start resharding");
+    const reshardingRes = mongos.adminCommand({
+        reshardCollection: ns,
+        key: {skey: 1},
+        unique: false,
+        collation: {locale: 'simple'},
+        zones: [{zone: newZone.tag, min: newZone.min, max: newZone.max}],
+        numInitialChunks: 2,
+    });
+    jsTest.log("Finished resharding");
+    return reshardingRes;
+};
+let reshardingThread = new Thread(reshardingFunc, st.s.host, ns, zoneName);
+
+const persistFp =
+    configureFailPoint(configRSPrimary, "reshardingPauseCoordinatorBeforeDecisionPersisted");
+reshardingThread.start();
+persistFp.wait();
+
+const commitFp = configureFailPoint(configRSPrimary,
+                                    "failCommand",
+                                    {
+                                        failCommands: ["commitTransaction"],
+                                        failInternalCommands: true,
+                                        failLocalClients: true,
+                                        errorCode: ErrorCodes.TransactionTooLargeForCache,
+                                    },
+                                    {times: 1});
+persistFp.off();
+commitFp.wait();
+commitFp.off();
+const reshardingRes = reshardingThread.returnData();
+
+assert.commandFailedWithCode(reshardingRes, ErrorCodes.TransactionTooLargeForCache);
+
+const collAfter = collectionsColl.findOne({_id: ns});
+assert.neq(collAfter, null);
+const chunksAfter = chunksColl.find({uuid: collAfter.uuid}).sort({lastmod: -1}).toArray();
+const tagsAfter = tagsColl.find({ns}).toArray();
+
+jsTest.log(
+    "Verify that the collection metadata remains the same since the resharding operation failed.");
+
+assertEqualObj(collBefore, collAfter);
+
+assert.eq(chunksBefore.length, chunksAfter.length, {chunksBefore, chunksAfter});
+for (let i = 0; i < chunksAfter.length; i++) {
+    // Ignore "lastmod" when verifying the newest chunk because resharding bumps the minor version
+    // of the newest chunk whenever it goes through a state transition.
+    assertEqualObj(chunksBefore[i], chunksAfter[i], new Set(i == 0 ? ["lastmod"] : []));
+}
+
+assert.eq(tagsBefore.length, tagsAfter.length, {tagsBefore, tagsAfter});
+for (let i = 0; i < tagsAfter.length; i++) {
+    assertEqualObj(tagsBefore[i], tagsAfter[i]);
+}
+
+st.stop();
+})();
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 5f8801fef19..38a4672866b 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -33,6 +33,7 @@
 #include "mongo/bson/bsonobj.h"
 #include "mongo/bson/json.h"
 #include "mongo/db/auth/authorization_session_impl.h"
+#include "mongo/db/concurrency/exception_util.h"
 #include "mongo/db/dbdirectclient.h"
 #include "mongo/db/ops/write_ops.h"
 #include "mongo/db/repl/primary_only_service.h"
@@ -1506,34 +1507,10 @@ ExecutorFuture<void> ReshardingCoordinator::_commitAndFinishReshardOperation(
     return resharding::WithAutomaticRetry([this, executor, updatedCoordinatorDoc] {
                return ExecutorFuture<void>(**executor)
                    .then(
-                       [this, executor, updatedCoordinatorDoc] { _commit(updatedCoordinatorDoc); })
-                   .then([this] { return _waitForMajority(_ctHolder->getStepdownToken()); })
-                   .thenRunOn(**executor)
-                   .then([this, executor] {
-                       _tellAllParticipantsToCommit(_coordinatorDoc.getSourceNss(), executor);
-                   })
-                   .then([this] { _updateChunkImbalanceMetrics(_coordinatorDoc.getSourceNss()); })
-                   .then([this, updatedCoordinatorDoc] {
-                       auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
-                       resharding::removeChunkDocs(opCtx.get(),
-                                                   updatedCoordinatorDoc.getSourceUUID());
-                       return Status::OK();
-                   })
-                   .then([this, executor] { return _awaitAllParticipantShardsDone(executor); })
-                   .then([this, executor] {
-                       _metrics->setEndFor(ReshardingMetrics::TimedPhase::kCriticalSection,
-                                           getCurrentTime());
-
-                       // Best-effort attempt to trigger a refresh on the participant shards so
-                       // they see the collection metadata without reshardingFields and no longer
-                       // throw ReshardCollectionInProgress. There is no guarantee this logic ever
-                       // runs if the config server primary steps down after having removed the
-                       // coordinator state document.
-                       return _tellAllRecipientsToRefresh(executor);
-                   });
+                       [this, executor, updatedCoordinatorDoc] { _commit(updatedCoordinatorDoc); });
            })
         .onTransientError([](const Status& status) {
-            LOGV2(5093705,
+            LOGV2(7698801,
                   "Resharding coordinator encountered transient error while committing",
                   "error"_attr = status);
         })
@@ -1541,18 +1518,68 @@ ExecutorFuture<void> ReshardingCoordinator::_commitAndFinishReshardOperation(
         .until([](const Status& status) { return status.isOK(); })
         .on(**executor, _ctHolder->getStepdownToken())
         .onError([this, executor](Status status) {
-            {
-                auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
-                reshardingPauseCoordinatorBeforeStartingErrorFlow.pauseWhileSet(opCtx.get());
+            if (status == ErrorCodes::TransactionTooLargeForCache) {
+                return _onAbortCoordinatorAndParticipants(executor, status);
             }
+            return ExecutorFuture<void>(**executor, status);
+        })
+        .then([this, executor, updatedCoordinatorDoc] {
+            return resharding::WithAutomaticRetry([this, executor, updatedCoordinatorDoc] {
+                       return ExecutorFuture<void>(**executor)
+                           .then([this] { return _waitForMajority(_ctHolder->getStepdownToken()); })
+                           .thenRunOn(**executor)
+                           .then([this, executor] {
+                               _tellAllParticipantsToCommit(_coordinatorDoc.getSourceNss(),
+                                                            executor);
+                           })
+                           .then([this] {
+                               _updateChunkImbalanceMetrics(_coordinatorDoc.getSourceNss());
+                           })
+                           .then([this, updatedCoordinatorDoc] {
+                               auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+                               resharding::removeChunkDocs(opCtx.get(),
+                                                           updatedCoordinatorDoc.getSourceUUID());
+                               return Status::OK();
+                           })
+                           .then([this, executor] {
+                               return _awaitAllParticipantShardsDone(executor);
+                           })
+                           .then([this, executor] {
+                               _metrics->setEndFor(ReshardingMetrics::TimedPhase::kCriticalSection,
+                                                   getCurrentTime());
+
+                               // Best-effort attempt to trigger a refresh on the participant shards
+                               // so they see the collection metadata without reshardingFields and
+                               // no longer throw ReshardCollectionInProgress. There is no guarantee
+                               // this logic ever runs if the config server primary steps down after
+                               // having removed the coordinator state document.
+                               return _tellAllRecipientsToRefresh(executor);
+                           });
+                   })
+                .onTransientError([](const Status& status) {
+                    LOGV2(5093705,
+                          "Resharding coordinator encountered transient error while committing",
+                          "error"_attr = status);
+                })
+                .onUnrecoverableError([](const Status& status) {})
+                .until([](const Status& status) { return status.isOK(); })
+                .on(**executor, _ctHolder->getStepdownToken())
+                .onError([this, executor](Status status) {
+                    {
+                        auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+                        reshardingPauseCoordinatorBeforeStartingErrorFlow.pauseWhileSet(
+                            opCtx.get());
+                    }
 
-            if (_ctHolder->isSteppingOrShuttingDown()) {
-                return status;
-            }
+                    if (_ctHolder->isSteppingOrShuttingDown()) {
+                        return status;
+                    }
 
-            LOGV2_FATAL(5277000,
+                    LOGV2_FATAL(
+                        5277000,
                         "Unrecoverable error past the point resharding was guaranteed to succeed",
                         "error"_attr = redact(status));
+                });
         });
 }
 
-- 
cgit v1.2.1