author     Jordi Serra Torrens <jordi.serra-torrens@mongodb.com>   2021-03-16 14:32:42 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>        2021-03-18 10:17:20 +0000
commit     6c3c2cd287996ccc1821b7f04595edc7a4be3104 (patch)
tree       3c05eca8c799bed7dec27d99e0965614ddbe7846
parent     1299785257ac51faa99cc8197b4b89ed4502489b (diff)
download   mongo-6c3c2cd287996ccc1821b7f04595edc7a4be3104.tar.gz
SERVER-55146: Bump collection version on any modification of config.collections reshardingFields or allowMigrations fields
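The patch below renames bumpCollShardVersionsAndChangeMetadataInTxn to bumpCollectionVersionAndChangeMetadataInTxn, dropping the explicit shard-id argument (the owning shards are now looked up from config.chunks), and adds a variant that bumps several collections in one transaction. As a rough caller-side sketch only (opCtx, nss, sourceNss and tempReshardingNss are placeholder names, not part of this patch; the function names and signatures come from the diff below):

    // Before: the caller had to supply a stable list of shard ids.
    // ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
    //     opCtx, nss, shardIds, [&](OperationContext* opCtx, TxnNumber txnNumber) {
    //         /* metadata writes at txnNumber */
    //     });

    // After: the shard ids are derived internally from config.chunks.
    ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn(
        opCtx, nss, [&](OperationContext* opCtx, TxnNumber txnNumber) {
            /* metadata writes at txnNumber */
        });

    // Several collections can now be bumped in the same transaction.
    ShardingCatalogManager::get(opCtx)->bumpMultipleCollectionVersionsAndChangeMetadataInTxn(
        opCtx, {sourceNss, tempReshardingNss}, [&](OperationContext* opCtx, TxnNumber txnNumber) {
            /* metadata writes at txnNumber */
        });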
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager.h                                                   |  15
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp   |  36
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp                               |  70
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_service.cpp                                      | 245
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_test.cpp                                         |  45
-rw-r--r--  src/mongo/s/chunk_manager.h                                                                        |   2
6 files changed, 200 insertions, 213 deletions
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h
index f2b0d94b92c..10afc542b35 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.h
+++ b/src/mongo/db/s/config/sharding_catalog_manager.h
@@ -283,14 +283,19 @@ public:
* Effectively bumps the shard version for each shard owning chunks for the collection to the
* current collection version's major version + 1, all inside a single already-running
* transaction.
- *
- * Note: it's the responsibility of the caller to ensure that the list of shards is stable,
- * as any shards added after the shard ids have been passed in will be missed.
*/
- void bumpCollShardVersionsAndChangeMetadataInTxn(
+ void bumpCollectionVersionAndChangeMetadataInTxn(
OperationContext* opCtx,
const NamespaceString& nss,
- const std::vector<ShardId>& shardIds,
+ unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc);
+
+ /**
+ * Same as bumpCollectionVersionAndChangeMetadataInTxn, but bumps the version for several
+ * collections in a single transaction.
+ */
+ void bumpMultipleCollectionVersionsAndChangeMetadataInTxn(
+ OperationContext* opCtx,
+ const std::vector<NamespaceString>& collNames,
unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc);
/**
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp
index 9c36446fd15..9ff72e16ffe 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp
@@ -161,9 +161,8 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest,
auto opCtx = operationContext();
- std::vector<ShardId> shardIds{kShard0.getName(), kShard1.getName()};
- ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
- opCtx, kNss, shardIds, [&](OperationContext*, TxnNumber) {});
+ ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn(
+ opCtx, kNss, [&](OperationContext*, TxnNumber) {});
ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged(
shard0Chunk0, getChunkDoc(operationContext(), shard0Chunk0.getMin()), targetChunkVersion));
@@ -203,9 +202,8 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest,
setupCollection(kNss, kKeyPattern, {shard0Chunk0, shard0Chunk1, shard1Chunk0});
auto opCtx = operationContext();
- std::vector<ShardId> shardIds{kShard0.getName(), kShard1.getName()};
- ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
- opCtx, kNss, shardIds, [&](OperationContext*, TxnNumber) {});
+ ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn(
+ opCtx, kNss, [&](OperationContext*, TxnNumber) {});
assertOnlyOneChunkVersionBumped(
operationContext(), {shard0Chunk0, shard0Chunk1}, targetChunkVersion);
@@ -251,9 +249,8 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest,
setupCollection(kNss, kKeyPattern, {shard0Chunk0, shard0Chunk1, shard1Chunk0, shard1Chunk1});
auto opCtx = operationContext();
- std::vector<ShardId> shardIds{kShard0.getName(), kShard1.getName()};
- ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
- opCtx, kNss, shardIds, [&](OperationContext*, TxnNumber) {});
+ ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn(
+ opCtx, kNss, [&](OperationContext*, TxnNumber) {});
assertOnlyOneChunkVersionBumped(
operationContext(), {shard0Chunk0, shard0Chunk1}, targetChunkVersion);
@@ -282,10 +279,9 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest,
setupCollection(kNss, kKeyPattern, {shard0Chunk0, shard1Chunk0});
size_t numCalls = 0;
- const std::vector<ShardId> shardIds{kShard0.getName(), kShard1.getName()};
ShardingCatalogManager::get(operationContext())
- ->bumpCollShardVersionsAndChangeMetadataInTxn(
- operationContext(), kNss, shardIds, [&](OperationContext*, TxnNumber) {
+ ->bumpCollectionVersionAndChangeMetadataInTxn(
+ operationContext(), kNss, [&](OperationContext*, TxnNumber) {
++numCalls;
if (numCalls < 5) {
throw WriteConflictException();
@@ -313,8 +309,8 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest,
numCalls = 0;
ShardingCatalogManager::get(operationContext())
- ->bumpCollShardVersionsAndChangeMetadataInTxn(
- operationContext(), kNss, shardIds, [&](OperationContext*, TxnNumber) {
+ ->bumpCollectionVersionAndChangeMetadataInTxn(
+ operationContext(), kNss, [&](OperationContext*, TxnNumber) {
++numCalls;
if (numCalls >= 5) {
fp.reset();
@@ -354,12 +350,10 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest,
setupCollection(kNss, kKeyPattern, {shard0Chunk0, shard1Chunk0});
size_t numCalls = 0;
- const std::vector<ShardId> shardIds{kShard0.getName(), kShard1.getName()};
ASSERT_THROWS_CODE(ShardingCatalogManager::get(operationContext())
- ->bumpCollShardVersionsAndChangeMetadataInTxn(
+ ->bumpCollectionVersionAndChangeMetadataInTxn(
operationContext(),
kNss,
- shardIds,
[&](OperationContext*, TxnNumber) {
++numCalls;
uasserted(ErrorCodes::ShutdownInProgress,
@@ -382,10 +376,9 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest,
numCalls = 0;
ASSERT_THROWS_CODE(ShardingCatalogManager::get(operationContext())
- ->bumpCollShardVersionsAndChangeMetadataInTxn(
+ ->bumpCollectionVersionAndChangeMetadataInTxn(
operationContext(),
kNss,
- shardIds,
[&](OperationContext*, TxnNumber) {
++numCalls;
uasserted(ErrorCodes::NotWritablePrimary,
@@ -408,16 +401,15 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest,
numCalls = 0;
ASSERT_THROWS_CODE(ShardingCatalogManager::get(operationContext())
- ->bumpCollShardVersionsAndChangeMetadataInTxn(
+ ->bumpCollectionVersionAndChangeMetadataInTxn(
operationContext(),
kNss,
- shardIds,
[&](OperationContext*, TxnNumber) {
++numCalls;
cc().getOperationContext()->markKilled(ErrorCodes::Interrupted);
// Throw a LockTimeout exception so
- // bumpCollShardVersionsAndChangeMetadataInTxn() makes another
+ // bumpCollectionVersionAndChangeMetadataInTxn() makes another
// retry attempt and discovers operation context has been killed.
uasserted(ErrorCodes::LockTimeout,
"simulating lock timeout error from test");
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index f4a1d7c5349..3544004dedb 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/logical_session_cache.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/query/distinct_command_gen.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/sharding_logging.h"
#include "mongo/db/s/sharding_util.h"
@@ -477,6 +478,50 @@ NamespaceStringOrUUID getNsOrUUIDForChunkTargeting(const CollectionType& coll) {
}
}
+std::vector<ShardId> getShardsOwningChunksForCollection(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+ auto findCollResponse = uassertStatusOK(
+ configShard->exhaustiveFindOnConfig(opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ CollectionType::ConfigNS,
+ BSON(CollectionType::kNssFieldName << nss.ns()),
+ {},
+ 1));
+ uassert(
+ ErrorCodes::Error(5514600), "Collection does not exist", !findCollResponse.docs.empty());
+ const CollectionType coll(findCollResponse.docs[0]);
+ const auto nsOrUUID = getNsOrUUIDForChunkTargeting(coll);
+
+ DistinctCommand distinctCmd(ChunkType::ConfigNS, ChunkType::shard.name());
+ if (nsOrUUID.uuid()) {
+ distinctCmd.setQuery(BSON(ChunkType::collectionUUID << *(nsOrUUID.uuid())));
+ } else {
+ distinctCmd.setQuery(BSON(ChunkType::ns(nsOrUUID.nss()->ns())));
+ }
+
+ const auto distinctResult = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ NamespaceString::kConfigDb.toString(),
+ distinctCmd.toBSON({}),
+ Shard::RetryPolicy::kIdempotent));
+ uassertStatusOK(distinctResult.commandStatus);
+
+ const auto valuesElem = distinctResult.response.getField("values");
+ std::vector<ShardId> shardIds;
+ for (const auto shard : valuesElem.Array()) {
+ shardIds.emplace_back(shard.String());
+ }
+ uassert(ErrorCodes::IncompatibleShardingMetadata,
+ str::stream() << "Tried to find shardIds owning chunks for collection '" << nss.ns()
+ << "', but found none",
+ !shardIds.empty());
+
+ return shardIds;
+}
+
} // namespace
StatusWith<BSONObj> ShardingCatalogManager::commitChunkSplit(
@@ -1353,19 +1398,38 @@ void ShardingCatalogManager::ensureChunkVersionIsGreaterThan(OperationContext* o
}
}
-void ShardingCatalogManager::bumpCollShardVersionsAndChangeMetadataInTxn(
+void ShardingCatalogManager::bumpCollectionVersionAndChangeMetadataInTxn(
OperationContext* opCtx,
const NamespaceString& nss,
- const std::vector<ShardId>& shardIds,
+ unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc) {
+
+ bumpMultipleCollectionVersionsAndChangeMetadataInTxn(
+ opCtx, {nss}, std::move(changeMetadataFunc));
+}
+
+void ShardingCatalogManager::bumpMultipleCollectionVersionsAndChangeMetadataInTxn(
+ OperationContext* opCtx,
+ const std::vector<NamespaceString>& collNames,
unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc) {
// Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and
// migrations
Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
+
+ using NssAndShardIds = std::pair<NamespaceString, std::vector<ShardId>>;
+ std::vector<NssAndShardIds> nssAndShardIds;
+ for (const auto& nss : collNames) {
+ auto shardIds = getShardsOwningChunksForCollection(opCtx, nss);
+ nssAndShardIds.emplace_back(nss, std::move(shardIds));
+ }
+
withTransaction(opCtx,
NamespaceString::kConfigReshardingOperationsNamespace,
[&](OperationContext* opCtx, TxnNumber txnNumber) {
- bumpMajorVersionOneChunkPerShard(opCtx, nss, txnNumber, shardIds);
+ for (const auto& nssAndShardId : nssAndShardIds) {
+ bumpMajorVersionOneChunkPerShard(
+ opCtx, nssAndShardId.first, txnNumber, nssAndShardId.second);
+ }
changeMetadataFunc(opCtx, txnNumber);
});
}
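(Illustrative note, not part of the patch: the getShardsOwningChunksForCollection helper added above resolves the owning shards with a distinct on config.chunks keyed by the shard field, filtered either by the collection UUID or by the namespace, depending on how chunks are targeted. Collection and field names in the sketch below are assumed from the code above; nss stands for the collection's NamespaceString.)

    // Roughly the command the helper sends to the config server, as raw BSON:
    BSONObj distinctOnShards = BSON("distinct" << "chunks"
                                               << "key" << "shard"
                                               << "query" << BSON("ns" << nss.ns()));
    // ...or, when chunks are targeted by UUID:
    //                                            "query" << BSON("uuid" << collectionUuid)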
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 7308d9dca45..88aa8f87930 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -503,46 +503,8 @@ void updateChunkAndTagsDocsForTempNss(OperationContext* opCtx,
opCtx, TagsType::ConfigNS, tagsRequest, txnNumber);
}
-//
-// Helper methods for ensuring donors/ recipients are able to notice when certain state transitions
-// occur.
-//
-// Donors/recipients learn when to transition states by noticing a change in shard versions for one
-// of the two collections involved in the resharding operations.
-//
-// Before the resharding operation persists the decision whether to succeed or fail:
-// * Donors are notified when the original resharding collection's shard versions are incremented.
-// * Recipients are notified when the temporary resharding collection's shard versions are
-// incremented.
-//
-// After the resharding operation persists its decision:
-// * Both donors and recipients are notified when the original resharding collection's shard
-// versions are incremented.
-//
-
-/**
- * Maps which participants are to be notified when the coordinator transitions into a given state.
- */
-enum class ParticipantsToNotifyEnum {
- kDonors,
- kRecipients,
- kAllParticipantsPostDecisionPersisted,
- kNone
-};
-stdx::unordered_map<CoordinatorStateEnum, ParticipantsToNotifyEnum> notifyForStateTransition{
- {CoordinatorStateEnum::kUnused, ParticipantsToNotifyEnum::kNone},
- {CoordinatorStateEnum::kInitializing, ParticipantsToNotifyEnum::kNone},
- {CoordinatorStateEnum::kPreparingToDonate, ParticipantsToNotifyEnum::kDonors},
- {CoordinatorStateEnum::kCloning, ParticipantsToNotifyEnum::kRecipients},
- {CoordinatorStateEnum::kApplying, ParticipantsToNotifyEnum::kDonors},
- {CoordinatorStateEnum::kBlockingWrites, ParticipantsToNotifyEnum::kDonors},
- {CoordinatorStateEnum::kDecisionPersisted, ParticipantsToNotifyEnum::kNone},
- {CoordinatorStateEnum::kDone, ParticipantsToNotifyEnum::kNone},
- {CoordinatorStateEnum::kError, ParticipantsToNotifyEnum::kDonors},
-};
-
/**
- * Executes metadata changes in a transaction.
+ * Executes metadata changes in a transaction without bumping the collection version.
*/
void executeMetadataChangesInTxn(
OperationContext* opCtx,
@@ -553,73 +515,6 @@ void executeMetadataChangesInTxn(
changeMetadataFunc(opCtx, txnNumber);
});
}
-
-/**
- * Runs resharding metadata changes in a transaction.
- *
- * This function should only be called if donor and recipient shards DO NOT need to be informed of
- * the updatedCoordinatorDoc's state transition. If donor or recipient shards need to be informed,
- * instead call bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn().
- */
-void executeStateTransitionAndMetadataChangesInTxn(
- OperationContext* opCtx,
- const ReshardingCoordinatorDocument& updatedCoordinatorDoc,
- unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc) {
- const auto& state = updatedCoordinatorDoc.getState();
- invariant(notifyForStateTransition.find(state) != notifyForStateTransition.end());
- invariant(notifyForStateTransition[state] == ParticipantsToNotifyEnum::kNone);
-
- // Neither donors nor recipients need to be informed of the transition to
- // updatedCoordinatorDoc's state.
- executeMetadataChangesInTxn(opCtx, std::move(changeMetadataFunc));
-}
-
-/**
- * In a single transaction, bumps the shard version for each shard spanning the corresponding
- * resharding collection and executes changeMetadataFunc.
- *
- * This function should only be called if donor or recipient shards need to be informed of the
- * updatedCoordinatorDoc's state transition. If donor or recipient shards do not need to be
- * informed, instead call executeStateTransitionAndMetadataChangesInTxn().
- */
-void bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn(
- OperationContext* opCtx,
- const ReshardingCoordinatorDocument& updatedCoordinatorDoc,
- unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc) {
- const auto& state = updatedCoordinatorDoc.getState();
- invariant(notifyForStateTransition.find(state) != notifyForStateTransition.end());
- invariant(notifyForStateTransition[state] != ParticipantsToNotifyEnum::kNone);
-
- auto participantsToNotify = notifyForStateTransition[state];
- if (participantsToNotify == ParticipantsToNotifyEnum::kDonors) {
- // Bump the donor shard versions for the original namespace along with updating the
- // metadata.
- ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
- opCtx,
- updatedCoordinatorDoc.getSourceNss(),
- resharding::extractShardIds(updatedCoordinatorDoc.getDonorShards()),
- std::move(changeMetadataFunc));
- } else if (participantsToNotify == ParticipantsToNotifyEnum::kRecipients) {
- // Bump the recipient shard versions for the temporary resharding namespace along with
- // updating the metadata.
- ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
- opCtx,
- updatedCoordinatorDoc.getTempReshardingNss(),
- resharding::extractShardIds(updatedCoordinatorDoc.getRecipientShards()),
- std::move(changeMetadataFunc));
- } else if (participantsToNotify ==
- ParticipantsToNotifyEnum::kAllParticipantsPostDecisionPersisted) {
- // Bump the recipient shard versions for the original resharding namespace along with
- // updating the metadata. Only the recipient shards will have chunks for the namespace after
- // the coordinator is in state kDecisionPersisted, bumping chunk versions on the donor
- // shards would not apply.
- ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
- opCtx,
- updatedCoordinatorDoc.getSourceNss(),
- resharding::extractShardIds(updatedCoordinatorDoc.getRecipientShards()),
- std::move(changeMetadataFunc));
- }
-}
} // namespace
namespace resharding {
@@ -653,29 +548,30 @@ void insertCoordDocAndChangeOrigCollEntry(OperationContext* opCtx,
opCtx, coordinatorDoc.getSourceNss(), repl::ReadConcernLevel::kMajorityReadConcern);
const auto collation = originalCollType.getDefaultCollation();
- executeMetadataChangesInTxn(opCtx, [&](OperationContext* opCtx, TxnNumber txnNumber) {
- // Insert the coordinator document to config.reshardingOperations.
- invariant(coordinatorDoc.getActive());
- try {
- writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
- } catch (const ExceptionFor<ErrorCodes::DuplicateKey>& ex) {
- auto extraInfo = ex.extraInfo<DuplicateKeyErrorInfo>();
- if (extraInfo->getKeyPattern().woCompare(BSON("active" << 1)) == 0) {
- uasserted(ErrorCodes::ReshardCollectionInProgress,
- str::stream()
- << "Only one resharding operation is allowed to be active at a "
- "time, aborting resharding op for "
- << coordinatorDoc.getSourceNss());
- }
+ ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn(
+ opCtx, coordinatorDoc.getSourceNss(), [&](OperationContext* opCtx, TxnNumber txnNumber) {
+ // Insert the coordinator document to config.reshardingOperations.
+ invariant(coordinatorDoc.getActive());
+ try {
+ writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
+ } catch (const ExceptionFor<ErrorCodes::DuplicateKey>& ex) {
+ auto extraInfo = ex.extraInfo<DuplicateKeyErrorInfo>();
+ if (extraInfo->getKeyPattern().woCompare(BSON("active" << 1)) == 0) {
+ uasserted(ErrorCodes::ReshardCollectionInProgress,
+ str::stream()
+ << "Only one resharding operation is allowed to be active at a "
+ "time, aborting resharding op for "
+ << coordinatorDoc.getSourceNss());
+ }
- throw;
- }
+ throw;
+ }
- // Update the config.collections entry for the original collection to include
- // 'reshardingFields'
- updateConfigCollectionsForOriginalNss(
- opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
- });
+ // Update the config.collections entry for the original collection to include
+ // 'reshardingFields'
+ updateConfigCollectionsForOriginalNss(
+ opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
+ });
}
ChunkVersion calculateChunkVersionForInitialChunks(OperationContext* opCtx) {
@@ -787,8 +683,10 @@ void writeParticipantShardsAndTempCollInfo(
const ReshardingCoordinatorDocument& updatedCoordinatorDoc,
std::vector<ChunkType> initialChunks,
std::vector<BSONObj> zones) {
- bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn(
- opCtx, updatedCoordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) {
+ ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn(
+ opCtx,
+ updatedCoordinatorDoc.getSourceNss(),
+ [&](OperationContext* opCtx, TxnNumber txnNumber) {
// Update on-disk state to reflect latest state transition.
writeToCoordinatorStateNss(opCtx, updatedCoordinatorDoc, txnNumber);
updateConfigCollectionsForOriginalNss(
@@ -808,61 +706,60 @@ void writeDecisionPersistedState(OperationContext* opCtx,
const ReshardingCoordinatorDocument& coordinatorDoc,
OID newCollectionEpoch,
boost::optional<Timestamp> newCollectionTimestamp) {
- executeStateTransitionAndMetadataChangesInTxn(
- opCtx, coordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) {
- // Update the config.reshardingOperations entry
- writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
+ // No need to bump originalNss version because its epoch will be changed.
+ executeMetadataChangesInTxn(opCtx, [&](OperationContext* opCtx, TxnNumber txnNumber) {
+ // Update the config.reshardingOperations entry
+ writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
- // Remove the config.collections entry for the temporary collection
- writeToConfigCollectionsForTempNss(
- opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
+ // Remove the config.collections entry for the temporary collection
+ writeToConfigCollectionsForTempNss(
+ opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
- // Update the config.collections entry for the original namespace to reflect the new
- // shard key, new epoch, and new UUID
- updateConfigCollectionsForOriginalNss(
- opCtx, coordinatorDoc, newCollectionEpoch, newCollectionTimestamp, txnNumber);
-
- // Remove all chunk and tag documents associated with the original collection, then
- // update the chunk and tag docs currently associated with the temp nss to be associated
- // with the original nss
- removeChunkAndTagsDocsForOriginalNss(
- opCtx, coordinatorDoc, newCollectionTimestamp, txnNumber);
- updateChunkAndTagsDocsForTempNss(
- opCtx, coordinatorDoc, newCollectionEpoch, newCollectionTimestamp, txnNumber);
- });
+ // Update the config.collections entry for the original namespace to reflect the new
+ // shard key, new epoch, and new UUID
+ updateConfigCollectionsForOriginalNss(
+ opCtx, coordinatorDoc, newCollectionEpoch, newCollectionTimestamp, txnNumber);
+
+ // Remove all chunk and tag documents associated with the original collection, then
+ // update the chunk and tag docs currently associated with the temp nss to be associated
+ // with the original nss
+ removeChunkAndTagsDocsForOriginalNss(
+ opCtx, coordinatorDoc, newCollectionTimestamp, txnNumber);
+ updateChunkAndTagsDocsForTempNss(
+ opCtx, coordinatorDoc, newCollectionEpoch, newCollectionTimestamp, txnNumber);
+ });
}
void writeStateTransitionAndCatalogUpdatesThenBumpShardVersions(
OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc) {
// Run updates to config.reshardingOperations and config.collections in a transaction
auto nextState = coordinatorDoc.getState();
- invariant(notifyForStateTransition.find(nextState) != notifyForStateTransition.end());
- invariant(notifyForStateTransition[nextState] != ParticipantsToNotifyEnum::kNone,
- "failed to write state transition with nextState {}"_format(
- CoordinatorState_serializer(nextState)));
- // Resharding metadata changes to be executed.
- auto changeMetadataFunc = [&](OperationContext* opCtx, TxnNumber txnNumber) {
- // Update the config.reshardingOperations entry
- writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
+ std::vector<NamespaceString> collNames = {coordinatorDoc.getSourceNss()};
+ if (nextState < CoordinatorStateEnum::kDecisionPersisted ||
+ nextState == CoordinatorStateEnum::kError) {
+ collNames.emplace_back(coordinatorDoc.getTempReshardingNss());
+ }
- // Update the config.collections entry for the original collection
- updateConfigCollectionsForOriginalNss(
- opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
+ ShardingCatalogManager::get(opCtx)->bumpMultipleCollectionVersionsAndChangeMetadataInTxn(
+ opCtx, collNames, [&](OperationContext* opCtx, TxnNumber txnNumber) {
+ // Update the config.reshardingOperations entry
+ writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
- // Update the config.collections entry for the temporary resharding collection. If we've
- // already persisted the decision that the operation will succeed, we've removed the entry
- // for the temporary collection and updated the entry with original namespace to have the
- // new shard key, UUID, and epoch
- if (nextState < CoordinatorStateEnum::kDecisionPersisted ||
- nextState == CoordinatorStateEnum::kError) {
- writeToConfigCollectionsForTempNss(
+ // Update the config.collections entry for the original collection
+ updateConfigCollectionsForOriginalNss(
opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
- }
- };
- bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn(
- opCtx, coordinatorDoc, std::move(changeMetadataFunc));
+ // Update the config.collections entry for the temporary resharding collection. If we've
+ // already persisted the decision that the operation will succeed, we've removed the
+ // entry for the temporary collection and updated the entry for the original namespace to
+ // have the new shard key, UUID, and epoch
+ if (nextState < CoordinatorStateEnum::kDecisionPersisted ||
+ nextState == CoordinatorStateEnum::kError) {
+ writeToConfigCollectionsForTempNss(
+ opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
+ }
+ });
}
void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx,
@@ -872,8 +769,10 @@ void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx,
ReshardingCoordinatorDocument updatedCoordinatorDoc = coordinatorDoc;
updatedCoordinatorDoc.setState(CoordinatorStateEnum::kDone);
- executeStateTransitionAndMetadataChangesInTxn(
- opCtx, updatedCoordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) {
+ ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn(
+ opCtx,
+ updatedCoordinatorDoc.getSourceNss(),
+ [&](OperationContext* opCtx, TxnNumber txnNumber) {
// Remove entry for this resharding operation from config.reshardingOperations
writeToCoordinatorStateNss(opCtx, updatedCoordinatorDoc, txnNumber);
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
index 67450a78bed..577d8e65952 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
@@ -703,38 +703,52 @@ TEST_F(ReshardingCoordinatorPersistenceTest, WriteInitialInfoSucceeds) {
writeInitialStateAndCatalogUpdatesExpectSuccess(
operationContext(), expectedCoordinatorDoc, initialChunks, newZones);
- // Confirm the shard version was increased for the donor shard.
+ // Confirm the shard version was increased for the donor shard. The collection version was
+ // bumped twice in 'writeInitialStateAndCatalogUpdatesExpectSuccess': once when reshardingFields
+ // is inserted into the collection doc, and once again when the state transitions to
+ // kPreparingToDonate.
auto donorChunkPostTransition = getChunkDoc(operationContext(), donorChunk.getMin());
ASSERT_EQ(donorChunkPostTransition.getStatus(), Status::OK());
ASSERT_EQ(donorChunkPostTransition.getValue().getVersion().majorVersion(),
- collectionVersion.majorVersion() + 1);
+ collectionVersion.majorVersion() + 2);
}
TEST_F(ReshardingCoordinatorPersistenceTest, BasicStateTransitionSucceeds) {
auto coordinatorDoc =
insertStateAndCatalogEntries(CoordinatorStateEnum::kCloning, _originalEpoch);
- // Ensure the chunks for the original namespace exist since they will be bumped as a product of
- // the state transition to kPreparingToDonate.
+ // Ensure the chunks for the original and temporary namespaces exist since they will be bumped
+ // as a product of the state transition to kBlockingWrites.
auto donorChunk = makeAndInsertChunksForDonorShard(
_originalNss, _originalEpoch, _oldShardKey, std::vector{OID::gen(), OID::gen()});
- auto collectionVersion = donorChunk.getVersion();
+ auto donorCollectionVersion = donorChunk.getVersion();
+
+ auto recipientChunk = makeAndInsertChunksForRecipientShard(
+ _tempNss, _tempEpoch, _newShardKey, std::vector{OID::gen(), OID::gen()});
+ auto recipientCollectionVersion = recipientChunk.getVersion();
// Persist the updates on disk
auto expectedCoordinatorDoc = coordinatorDoc;
expectedCoordinatorDoc.setState(CoordinatorStateEnum::kBlockingWrites);
writeStateTransitionUpdateExpectSuccess(operationContext(), expectedCoordinatorDoc);
- assertChunkVersionIncreasedAfterStateTransition(donorChunk, collectionVersion);
+ assertChunkVersionIncreasedAfterStateTransition(donorChunk, donorCollectionVersion);
+ assertChunkVersionIncreasedAfterStateTransition(recipientChunk, recipientCollectionVersion);
}
TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionWithFetchTimestampSucceeds) {
auto coordinatorDoc =
insertStateAndCatalogEntries(CoordinatorStateEnum::kPreparingToDonate, _originalEpoch);
+ // Ensure the chunks for the original and temporary namespaces exist since they will be bumped
+ // as a product of the state transition to kCloning.
+ auto donorChunk = makeAndInsertChunksForDonorShard(
+ _originalNss, _originalEpoch, _oldShardKey, std::vector{OID::gen(), OID::gen()});
+ auto donorCollectionVersion = donorChunk.getVersion();
+
auto recipientChunk = makeAndInsertChunksForRecipientShard(
_tempNss, _tempEpoch, _newShardKey, std::vector{OID::gen(), OID::gen()});
- auto collectionVersion = recipientChunk.getVersion();
+ auto recipientCollectionVersion = recipientChunk.getVersion();
// Persist the updates on disk
auto expectedCoordinatorDoc = coordinatorDoc;
@@ -742,7 +756,8 @@ TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionWithFetchTimestampSu
emplaceFetchTimestampIfExists(expectedCoordinatorDoc, Timestamp(1, 1));
writeStateTransitionUpdateExpectSuccess(operationContext(), expectedCoordinatorDoc);
- assertChunkVersionIncreasedAfterStateTransition(recipientChunk, collectionVersion);
+ assertChunkVersionIncreasedAfterStateTransition(donorChunk, donorCollectionVersion);
+ assertChunkVersionIncreasedAfterStateTransition(recipientChunk, recipientCollectionVersion);
}
TEST_F(ReshardingCoordinatorPersistenceTest, StateTranstionToDecisionPersistedSucceeds) {
@@ -779,6 +794,11 @@ TEST_F(ReshardingCoordinatorPersistenceTest, StateTranstionToDecisionPersistedSu
TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionToErrorSucceeds) {
auto coordinatorDoc =
insertStateAndCatalogEntries(CoordinatorStateEnum::kPreparingToDonate, _originalEpoch);
+ auto initialChunksIds = std::vector{OID::gen(), OID::gen()};
+
+ auto tempNssChunks = makeChunks(_tempNss, _tempEpoch, _newShardKey, initialChunksIds);
+ auto recipientChunk = tempNssChunks[1];
+ insertChunkAndZoneEntries(tempNssChunks, makeZones(_tempNss, _newShardKey));
insertChunkAndZoneEntries(
makeChunks(_originalNss, OID::gen(), _oldShardKey, std::vector{OID::gen(), OID::gen()}),
@@ -797,7 +817,14 @@ TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionToDoneSucceeds) {
auto coordinatorDoc =
insertStateAndCatalogEntries(CoordinatorStateEnum::kDecisionPersisted, _finalEpoch);
+ // Ensure the chunks for the original namespace exist since they will be bumped as a product of
+ // the state transition to kDone.
+ auto finalChunk = makeAndInsertChunksForRecipientShard(
+ _originalNss, _finalEpoch, _newShardKey, std::vector{OID::gen(), OID::gen()});
+ auto collectionVersion = finalChunk.getVersion();
+
removeCoordinatorDocAndReshardingFieldsExpectSuccess(operationContext(), coordinatorDoc);
+ assertChunkVersionIncreasedAfterStateTransition(finalChunk, collectionVersion);
}
TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionWhenCoordinatorDocDoesNotExistFails) {
@@ -807,7 +834,7 @@ TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionWhenCoordinatorDocDo
ASSERT_THROWS_CODE(resharding::writeStateTransitionAndCatalogUpdatesThenBumpShardVersions(
operationContext(), coordinatorDoc),
AssertionException,
- 5057701);
+ 5514600);
}
TEST_F(ReshardingCoordinatorPersistenceTest,
diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h
index 11e34c57239..f9e32bf6329 100644
--- a/src/mongo/s/chunk_manager.h
+++ b/src/mongo/s/chunk_manager.h
@@ -291,7 +291,7 @@ public:
bool sameReshardingFields(const RoutingTableHistory& other) const {
if (_reshardingFields && other._reshardingFields) {
- return _reshardingFields->toBSON().woCompare(other._reshardingFields->toBSON());
+ return _reshardingFields->toBSON().woCompare(other._reshardingFields->toBSON()) == 0;
} else {
return !_reshardingFields && !other._reshardingFields;
}
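(Illustrative note on the chunk_manager.h fix above, not part of the patch: BSONObj::woCompare() orders like strcmp, returning a negative, zero, or positive int, so using its result directly as a bool made sameReshardingFields() report "same" exactly when the two field sets differed. A minimal sketch of the corrected comparison, with hypothetical names:)

    bool fieldsMatch(const BSONObj& lhs, const BSONObj& rhs) {
        // woCompare() returns 0 when the objects are equal; nonzero means ordered before/after.
        return lhs.woCompare(rhs) == 0;
    }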