summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Haley Connelly <haley.connelly@mongodb.com> 2021-06-07 21:46:23 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-06-10 17:30:55 +0000
commit df3f17d4ae56f4d22304498405fb072d9627f14a (patch)
tree eadf85418d2dc3a87cc58085621c0c0adab3a395
parent 2267d1dfd1ed6e0cdde57461c157f36ee3a6edb5 (diff)
download mongo-df3f17d4ae56f4d22304498405fb072d9627f14a.tar.gz
SERVER-57182 reshardCollection command is shuffling data even when requested shard key already matches
-rw-r--r-- jstests/sharding/reshard_collection_joins_existing_operation.js 115
-rw-r--r-- jstests/sharding/resharding_on_existing_key_is_noop.js 39
-rw-r--r-- src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp 55
-rw-r--r-- src/mongo/db/s/resharding/resharding_coordinator_service.cpp 15
-rw-r--r-- src/mongo/db/s/resharding/resharding_coordinator_service.h 15
-rw-r--r-- src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp 5
6 files changed, 234 insertions, 10 deletions
diff --git a/jstests/sharding/reshard_collection_joins_existing_operation.js b/jstests/sharding/reshard_collection_joins_existing_operation.js
new file mode 100644
index 00000000000..0ee7dce2b64
--- /dev/null
+++ b/jstests/sharding/reshard_collection_joins_existing_operation.js
@@ -0,0 +1,115 @@
+/**
+ * Tests that if a _configsvrReshardCollection command is issued and then the same command gets
+ * issued again, the second command issued joins with the instance of resharding already in
+ * progress.
+ *
+ * Use _configsvrReshardCollection instead of reshardCollection to exercise the behavior of the
+ * config server in the absence of the distributed lock taken by _shardsvrReshardCollection on the
+ * primary shard for the database.
+ *
+ * @tags: [
+ * requires_fcv_49,
+ * uses_atclustertime,
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/discover_topology.js");
+load("jstests/libs/parallelTester.js");
+load("jstests/sharding/libs/resharding_test_fixture.js");
+
+// Generates a new thread to run _configsvrReshardCollection.
+const makeConfigsvrReshardCollectionThread = (mongosConnString, configsvrReshardCollectionCmd) => {
+ return new Thread((mongosConnString, configsvrReshardCollectionCmd) => {
+ load("jstests/libs/discover_topology.js");
+ const mongos = new Mongo(mongosConnString);
+ const topology = DiscoverTopology.findConnectedNodes(mongos);
+ const configsvr = new Mongo(topology.configsvr.nodes[0]);
+ assert.commandWorked(configsvr.adminCommand(configsvrReshardCollectionCmd));
+ }, mongosConnString, configsvrReshardCollectionCmd);
+};
+
+const constructTempReshardingNs = (dbName, collName) => {
+ const sourceCollectionUUID = getUUIDFromListCollections(dbName, collName);
+ const sourceCollectionUUIDString = extractUUIDFromObject(sourceCollectionUUID);
+ return `${dbName}.system.resharding.${sourceCollectionUUIDString}`;
+};
+
+// Callers must ensure the temporary collection has actually been created by the time this is
+// called.
+const getTempUUID = (tempNs) => {
+ const tempCollection = mongos.getCollection(tempNs);
+ return getUUIDFromConfigCollections(mongos, tempCollection.getFullName());
+};
+
+const st = new ShardingTest({  // Cluster fixture for this test; shut down by st.stop() below.
+ mongos: 1,
+ config: 1,
+ shards: 2,
+ rs: {nodes: 2},
+});
+
+const sourceCollection = st.s.getCollection("reshardingDb.coll");
+
+CreateShardedCollectionUtil.shardCollectionWithChunks(sourceCollection, {oldKey: 1}, [  // Shard on {oldKey: 1} with one chunk on shard0.
+ {min: {oldKey: MinKey}, max: {oldKey: MaxKey}, shard: st.shard0.shardName},
+]);
+
+const reshardKey = {
+ newKey: 1
+};
+const configsvrReshardCollectionCmd = {  // The identical command object is issued by both threads below.
+ _configsvrReshardCollection: sourceCollection.getFullName(),
+ key: reshardKey,
+ writeConcern: {w: "majority"}
+};
+
+// Before starting the actual resharding, get the source collection's UUID to construct the
+// namespace for the temporary collection that will be created.
+const sourceDBName = sourceCollection.getDB();  // NOTE(review): presumably a DB handle rather than a name string, despite the variable name - confirm.
+const sourceCollName = sourceCollection.getName();
+const tempNs = constructTempReshardingNs(sourceDBName, sourceCollName);
+
+const mongos = sourceCollection.getMongo();  // Also read by getTempUUID, which is first called after this line.
+const topology = DiscoverTopology.findConnectedNodes(mongos);
+const configsvr = new Mongo(topology.configsvr.nodes[0]);
+
+const reshardCollectionThread1 =
+ makeConfigsvrReshardCollectionThread(topology.mongos.nodes[0], configsvrReshardCollectionCmd);
+const pauseBeforeCloningFP =  // Configured before thread 1 is started.
+ configureFailPoint(configsvr, "reshardingPauseCoordinatorBeforeCloning");
+
+// Issue the first _configsvrReshardCollection command and pause after the temporary collection is
+// created but before its config.collections entry is replaced.
+reshardCollectionThread1.start();
+pauseBeforeCloningFP.wait();
+
+// The UUID of the temporary resharding collection should become the UUID of the original collection
+// once resharding has completed.
+const expectedUUIDAfterReshardingCompletes = getTempUUID(tempNs);
+
+const reshardCollectionJoinedFP =  // Configured before thread 2 is started.
+ configureFailPoint(configsvr, "reshardCollectionJoinedExistingOperation");
+const reshardCollectionThread2 =
+ makeConfigsvrReshardCollectionThread(topology.mongos.nodes[0], configsvrReshardCollectionCmd);
+
+// Hitting the reshardCollectionJoinedFP is additional confirmation that the second
+// _configsvrReshardCollection command (identical to the first) gets joined with the instance
+// created/running for the first command issued.
+reshardCollectionThread2.start();
+reshardCollectionJoinedFP.wait();
+
+reshardCollectionJoinedFP.off();  // Turn off both failpoints before joining the threads.
+pauseBeforeCloningFP.off();
+
+reshardCollectionThread2.join();
+reshardCollectionThread1.join();
+
+// Confirm the UUID for the namespace that was resharded is the same as the temporary collection's
+// UUID before the second reshardCollection command was issued.
+const finalSourceCollectionUUID = getUUIDFromListCollections(sourceDBName, sourceCollName);
+assert.eq(expectedUUIDAfterReshardingCompletes, finalSourceCollectionUUID);
+
+st.stop();
+})();
diff --git a/jstests/sharding/resharding_on_existing_key_is_noop.js b/jstests/sharding/resharding_on_existing_key_is_noop.js
new file mode 100644
index 00000000000..dcc3f68178e
--- /dev/null
+++ b/jstests/sharding/resharding_on_existing_key_is_noop.js
@@ -0,0 +1,39 @@
+/**
+ * Tests that trying to perform reshardCollection with a resharding key that matches the
+ * collection's existing shard key is a noop (which can be done by confirming the collection's UUID
+ * remains unchanged after the operation).
+ *
+ * @tags: [uses_atclustertime, requires_fcv_49,]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/uuid_util.js");
+load("jstests/sharding/libs/create_sharded_collection_util.js");
+
+const st = new ShardingTest({  // Cluster fixture for this test; shut down by st.stop() below.
+ mongos: 1,
+ config: 1,
+ shards: 2,
+ rs: {nodes: 2},
+});
+
+const sourceCollection = st.s.getCollection("reshardingDb.coll");
+
+CreateShardedCollectionUtil.shardCollectionWithChunks(sourceCollection, {key: 1}, [  // Shard on {key: 1}, split at 0 across shard0 and shard1.
+ {min: {key: MinKey}, max: {key: 0}, shard: st.shard0.shardName},
+ {min: {key: 0}, max: {key: MaxKey}, shard: st.shard1.shardName},
+]);
+
+const ns = sourceCollection.getFullName();
+const mongos = sourceCollection.getMongo();
+const sourceDB = sourceCollection.getDB();
+
+// The UUID should remain the same if the resharding key matches the existing shard key.
+const preReshardCollectionUUID = getUUIDFromListCollections(sourceDB, sourceCollection.getName());
+assert.commandWorked(mongos.adminCommand({reshardCollection: ns, key: {key: 1}}));  // Requested key equals the key the collection was sharded on above.
+const postReshardCollectionUUID = getUUIDFromListCollections(sourceDB, sourceCollection.getName());
+assert.eq(preReshardCollectionUUID, postReshardCollectionUUID);
+
+st.stop();
+})();
diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
index 5586c71a25d..9830b44c5db 100644
--- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
+++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
@@ -48,8 +48,35 @@
#include "mongo/s/resharding/resharding_feature_flag_gen.h"
namespace mongo {
+
+MONGO_FAIL_POINT_DEFINE(reshardCollectionJoinedExistingOperation);
+
namespace {
+// Scans the instances reported by ReshardingCoordinatorService::getAllReshardingInstances() for
+// a coordinator whose source namespace is 'nss' and whose resharding key equals 'newShardKey'
+// under SimpleBSONObjComparator. Returns that coordinator so the caller can join the operation
+// already in progress, or boost::none when no such instance exists.
+boost::optional<std::shared_ptr<ReshardingCoordinatorService::ReshardingCoordinator>>
+getExistingInstanceToJoin(OperationContext* opCtx,
+                          const NamespaceString& nss,
+                          const BSONObj& newShardKey) {
+    auto reshardingCoordinatorService = checked_cast<ReshardingCoordinatorService*>(
+        repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext())
+            ->lookupServiceByName(ReshardingCoordinatorService::kServiceName));
+    auto instances = reshardingCoordinatorService->getAllReshardingInstances(opCtx);
+    for (auto& instance : instances) {
+        auto reshardingCoordinator =
+            checked_pointer_cast<ReshardingCoordinatorService::ReshardingCoordinator>(instance);
+
+        auto instanceMetadata = reshardingCoordinator->getMetadata();
+        // Check the namespace first so the resharding key is only converted to BSON for
+        // instances that operate on the requested collection.
+        if (instanceMetadata.getSourceNss() == nss &&
+            SimpleBSONObjComparator::kInstance.evaluate(
+                instanceMetadata.getReshardingKey().toBSON() == newShardKey)) {
+            return reshardingCoordinator;
+        }
+    }
+
+    return boost::none;
+}
+
class ConfigsvrReshardCollectionCommand final
: public TypedCommand<ConfigsvrReshardCollectionCommand> {
public:
@@ -109,7 +136,9 @@ public:
*presetChunks, opCtx, ShardKeyPattern(request().getKey()).getKeyPattern());
}
- auto instance = ([&] {
+ // Returns boost::none if there isn't any work to be done by the resharding operation.
+ auto instance = ([&]() -> boost::optional<std::shared_ptr<
+ ReshardingCoordinatorService::ReshardingCoordinator>> {
FixedFCVRegion fixedFcv(opCtx);
uassert(ErrorCodes::CommandNotSupported,
@@ -117,10 +146,28 @@ public:
resharding::gFeatureFlagResharding.isEnabled(
serverGlobalParams.featureCompatibility));
+ if (auto existingInstance =
+ getExistingInstanceToJoin(opCtx, nss, request().getKey())) {
+ // Join the existing resharding operation to prevent generating a new resharding
+ // instance if the same command is issued consecutively due to client disconnect
+ // etc.
+ reshardCollectionJoinedExistingOperation.pauseWhileSet(opCtx);
+ existingInstance.get()->getCoordinatorDocWrittenFuture().get(opCtx);
+ return existingInstance;
+ }
+
const auto cm = uassertStatusOK(
Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(
opCtx, nss));
+ const auto currentShardKey = cm.getShardKeyPattern().getKeyPattern();
+ if (SimpleBSONObjComparator::kInstance.evaluate(currentShardKey.toBSON() ==
+ request().getKey())) {
+ // There isn't any work to be done by the resharding operation since the
+ // existing shard key matches the desired new shard key.
+ return boost::none;
+ }
+
auto tempReshardingNss = constructTemporaryReshardingNss(
nss.db(), getCollectionUUIDFromChunkManger(nss, cm));
@@ -158,7 +205,11 @@ public:
return instance;
})();
- instance->getCompletionFuture().get(opCtx);
+ if (instance) {
+ // There is work to be done in order to have the collection's shard key match the
+ // requested shard key. Wait until the work is complete.
+ instance.get()->getCompletionFuture().get(opCtx);
+ }
}
private:
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index c4f8a745c8d..18e8cc3b8aa 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -926,7 +926,10 @@ ThreadPool::Limits ReshardingCoordinatorService::getThreadPoolLimits() const {
std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingCoordinatorService::constructInstance(
BSONObj initialState) {
return std::make_shared<ReshardingCoordinator>(
- this, std::move(initialState), std::make_shared<ReshardingCoordinatorExternalStateImpl>());
+ this,
+ ReshardingCoordinatorDocument::parse(IDLParserErrorContext("ReshardingCoordinatorStateDoc"),  // Parse the raw state document here so the coordinator receives a typed document.
+ std::move(initialState)),
+ std::make_shared<ReshardingCoordinatorExternalStateImpl>());
}
ExecutorFuture<void> ReshardingCoordinatorService::_rebuildService(
@@ -971,13 +974,13 @@ void ReshardingCoordinatorService::abortAllReshardCollection(OperationContext* o
ReshardingCoordinatorService::ReshardingCoordinator::ReshardingCoordinator(
const ReshardingCoordinatorService* coordinatorService,
- const BSONObj& state,
+ const ReshardingCoordinatorDocument& coordinatorDoc,
std::shared_ptr<ReshardingCoordinatorExternalState> externalState)
: PrimaryOnlyService::TypedInstance<ReshardingCoordinator>(),
- _id(state["_id"].wrap().getOwned()),
+ _id(coordinatorDoc.getReshardingUUID().toBSON()),
_coordinatorService(coordinatorService),
- _coordinatorDoc(ReshardingCoordinatorDocument::parse(
- IDLParserErrorContext("ReshardingCoordinatorStateDoc"), state)),
+ _metadata(coordinatorDoc.getCommonReshardingMetadata()),
+ _coordinatorDoc(coordinatorDoc),
_markKilledExecutor(std::make_shared<ThreadPool>([] {
ThreadPool::Options options;
options.poolName = "ReshardingCoordinatorCancelableOpCtxPool";
@@ -1122,7 +1125,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_runUntilReadyToPersistDeci
}
if (_coordinatorDoc.getState() < CoordinatorStateEnum::kPreparingToDonate) {
- // Participants were never made aware of the resharding opeartion. Abort without
+ // Participants were never made aware of the resharding operation. Abort without
// waiting for participant acknowledgement.
_onAbortCoordinatorOnly(executor, status);
} else {
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h
index f6fda9129c7..454e6fd5c49 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h
@@ -188,6 +188,11 @@ public:
std::shared_ptr<PrimaryOnlyService::Instance> constructInstance(BSONObj initialState) override;
+ std::vector<std::shared_ptr<PrimaryOnlyService::Instance>> getAllReshardingInstances(
+ OperationContext* opCtx) {
+ return getAllInstances(opCtx);  // Forwards to getAllInstances(opCtx) and returns its result unchanged.
+ }
+
/**
* Tries to abort all active reshardCollection operations. Note that this doesn't differentiate
* between operations interrupted due to stepdown or abort. Callers who wish to confirm that
@@ -206,7 +211,7 @@ class ReshardingCoordinatorService::ReshardingCoordinator final
public:
explicit ReshardingCoordinator(
const ReshardingCoordinatorService* coordinatorService,
- const BSONObj& state,
+ const ReshardingCoordinatorDocument& coordinatorDoc,
std::shared_ptr<ReshardingCoordinatorExternalState> externalState);
~ReshardingCoordinator() = default;
@@ -226,6 +231,10 @@ public:
void installCoordinatorDoc(OperationContext* opCtx,
const ReshardingCoordinatorDocument& doc) noexcept;
+ CommonReshardingMetadata getMetadata() const {
+ return _metadata;  // Returned by value: each call copies the const _metadata member.
+ }
+
/**
* Returns a Future that will be resolved when all work associated with this Instance has
* completed running.
@@ -423,6 +432,10 @@ private:
// The primary-only service instance corresponding to the coordinator instance. Not owned.
const ReshardingCoordinatorService* const _coordinatorService;
+ // The in-memory representation of the immutable portion of the document in
+ // config.reshardingOperations.
+ const CommonReshardingMetadata _metadata;
+
// Observes writes that indicate state changes for this resharding operation and notifies
// 'this' when all donors/recipients have entered some state so that 'this' can transition
// states.
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp
index dfc98b2ed11..f816e805262 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp
@@ -201,7 +201,10 @@ public:
std::shared_ptr<PrimaryOnlyService::Instance> constructInstance(BSONObj initialState) override {
return std::make_shared<ReshardingCoordinator>(
- this, std::move(initialState), std::make_shared<ExternalStateForTest>());
+ this,
+ ReshardingCoordinatorDocument::parse(  // The test fixture parses the state document the same way before constructing the coordinator.
+ IDLParserErrorContext("ReshardingCoordinatorStateDoc"), std::move(initialState)),
+ std::make_shared<ExternalStateForTest>());
}
};