summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCheahuychou Mao <mao.cheahuychou@gmail.com>2023-05-16 21:18:23 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-05-17 00:48:03 +0000
commitb34244749ab66e8aa761dbc715dad736e3186894 (patch)
tree685aed2f06e3094252da5ce7d66965dc2f5f0051
parentd646e44b7801a3e5b3230bbae7dcfe05a5ed8707 (diff)
downloadmongo-b34244749ab66e8aa761dbc715dad736e3186894.tar.gz
SERVER-76988 Abort the reshardCollection operation when the zone information is too large
-rw-r--r--etc/backports_required_for_multiversion_tests.yml4
-rw-r--r--jstests/sharding/resharding_update_tag_zones_large.js134
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.cpp93
3 files changed, 198 insertions, 33 deletions
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index 8d8cdad86af..4b1e0609069 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -385,6 +385,8 @@ last-continuous:
ticket: SERVER-77097
- test_file: jstests/sharding/move_primary_donor_cleaned_up_if_coordinator_steps_up_aborted.js
ticket: SERVER-76872
+ - test_file: jstests/sharding/resharding_update_tag_zones_large.js
+ ticket: SERVER-76988
suites: null
last-lts:
all:
@@ -854,4 +856,6 @@ last-lts:
ticket: SERVER-77097
- test_file: jstests/sharding/move_primary_donor_cleaned_up_if_coordinator_steps_up_aborted.js
ticket: SERVER-76872
+ - test_file: jstests/sharding/resharding_update_tag_zones_large.js
+ ticket: SERVER-76988
suites: null
diff --git a/jstests/sharding/resharding_update_tag_zones_large.js b/jstests/sharding/resharding_update_tag_zones_large.js
new file mode 100644
index 00000000000..38421fbe1fd
--- /dev/null
+++ b/jstests/sharding/resharding_update_tag_zones_large.js
@@ -0,0 +1,134 @@
+/**
+ * Testing that the reshardCollection command aborts correctly when the transaction for updating
+ * the persistent state (e.g. config.collections and config.tags) in the resharding commit phase
+ * fails with a TransactionTooLargeForCache error.
+ */
+
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+
/**
 * Recursively asserts that 'lhs' and 'rhs' have the same keys and equal values, descending into
 * nested objects. Keys named in the optional Set 'keysToIgnore' are skipped at every nesting
 * level. Throws (via assert.eq) on the first mismatch.
 *
 * Bug fix: the key-count check previously compared Object.keys(lhs).length against itself, so an
 * 'lhs' carrying extra keys absent from 'rhs' was never detected. It now compares lhs vs rhs.
 */
function assertEqualObj(lhs, rhs, keysToIgnore) {
    assert.eq(Object.keys(lhs).length, Object.keys(rhs).length, {lhs, rhs});
    for (let key in rhs) {
        if (keysToIgnore && keysToIgnore.has(key)) {
            continue;
        }

        const value = rhs[key];
        if (typeof value === 'object') {
            // Recurse so nested documents are compared field-by-field with the same ignore set.
            assertEqualObj(lhs[key], rhs[key], keysToIgnore);
        } else {
            assert.eq(lhs[key], rhs[key], {key, actual: lhs, expected: rhs});
        }
    }
}
+
// NOTE(review): ShardingTest takes the shard count via the "shards" option; the original "shard"
// key is not a recognized option and is silently ignored, so the suite only got two shards because
// that happens to be the default. Spell the option correctly so the intent is explicit.
const st = new ShardingTest({
    shards: 2,
    // This test uses a fail point to force the commitTransaction command in the resharding commit
    // phase to fail with a TransactionTooLargeForCache error. To make the test setup work reliably,
    // disable the cluster parameter refresher since it periodically runs internal transactions
    // against the config server.
    mongosOptions: {setParameter: {'failpoint.skipClusterParameterRefresh': "{'mode':'alwaysOn'}"}}
});
+const configRSPrimary = st.configRS.getPrimary();
+
+const dbName = "testDb";
+const collName = "testColl";
+const ns = dbName + "." + collName;
+
+const configDB = st.s.getDB("config");
+const collectionsColl = configDB.getCollection("collections");
+const chunksColl = configDB.getCollection("chunks");
+const tagsColl = configDB.getCollection("tags");
+
+assert.commandWorked(st.s.adminCommand({enablesharding: dbName}));
+st.ensurePrimaryShard(dbName, st.shard0.shardName);
+assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {skey: "hashed"}}));
+
+const zoneName = "testZone";
+assert.commandWorked(st.s.adminCommand({addShardToZone: st.shard0.shardName, zone: zoneName}));
+
+const oldZone = {
+ tag: zoneName,
+ min: {skey: NumberLong("4470791281878691347")},
+ max: {skey: NumberLong("7766103514953448109")}
+};
+assert.commandWorked(st.s.adminCommand(
+ {updateZoneKeyRange: ns, min: oldZone.min, max: oldZone.max, zone: oldZone.tag}));
+
+const collBefore = collectionsColl.findOne({_id: ns});
+assert.neq(collBefore, null);
+const chunksBefore = chunksColl.find({uuid: collBefore.uuid}).sort({lastmod: -1}).toArray();
+assert.gte(chunksBefore.length, 1, chunksBefore);
+const tagsBefore = tagsColl.find({ns}).toArray();
+assert.gte(tagsBefore.length, 1, tagsBefore);
+
+const reshardingFunc = (mongosHost, ns, zoneName) => {
+ const mongos = new Mongo(mongosHost);
+ const newZone = {
+ tag: zoneName,
+ min: {skey: NumberLong("4470791281878691346")},
+ max: {skey: NumberLong("7766103514953448108")}
+ };
+ jsTest.log("Start resharding");
+ const reshardingRes = mongos.adminCommand({
+ reshardCollection: ns,
+ key: {skey: 1},
+ unique: false,
+ collation: {locale: 'simple'},
+ zones: [{zone: newZone.tag, min: newZone.min, max: newZone.max}],
+ numInitialChunks: 2,
+ });
+ jsTest.log("Finished resharding");
+ return reshardingRes;
+};
+let reshardingThread = new Thread(reshardingFunc, st.s.host, ns, zoneName);
+
+const persistFp =
+ configureFailPoint(configRSPrimary, "reshardingPauseCoordinatorBeforeDecisionPersisted");
+reshardingThread.start();
+persistFp.wait();
+
+const commitFp = configureFailPoint(configRSPrimary,
+ "failCommand",
+ {
+ failCommands: ["commitTransaction"],
+ failInternalCommands: true,
+ failLocalClients: true,
+ errorCode: ErrorCodes.TransactionTooLargeForCache,
+ },
+ {times: 1});
+persistFp.off();
+commitFp.wait();
+commitFp.off();
+const reshardingRes = reshardingThread.returnData();
+
+assert.commandFailedWithCode(reshardingRes, ErrorCodes.TransactionTooLargeForCache);
+
+const collAfter = collectionsColl.findOne({_id: ns});
+assert.neq(collAfter, null);
+const chunksAfter = chunksColl.find({uuid: collAfter.uuid}).sort({lastmod: -1}).toArray();
+const tagsAfter = tagsColl.find({ns}).toArray();
+
+jsTest.log(
+ "Verify that the collection metadata remains the same since the resharding operation failed.");
+
+assertEqualObj(collBefore, collAfter);
+
+assert.eq(chunksBefore.length, chunksAfter.length, {chunksBefore, chunksAfter});
+for (let i = 0; i < chunksAfter.length; i++) {
+ // Ignore "lastmod" when verifying the newest chunk because resharding bumps the minor version
+ // of the newest chunk whenever it goes through a state transition.
+ assertEqualObj(chunksBefore[i], chunksAfter[i], new Set(i == 0 ? ["lastmod"] : []));
+}
+
+assert.eq(tagsBefore.length, tagsAfter.length, {tagsBefore, tagsAfter});
+for (let i = 0; i < tagsAfter.length; i++) {
+ assertEqualObj(tagsBefore[i], tagsAfter[i]);
+}
+
+st.stop();
+})();
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 5f8801fef19..38a4672866b 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -33,6 +33,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/json.h"
#include "mongo/db/auth/authorization_session_impl.h"
+#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/repl/primary_only_service.h"
@@ -1506,34 +1507,10 @@ ExecutorFuture<void> ReshardingCoordinator::_commitAndFinishReshardOperation(
return resharding::WithAutomaticRetry([this, executor, updatedCoordinatorDoc] {
return ExecutorFuture<void>(**executor)
.then(
- [this, executor, updatedCoordinatorDoc] { _commit(updatedCoordinatorDoc); })
- .then([this] { return _waitForMajority(_ctHolder->getStepdownToken()); })
- .thenRunOn(**executor)
- .then([this, executor] {
- _tellAllParticipantsToCommit(_coordinatorDoc.getSourceNss(), executor);
- })
- .then([this] { _updateChunkImbalanceMetrics(_coordinatorDoc.getSourceNss()); })
- .then([this, updatedCoordinatorDoc] {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- resharding::removeChunkDocs(opCtx.get(),
- updatedCoordinatorDoc.getSourceUUID());
- return Status::OK();
- })
- .then([this, executor] { return _awaitAllParticipantShardsDone(executor); })
- .then([this, executor] {
- _metrics->setEndFor(ReshardingMetrics::TimedPhase::kCriticalSection,
- getCurrentTime());
-
- // Best-effort attempt to trigger a refresh on the participant shards so
- // they see the collection metadata without reshardingFields and no longer
- // throw ReshardCollectionInProgress. There is no guarantee this logic ever
- // runs if the config server primary steps down after having removed the
- // coordinator state document.
- return _tellAllRecipientsToRefresh(executor);
- });
+ [this, executor, updatedCoordinatorDoc] { _commit(updatedCoordinatorDoc); });
})
.onTransientError([](const Status& status) {
- LOGV2(5093705,
+ LOGV2(7698801,
"Resharding coordinator encountered transient error while committing",
"error"_attr = status);
})
@@ -1541,18 +1518,68 @@ ExecutorFuture<void> ReshardingCoordinator::_commitAndFinishReshardOperation(
.until<Status>([](const Status& status) { return status.isOK(); })
.on(**executor, _ctHolder->getStepdownToken())
.onError([this, executor](Status status) {
- {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- reshardingPauseCoordinatorBeforeStartingErrorFlow.pauseWhileSet(opCtx.get());
+ if (status == ErrorCodes::TransactionTooLargeForCache) {
+ return _onAbortCoordinatorAndParticipants(executor, status);
}
+ return ExecutorFuture<void>(**executor, status);
+ })
+ .then([this, executor, updatedCoordinatorDoc] {
+ return resharding::WithAutomaticRetry([this, executor, updatedCoordinatorDoc] {
+ return ExecutorFuture<void>(**executor)
+ .then([this] { return _waitForMajority(_ctHolder->getStepdownToken()); })
+ .thenRunOn(**executor)
+ .then([this, executor] {
+ _tellAllParticipantsToCommit(_coordinatorDoc.getSourceNss(),
+ executor);
+ })
+ .then([this] {
+ _updateChunkImbalanceMetrics(_coordinatorDoc.getSourceNss());
+ })
+ .then([this, updatedCoordinatorDoc] {
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ resharding::removeChunkDocs(opCtx.get(),
+ updatedCoordinatorDoc.getSourceUUID());
+ return Status::OK();
+ })
+ .then([this, executor] {
+ return _awaitAllParticipantShardsDone(executor);
+ })
+ .then([this, executor] {
+ _metrics->setEndFor(ReshardingMetrics::TimedPhase::kCriticalSection,
+ getCurrentTime());
+
+ // Best-effort attempt to trigger a refresh on the participant shards
+ // so they see the collection metadata without reshardingFields and
+ // no longer throw ReshardCollectionInProgress. There is no guarantee
+ // this logic ever runs if the config server primary steps down after
+ // having removed the coordinator state document.
+ return _tellAllRecipientsToRefresh(executor);
+ });
+ })
+ .onTransientError([](const Status& status) {
+ LOGV2(5093705,
+ "Resharding coordinator encountered transient error while committing",
+ "error"_attr = status);
+ })
+ .onUnrecoverableError([](const Status& status) {})
+ .until<Status>([](const Status& status) { return status.isOK(); })
+ .on(**executor, _ctHolder->getStepdownToken())
+ .onError([this, executor](Status status) {
+ {
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ reshardingPauseCoordinatorBeforeStartingErrorFlow.pauseWhileSet(
+ opCtx.get());
+ }
- if (_ctHolder->isSteppingOrShuttingDown()) {
- return status;
- }
+ if (_ctHolder->isSteppingOrShuttingDown()) {
+ return status;
+ }
- LOGV2_FATAL(5277000,
+ LOGV2_FATAL(
+ 5277000,
"Unrecoverable error past the point resharding was guaranteed to succeed",
"error"_attr = redact(status));
+ });
});
}