9 files changed, 323 insertions, 200 deletions
diff --git a/jstests/multiVersion/genericChangeStreams/change_streams_read_oplog_after_downgrade.js b/jstests/multiVersion/genericChangeStreams/change_streams_read_oplog_after_downgrade.js
index f546aa8bee5..307a8077fb0 100644
--- a/jstests/multiVersion/genericChangeStreams/change_streams_read_oplog_after_downgrade.js
+++ b/jstests/multiVersion/genericChangeStreams/change_streams_read_oplog_after_downgrade.js
@@ -27,6 +27,20 @@ const st = new ShardingTest({
     other: {mongosOptions: {binVersion: "latest"}}
 });
 
+const isFeatureFlagEnabled =
+    assert
+        .commandWorked(st.configRS.getPrimary().adminCommand(
+            {getParameter: 1, featureFlagShardingFullDDLSupportTimestampedVersion: 1}))
+        .featureFlagShardingFullDDLSupportTimestampedVersion.value;
+
+if (isFeatureFlagEnabled) {
+    // TODO SERVER-53104: do not skip this test once this ticket is completed
+    jsTest.log(
+        'Skipping test since it is not compatible with featureFlagShardingFullDDLSupportTimestampedVersion yet');
+    st.stop();
+    return;
+}
+
 let shardedColl = st.s.getDB(dbName)[collName];
 
 assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
 st.ensurePrimaryShard(dbName, st.shard0.shardName);
diff --git a/jstests/multiVersion/upgrade_downgrade_sharded_cluster.js b/jstests/multiVersion/upgrade_downgrade_sharded_cluster.js
index 5f5c93b5c94..bbe554d9de4 100644
--- a/jstests/multiVersion/upgrade_downgrade_sharded_cluster.js
+++ b/jstests/multiVersion/upgrade_downgrade_sharded_cluster.js
@@ -259,9 +259,8 @@ function runChecksAfterFCVDowngrade() {
                 {getParameter: 1, featureFlagShardingFullDDLSupportTimestampedVersion: 1}))
             .featureFlagShardingFullDDLSupportTimestampedVersion.value;
 
-    testAllowedMigrationsFieldChecksAfterFCVDowngrade();
-
     if (isFeatureFlagEnabled) {
+        testAllowedMigrationsFieldChecksAfterFCVDowngrade();
         testTimestampFieldChecksAfterFCVDowngrade();
         testChunkCollectionUuidFieldChecksAfterFCVDowngrade();
     } else {
@@ -295,6 +294,21 @@ for (let oldVersion of [lastLTSFCV, lastContinuousFCV]) {
     // Upgrade the entire cluster to the latest version.
     jsTest.log('upgrading cluster');
     st.upgradeCluster(latestFCV);
+
+    const isFeatureFlagEnabled =
+        assert
+            .commandWorked(st.configRS.getPrimary().adminCommand(
+                {getParameter: 1, featureFlagShardingFullDDLSupportTimestampedVersion: 1}))
+            .featureFlagShardingFullDDLSupportTimestampedVersion.value;
+
+    if (isFeatureFlagEnabled) {
+        // TODO SERVER-53104: do not skip this test once this ticket is completed
+        jsTest.log(
+            'Skipping test since it is not compatible with featureFlagShardingFullDDLSupportTimestampedVersion yet');
+        st.stop();
+        return;
+    }
+
     assert.commandWorked(st.s.adminCommand({setFeatureCompatibilityVersion: latestFCV}));
 
     // Tests after upgrade
diff --git a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
index 70a3c038e3f..a2c478ef871 100644
--- a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
+++ b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
@@ -56,7 +56,6 @@
 #include "mongo/db/s/active_migrations_registry.h"
 #include "mongo/db/s/config/sharding_catalog_manager.h"
 #include "mongo/db/s/migration_util.h"
-#include "mongo/db/s/shard_metadata_util.h"
 #include "mongo/db/s/sharding_state.h"
 #include "mongo/db/server_options.h"
 #include "mongo/db/views/view_catalog.h"
@@ -423,14 +422,6 @@ public:
         if (failDowngrading.shouldFail())
             return false;
 
-        if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
-            if (requestedVersion < FeatureCompatibilityParams::Version::kVersion49) {
-                // SERVER-52632: Remove once 5.0 becomes the LastLTS
-                shardmetadatautil::downgradeShardConfigDatabasesEntriesToPre49(opCtx);
-                shardmetadatautil::downgradeShardConfigCollectionEntriesToPre49(opCtx);
-            }
-        }
-
         if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
             // Downgrade shards before config finishes its downgrade.
             uassertStatusOK(
diff --git a/src/mongo/db/s/shard_metadata_util.cpp b/src/mongo/db/s/shard_metadata_util.cpp
index dade37ea0e5..37e78540c23 100644
--- a/src/mongo/db/s/shard_metadata_util.cpp
+++ b/src/mongo/db/s/shard_metadata_util.cpp
@@ -401,6 +401,26 @@ Status updateShardChunks(OperationContext* opCtx,
     }
 }
 
+void updateTimestampOnShardCollections(OperationContext* opCtx,
+                                       const NamespaceString& nss,
+                                       const boost::optional<Timestamp>& timestamp) {
+    write_ops::Update clearFields(NamespaceString::kShardConfigCollectionsNamespace, [&] {
+        write_ops::UpdateOpEntry u;
+        u.setQ(BSON(ShardCollectionType::kNssFieldName << nss.ns()));
+        BSONObj updateOp = (timestamp)
+            ? BSON("$set" << BSON(CollectionType::kTimestampFieldName << *timestamp))
+            : BSON("$unset" << BSON(ShardCollectionType::kPre50CompatibleAllowMigrationsFieldName
+                                    << "" << CollectionType::kTimestampFieldName << ""));
+        u.setU(write_ops::UpdateModification::parseFromClassicUpdate(updateOp));
+        return std::vector{u};
+    }());
+
+    DBDirectClient client(opCtx);
+    const auto commandResult = client.runCommand(clearFields.serialize({}));
+
+    uassertStatusOK(getStatusFromWriteCommandResponse(commandResult->getCommandReply()));
+}
+
 Status dropChunksAndDeleteCollectionsEntry(OperationContext* opCtx, const NamespaceString& nss) {
     try {
         DBDirectClient client(opCtx);
@@ -487,65 +507,5 @@ Status deleteDatabasesEntry(OperationContext* opCtx, StringData dbName) {
     }
 }
 
-void downgradeShardConfigDatabasesEntriesToPre49(OperationContext* opCtx) {
-    LOGV2(5258804, "Starting downgrade of config.cache.databases");
-    if (feature_flags::gShardingFullDDLSupportTimestampedVersion.isEnabledAndIgnoreFCV()) {
-        write_ops::Update clearFields(NamespaceString::kShardConfigDatabasesNamespace, [] {
-            write_ops::UpdateOpEntry u;
-            u.setQ({});
-            u.setU(write_ops::UpdateModification::parseFromClassicUpdate(
-                BSON("$unset" << BSON(
-                    ShardDatabaseType::version() + "." + DatabaseVersion::kTimestampFieldName
-                    << ""))));
-            u.setMulti(true);
-            return std::vector{u};
-        }());
-
-        clearFields.setWriteCommandBase([] {
-            write_ops::WriteCommandBase base;
-            base.setOrdered(false);
-            return base;
-        }());
-
-        DBDirectClient client(opCtx);
-        const auto commandResult = client.runCommand(clearFields.serialize({}));
-
-        uassertStatusOK(getStatusFromWriteCommandResponse(commandResult->getCommandReply()));
-        LOGV2(5258805, "Successfully downgraded config.cache.databases");
-    }
-}
-
-void downgradeShardConfigCollectionEntriesToPre49(OperationContext* opCtx) {
-    // Clear the 'allowMigrations' and 'timestamp' fields from config.cache.collections
-    LOGV2(5189100, "Starting downgrade of config.cache.collections");
-    write_ops::Update clearFields(NamespaceString::kShardConfigCollectionsNamespace, [] {
-        BSONObj unsetFields =
-            BSON(ShardCollectionType::kPre50CompatibleAllowMigrationsFieldName << "");
-        if (feature_flags::gShardingFullDDLSupportTimestampedVersion.isEnabledAndIgnoreFCV()) {
-            unsetFields = unsetFields.addFields(BSON(CollectionType::kTimestampFieldName << ""));
-        }
-
-        write_ops::UpdateOpEntry u;
-        u.setQ({});
-        u.setU(
-            write_ops::UpdateModification::parseFromClassicUpdate(BSON("$unset" << unsetFields)));
-        u.setMulti(true);
-        return std::vector{u};
-    }());
-
-    clearFields.setWriteCommandBase([] {
-        write_ops::WriteCommandBase base;
-        base.setOrdered(false);
-        return base;
-    }());
-
-    DBDirectClient client(opCtx);
-    const auto commandResult = client.runCommand(clearFields.serialize({}));
-
-    uassertStatusOK(getStatusFromWriteCommandResponse(commandResult->getCommandReply()));
-
-    LOGV2(5189101, "Successfully downgraded config.cache.collections");
-}
-
 }  // namespace shardmetadatautil
 }  // namespace mongo
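Annotation: the two branches of the new updateTimestampOnShardCollections reduce to a targeted $set of the collection's 'timestamp' field on FCV upgrade, and a $unset of both 'timestamp' and the pre-5.0 'allowMigrations' field on downgrade. Below is a minimal standalone sketch of those payloads; the Timestamp struct and the JSON-string rendering are illustrative stand-ins, not the server's BSON or DBDirectClient API.

#include <iostream>
#include <optional>
#include <string>

// Hypothetical stand-in for the server's Timestamp type.
struct Timestamp { unsigned secs = 0, inc = 0; };

// Returns the update document (shell-style JSON) that the helper above would
// apply to the matching config.cache.collections entry.
std::string timestampUpdateFor(const std::optional<Timestamp>& ts) {
    if (ts) {
        // FCV upgrade: persist the new 'timestamp' field on the entry.
        return "{$set: {timestamp: Timestamp(" + std::to_string(ts->secs) + ", " +
            std::to_string(ts->inc) + ")}}";
    }
    // FCV downgrade: strip 'timestamp' together with the pre-5.0
    // 'allowMigrations' field, mirroring the $unset branch in the diff.
    return "{$unset: {allowMigrations: '', timestamp: ''}}";
}

int main() {
    std::cout << timestampUpdateFor(Timestamp{42, 1}) << '\n';  // upgrade payload
    std::cout << timestampUpdateFor(std::nullopt) << '\n';      // downgrade payload
}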
diff --git a/src/mongo/db/s/shard_metadata_util.h b/src/mongo/db/s/shard_metadata_util.h
index 45bee780dd0..8d7315e7e6e 100644
--- a/src/mongo/db/s/shard_metadata_util.h
+++ b/src/mongo/db/s/shard_metadata_util.h
@@ -202,6 +202,13 @@ Status updateShardChunks(OperationContext* opCtx,
                          const OID& currEpoch);
 
 /**
+ * Adds/removes the timestamp of the 'nss' entry in config.cache.collections
+ */
+void updateTimestampOnShardCollections(OperationContext* opCtx,
+                                       const NamespaceString& nss,
+                                       const boost::optional<Timestamp>& timestamp);
+
+/**
  * Deletes locally persisted chunk metadata associated with 'nss': drops the chunks collection
  * and removes the collections collection entry.
  *
@@ -222,19 +229,5 @@ void dropChunks(OperationContext* opCtx, const NamespaceString& nss);
  */
Status deleteDatabasesEntry(OperationContext* opCtx, StringData dbName);
 
-/**
- * Downgrades the config.cache.databases entries to prior 4.9 version. More specifically, it
- * removes the 'version.timestamp' field from all the documents in config.cache.databases.
- *
- */
-void downgradeShardConfigDatabasesEntriesToPre49(OperationContext* opCtx);
-
-/**
- * Downgrades the config.cache.collections entries to prior 4.9 version. More specifically, it
- * removes the allowMigrations and timestamp fields from all the documents of
- * config.cache.collections
- */
-void downgradeShardConfigCollectionEntriesToPre49(OperationContext* opCtx);
-
 }  // namespace shardmetadatautil
 }  // namespace mongo
diff --git a/src/mongo/db/s/shard_metadata_util_test.cpp b/src/mongo/db/s/shard_metadata_util_test.cpp
index d20ba3be849..4890a09a5d9 100644
--- a/src/mongo/db/s/shard_metadata_util_test.cpp
+++ b/src/mongo/db/s/shard_metadata_util_test.cpp
@@ -326,35 +326,5 @@ TEST_F(ShardMetadataUtilTest, DropChunksAndDeleteCollectionsEntry) {
     checkCollectionIsEmpty(NamespaceString::kShardConfigCollectionsNamespace);
 }
 
-TEST_F(ShardMetadataUtilTest, DowngradeShardConfigCollectionEntriesTo44) {
-    setUpShardChunkMetadata();
-
-    const auto checkShardCollectionAllowMigrationsFlag = [&](bool expectedValue) {
-        ShardCollectionType readShardCollectionType =
-            assertGet(readShardCollectionsEntry(operationContext(), kNss));
-
-        ASSERT_EQUALS(readShardCollectionType.getAllowMigrations(), expectedValue);
-    };
-
-    // allowMigrations is an optional field. If it is not present -> allowMigrations = true
-    checkShardCollectionAllowMigrationsFlag(/* expectedValue */ true);
-
-    ASSERT_OK(updateShardCollectionsEntry(
-        operationContext(),
-        BSON(ShardCollectionType::kNssFieldName << kNss.ns()),
-        BSON(ShardCollectionType::kPre50CompatibleAllowMigrationsFieldName << false),
-        BSONObj(),
-        /* upsert */ false));
-
-    // allowMigrations was explicitly defined to false by the previous statement
-    checkShardCollectionAllowMigrationsFlag(/* expectedValue */ false);
-
-    downgradeShardConfigCollectionEntriesToPre49(operationContext());
-
-    // The downgrade process to prior 4.9 removes the allowMigration flag from all collections.
-    // Thus, this flag will not be present which implies that its value is true
-    checkShardCollectionAllowMigrationsFlag(/* expectedValue */ true);
-}
-
 }  // namespace
 }  // namespace mongo
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
index 20132ca3f8e..aa82c7d916c 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
@@ -227,8 +227,12 @@ CollectionAndChangedChunks getPersistedMetadataSinceVersion(OperationContext* op
         uassertStatusOK(readShardCollectionsEntry(opCtx, nss));
 
     // If the persisted epoch doesn't match what the CatalogCache requested, read everything.
+    // If the epochs are the same we can safely take the timestamp from the shard coll entry.
     ChunkVersion startingVersion = (shardCollectionEntry.getEpoch() == version.epoch())
-        ? version
+        ? ChunkVersion(version.majorVersion(),
+                       version.minorVersion(),
+                       version.epoch(),
+                       shardCollectionEntry.getTimestamp())
         : ChunkVersion(0, 0, shardCollectionEntry.getEpoch(), shardCollectionEntry.getTimestamp());
 
     QueryAndSort diff = createShardChunkDiffQuery(startingVersion);
@@ -364,6 +368,22 @@ ChunkVersion getLocalVersion(OperationContext* opCtx, const NamespaceString& nss
     return uassertStatusOK(std::move(swRefreshState)).lastRefreshedCollectionVersion;
 }
 
+void patchUpChangedChunksIfNeeded(bool mustPatchUpMetadataResults,
+                                  CollectionAndChangedChunks& collAndChunks) {
+    if (!mustPatchUpMetadataResults)
+        return;
+
+    const boost::optional<Timestamp> newTimestamp = collAndChunks.creationTime;
+    std::for_each(collAndChunks.changedChunks.begin(),
+                  collAndChunks.changedChunks.end(),
+                  [&newTimestamp](ChunkType& chunk) {
+                      const ChunkVersion version = chunk.getVersion();
+                      chunk.setVersion(ChunkVersion(version.majorVersion(),
+                                                    version.minorVersion(),
+                                                    version.epoch(),
+                                                    newTimestamp));
+                  });
+}
+
 }  // namespace
 
 ShardServerCatalogCacheLoader::ShardServerCatalogCacheLoader(
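Annotation: the patch-up step above only rewrites the timestamp component of each changed chunk's version, leaving major/minor/epoch untouched, so the chunks returned to the CatalogCache agree with the collection's new metadata format. A self-contained model of that behavior, using simplified stand-in types (no epoch) rather than the server's classes:

#include <algorithm>
#include <iostream>
#include <optional>
#include <vector>

struct Timestamp { unsigned t = 0; };
struct ChunkVersion {
    int major = 0;
    int minor = 0;
    std::optional<Timestamp> timestamp;
};
struct ChunkType { ChunkVersion version; };

// Re-stamp every changed chunk's version with the collection's creation
// timestamp (or clear it on downgrade); all other version fields are kept.
void patchUpChangedChunks(const std::optional<Timestamp>& creationTime,
                          std::vector<ChunkType>& changedChunks) {
    std::for_each(changedChunks.begin(), changedChunks.end(), [&](ChunkType& chunk) {
        chunk.version.timestamp = creationTime;
    });
}

int main() {
    std::vector<ChunkType> chunks{{{1, 0, {}}}, {{1, 1, {}}}};
    patchUpChangedChunks(Timestamp{42}, chunks);  // upgrade: versions gain the timestamp
    std::cout << chunks.back().version.timestamp->t << '\n';           // 42
    patchUpChangedChunks(std::nullopt, chunks);   // downgrade: timestamps stripped again
    std::cout << chunks.back().version.timestamp.has_value() << '\n';  // 0
}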
@@ -661,7 +681,7 @@ ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
     _ensureMajorityPrimaryAndScheduleCollAndChunksTask(
         opCtx,
         nss,
-        collAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
+        CollAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
 
     LOGV2_FOR_CATALOG_REFRESH(
         24107,
@@ -690,12 +710,36 @@ ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
                               << "'."};
     }
 
+    if (maxLoaderVersion.isSet() &&
+        (maxLoaderVersion.getTimestamp().is_initialized() !=
+         collAndChunks.creationTime.is_initialized())) {
+        // This task will update the metadata format of the collection and all its chunks.
+        // It doesn't apply the changes of the ChangedChunks; we will do that in the next task.
+        _ensureMajorityPrimaryAndScheduleCollAndChunksTask(
+            opCtx,
+            nss,
+            CollAndChunkTask{swCollectionAndChangedChunks,
+                             maxLoaderVersion,
+                             termScheduled,
+                             true /* metadataFormatChanged */});
+
+        LOGV2_FOR_CATALOG_REFRESH(5310400,
+                                  1,
+                                  "Cache loader update metadata format for collection {namespace}"
+                                  "{oldTimestamp} and {newTimestamp}",
+                                  "Cache loader update metadata format for collection",
+                                  "namespace"_attr = nss,
+                                  "oldTimestamp"_attr = maxLoaderVersion.getTimestamp(),
+                                  "newTimestamp"_attr = collAndChunks.creationTime);
+    }
+
     if ((collAndChunks.epoch != maxLoaderVersion.epoch()) ||
         maxLoaderVersion.isOlderThan(collAndChunks.changedChunks.back().getVersion())) {
         _ensureMajorityPrimaryAndScheduleCollAndChunksTask(
             opCtx,
             nss,
-            collAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
+            CollAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
     }
 
     LOGV2_FOR_CATALOG_REFRESH(
@@ -788,22 +832,24 @@ StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoader
     // Get the enqueued metadata first. Otherwise we could miss data between reading persisted and
     // enqueued, if an enqueued task finished after the persisted read but before the enqueued read.
-    auto enqueuedRes = _getEnqueuedMetadata(nss, catalogCacheSinceVersion, expectedTerm);
-    bool tasksAreEnqueued = std::move(enqueuedRes.first);
-    CollectionAndChangedChunks enqueued = std::move(enqueuedRes.second);
+    auto [tasksAreEnqueued, enqueuedMetadata] =
+        _getEnqueuedMetadata(nss, catalogCacheSinceVersion, expectedTerm);
+    CollectionAndChangedChunks& enqueued = enqueuedMetadata.collAndChangedChunks;
 
     auto swPersisted =
         getIncompletePersistedMetadataSinceVersion(opCtx, nss, catalogCacheSinceVersion);
     CollectionAndChangedChunks persisted;
     if (swPersisted == ErrorCodes::NamespaceNotFound) {
-        // No persisted metadata found, create an empty object.
-        persisted = CollectionAndChangedChunks();
+        // No persisted metadata found
     } else if (!swPersisted.isOK()) {
         return swPersisted;
     } else {
         persisted = std::move(swPersisted.getValue());
     }
 
+    bool lastTaskIsADrop = tasksAreEnqueued && enqueued.changedChunks.empty() &&
+        !enqueuedMetadata.mustPatchUpMetadataResults;
+
     LOGV2_FOR_CATALOG_REFRESH(
         24111,
         1,
@@ -812,7 +858,10 @@ StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoader
         "Cache loader state since the latest cached version",
         "enqueuedTasksDesc"_attr =
             (enqueued.changedChunks.empty()
-                 ? (tasksAreEnqueued ? "a drop enqueued" : "no enqueued metadata")
+                 ? (tasksAreEnqueued
+                        ? (lastTaskIsADrop ? "a drop is enqueued"
+                                           : "an update of the metadata format is enqueued")
+                        : "no enqueued metadata")
                  : ("enqueued metadata from " +
                    enqueued.changedChunks.front().getVersion().toString() + " to " +
                    enqueued.changedChunks.back().getVersion().toString())),
@@ -827,17 +876,18 @@ StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoader
     if (!tasksAreEnqueued) {
         // There are no tasks in the queue. Return the persisted metadata.
         return persisted;
-    } else if (persisted.changedChunks.empty() || enqueued.changedChunks.empty() ||
+    } else if (persisted.changedChunks.empty() || lastTaskIsADrop ||
                enqueued.epoch != persisted.epoch) {
-        // There is a task queue and:
-        // - nothing is persisted.
-        // - nothing was returned from enqueued, which means the last task enqueued is a drop task.
-        // - the epoch changed in the enqueued metadata, which means there's a drop operation
-        //   enqueued somewhere.
+        // There is a task in the queue and:
+        // - nothing is persisted, OR
+        // - the last task in the queue was a drop, OR
+        // - the epoch changed in the enqueued metadata.
         // Whichever the cause, the persisted metadata is out-dated/non-existent. Return enqueued
         // results.
+        patchUpChangedChunksIfNeeded(enqueuedMetadata.mustPatchUpMetadataResults, enqueued);
         return enqueued;
     } else {
+
         // There can be overlap between persisted and enqueued metadata because enqueued work can
         // be applied while persisted was read. We must remove this overlap.
@@ -857,6 +907,13 @@ StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoader
         persisted.changedChunks.insert(persisted.changedChunks.end(),
                                        enqueued.changedChunks.begin(),
                                        enqueued.changedChunks.end());
+
+        // We may need to patch up the changed chunks because there was a metadata format change
+        patchUpChangedChunksIfNeeded(enqueuedMetadata.mustPatchUpMetadataResults, persisted);
+
+        // The collection info in enqueued metadata may be more recent than the persisted metadata
+        persisted.creationTime = enqueued.creationTime;
 
         persisted.reshardingFields = std::move(enqueued.reshardingFields);
         persisted.allowMigrations = enqueued.allowMigrations;
@@ -864,29 +921,30 @@ StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoader
     }
 }
 
-std::pair<bool, CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getEnqueuedMetadata(
-    const NamespaceString& nss,
-    const ChunkVersion& catalogCacheSinceVersion,
-    const long long term) {
+std::pair<bool, ShardServerCatalogCacheLoader::EnqueuedMetadataResults>
+ShardServerCatalogCacheLoader::_getEnqueuedMetadata(const NamespaceString& nss,
+                                                    const ChunkVersion& catalogCacheSinceVersion,
+                                                    const long long term) {
     stdx::unique_lock<Latch> lock(_mutex);
 
     auto taskListIt = _collAndChunkTaskLists.find(nss);
 
     if (taskListIt == _collAndChunkTaskLists.end()) {
-        return std::make_pair(false, CollectionAndChangedChunks());
+        return std::make_pair(false, EnqueuedMetadataResults());
     } else if (!taskListIt->second.hasTasksFromThisTerm(term)) {
         // If task list does not have a term that matches, there's no valid task data to collect.
-        return std::make_pair(false, CollectionAndChangedChunks());
+        return std::make_pair(false, EnqueuedMetadataResults());
     }
 
     // Only return task data of tasks scheduled in the same term as the given 'term': older term
     // task data is no longer valid.
-    CollectionAndChangedChunks collAndChunks = taskListIt->second.getEnqueuedMetadataForTerm(term);
+    EnqueuedMetadataResults enqueuedMetadata = taskListIt->second.getEnqueuedMetadataForTerm(term);
+    CollectionAndChangedChunks& collAndChunks = enqueuedMetadata.collAndChangedChunks;
 
     // Return all the results if 'catalogCacheSinceVersion's epoch does not match. Otherwise, trim
     // the results to be GTE to 'catalogCacheSinceVersion'.
     if (collAndChunks.epoch != catalogCacheSinceVersion.epoch()) {
-        return std::make_pair(true, collAndChunks);
+        return std::make_pair(true, std::move(enqueuedMetadata));
     }
 
     auto changedChunksIt = collAndChunks.changedChunks.begin();
@@ -896,11 +954,11 @@ std::pair<bool, CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getE
     }
     collAndChunks.changedChunks.erase(collAndChunks.changedChunks.begin(), changedChunksIt);
 
-    return std::make_pair(true, collAndChunks);
+    return std::make_pair(true, std::move(enqueuedMetadata));
 }
 
 void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleCollAndChunksTask(
-    OperationContext* opCtx, const NamespaceString& nss, collAndChunkTask task) {
+    OperationContext* opCtx, const NamespaceString& nss, CollAndChunkTask task) {
     {
         stdx::lock_guard<Latch> lock(_mutex);
 
@@ -1107,8 +1165,9 @@ void ShardServerCatalogCacheLoader::_updatePersistedCollAndChunksMetadata(
     OperationContext* opCtx, const NamespaceString& nss) {
     stdx::unique_lock<Latch> lock(_mutex);
 
-    const collAndChunkTask& task = _collAndChunkTaskLists[nss].front();
-    invariant(task.dropped || !task.collectionAndChangedChunks->changedChunks.empty());
+    const CollAndChunkTask& task = _collAndChunkTaskLists[nss].front();
+    invariant(task.dropped || task.updateMetadataFormat ||
+              !task.collectionAndChangedChunks->changedChunks.empty());
 
     // If this task is from an old term and no longer valid, do not execute and return true so that
     // the task gets removed from the task list
@@ -1128,6 +1187,12 @@ void ShardServerCatalogCacheLoader::_updatePersistedCollAndChunksMetadata(
         return;
     }
 
+    if (task.updateMetadataFormat) {
+        updateTimestampOnShardCollections(
+            opCtx, nss, task.collectionAndChangedChunks->creationTime);
+        return;
+    }
+
     uassertStatusOKWithContext(
         persistCollectionAndChangedChunks(
             opCtx, nss, *task.collectionAndChangedChunks, task.minQueryVersion),
@@ -1225,17 +1290,35 @@ ShardServerCatalogCacheLoader::_getCompletePersistedMetadataForSecondarySinceVer
     }
 }
 
-ShardServerCatalogCacheLoader::collAndChunkTask::collAndChunkTask(
+ShardServerCatalogCacheLoader::CollAndChunkTask::CollAndChunkTask(
     StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks,
     ChunkVersion minimumQueryVersion,
-    long long currentTerm)
+    long long currentTerm,
+    bool metadataFormatChanged)
     : taskNum(taskIdGenerator.fetchAndAdd(1)),
-      minQueryVersion(minimumQueryVersion),
+      minQueryVersion(std::move(minimumQueryVersion)),
+      updateMetadataFormat(metadataFormatChanged),
       termCreated(currentTerm) {
     if (statusWithCollectionAndChangedChunks.isOK()) {
-        collectionAndChangedChunks = statusWithCollectionAndChangedChunks.getValue();
-        invariant(!collectionAndChangedChunks->changedChunks.empty());
-        maxQueryVersion = collectionAndChangedChunks->changedChunks.back().getVersion();
+        if (updateMetadataFormat) {
+            // An update metadata format task doesn't handle the ChangedChunks: if
+            // there are any and they are newer than what we have locally, they
+            // will be handled by another task. Despite that, we still need
+            // a valid CollectionAndChangedChunks object to get the information
+            // of the collection.
+            collectionAndChangedChunks = std::move(statusWithCollectionAndChangedChunks.getValue());
+            collectionAndChangedChunks->changedChunks.clear();
+            // Propagating the Timestamp to the maxQueryVersion, so 'getHighestVersionEnqueued'
+            // returns a version with the new format.
+            maxQueryVersion = ChunkVersion(minQueryVersion.majorVersion(),
+                                           minQueryVersion.minorVersion(),
+                                           minQueryVersion.epoch(),
+                                           collectionAndChangedChunks->creationTime);
+        } else {
+            collectionAndChangedChunks = std::move(statusWithCollectionAndChangedChunks.getValue());
+            invariant(!collectionAndChangedChunks->changedChunks.empty());
+            maxQueryVersion = collectionAndChangedChunks->changedChunks.back().getVersion();
+        }
     } else {
         invariant(statusWithCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound);
         dropped = true;
@@ -1259,7 +1342,7 @@ ShardServerCatalogCacheLoader::CollAndChunkTaskList::CollAndChunkTaskList()
 ShardServerCatalogCacheLoader::DbTaskList::DbTaskList()
     : _activeTaskCompletedCondVar(std::make_shared<stdx::condition_variable>()) {}
 
-void ShardServerCatalogCacheLoader::CollAndChunkTaskList::addTask(collAndChunkTask task) {
+void ShardServerCatalogCacheLoader::CollAndChunkTaskList::addTask(CollAndChunkTask task) {
     if (_tasks.empty()) {
         _tasks.emplace_back(std::move(task));
         return;
@@ -1343,21 +1426,19 @@ bool ShardServerCatalogCacheLoader::CollAndChunkTaskList::hasTasksFromThisTerm(
     return _tasks.back().termCreated == term;
 }
 
-bool ShardServerCatalogCacheLoader::DbTaskList::hasTasksFromThisTerm(long long term) const {
-    invariant(!_tasks.empty());
-    return _tasks.back().termCreated == term;
-}
-
 ChunkVersion ShardServerCatalogCacheLoader::CollAndChunkTaskList::getHighestVersionEnqueued()
     const {
     invariant(!_tasks.empty());
     return _tasks.back().maxQueryVersion;
 }
 
-CollectionAndChangedChunks
+ShardServerCatalogCacheLoader::EnqueuedMetadataResults
 ShardServerCatalogCacheLoader::CollAndChunkTaskList::getEnqueuedMetadataForTerm(
     const long long term) const {
-    CollectionAndChangedChunks collAndChunks;
+    EnqueuedMetadataResults enqueuedMetadata;
+
+    CollectionAndChangedChunks& collAndChunks = enqueuedMetadata.collAndChangedChunks;
+    bool& mustPatchUpMetadataResults = enqueuedMetadata.mustPatchUpMetadataResults;
     for (const auto& task : _tasks) {
         if (task.termCreated != term) {
             // Task data is no longer valid. Go on to the next task in the list.
@@ -1365,37 +1446,49 @@ ShardServerCatalogCacheLoader::CollAndChunkTaskList::getEnqueuedMetadataForTerm(
         }
 
         if (task.dropped) {
-            // A drop task should reset the metadata.
+            // The current task is a drop -> the aggregated results aren't interesting so we
+            // overwrite the CollAndChangedChunks and unset the flag
             collAndChunks = CollectionAndChangedChunks();
+            mustPatchUpMetadataResults = false;
+        } else if (task.collectionAndChangedChunks->epoch != collAndChunks.epoch) {
+            // The current task has a new epoch (refine shard key or resharding op) -> the
+            // aggregated results aren't interesting so we overwrite them with the current
+            // CollAndChangedChunks and unset the flag
+            collAndChunks = *task.collectionAndChangedChunks;
+            mustPatchUpMetadataResults = false;
+        } else if (task.updateMetadataFormat) {
+            // The current task is an update task -> we only update the Timestamp of the aggregated
+            // results and set the flag
+            collAndChunks.creationTime = task.collectionAndChangedChunks->creationTime;
+            mustPatchUpMetadataResults = true;
         } else {
-            if (task.collectionAndChangedChunks->epoch != collAndChunks.epoch) {
-                // An epoch change should reset the metadata and start from the new.
-                collAndChunks = *task.collectionAndChangedChunks;
-            } else {
-                // Epochs match, so the new results should be appended.
-
-                // Make sure we do not append a duplicate chunk. The diff query is GTE, so there can
-                // be duplicates of the same exact versioned chunk across tasks. This is no problem
-                // for our diff application algorithms, but it can return unpredictable numbers of
-                // chunks for testing purposes. Eliminate unpredicatable duplicates for testing
-                // stability.
-                auto taskCollectionAndChangedChunksIt =
-                    task.collectionAndChangedChunks->changedChunks.begin();
-                if (collAndChunks.changedChunks.back().getVersion() ==
-                    task.collectionAndChangedChunks->changedChunks.front().getVersion()) {
-                    ++taskCollectionAndChangedChunksIt;
-                }
+            // The current task is neither a drop nor an update and the epochs match -> we add its
+            // results to the aggregated results
+
+            // Make sure we do not append a duplicate chunk. The diff query is GTE, so there can
+            // be duplicates of the same exact versioned chunk across tasks. This is no problem
+            // for our diff application algorithms, but it can return unpredictable numbers of
+            // chunks for testing purposes. Eliminate unpredictable duplicates for testing
+            // stability.
+            auto taskCollectionAndChangedChunksIt =
+                task.collectionAndChangedChunks->changedChunks.begin();
+            if (!collAndChunks.changedChunks.empty() &&
+                collAndChunks.changedChunks.back().getVersion() ==
+                    taskCollectionAndChangedChunksIt->getVersion()) {
+                ++taskCollectionAndChangedChunksIt;
+            }
 
-                collAndChunks.changedChunks.insert(
-                    collAndChunks.changedChunks.end(),
-                    taskCollectionAndChangedChunksIt,
-                    task.collectionAndChangedChunks->changedChunks.end());
+            collAndChunks.changedChunks.insert(
+                collAndChunks.changedChunks.end(),
+                taskCollectionAndChangedChunksIt,
+                task.collectionAndChangedChunks->changedChunks.end());
 
-                collAndChunks.reshardingFields = task.collectionAndChangedChunks->reshardingFields;
-            }
+            // Keep the most recent version of these fields
+            collAndChunks.allowMigrations = task.collectionAndChangedChunks->allowMigrations;
+            collAndChunks.reshardingFields = task.collectionAndChangedChunks->reshardingFields;
         }
     }
 
-    return collAndChunks;
+    return enqueuedMetadata;
 }
 
 }  // namespace mongo
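Annotation: the reworked getEnqueuedMetadataForTerm folds the task queue with four rules: a drop resets the accumulator, an epoch change restarts it from the current task, a metadata-format task only swaps in the new timestamp and flags the result for patch-up, and anything else appends chunk diffs (skipping a duplicated leading version). A self-contained sketch of that fold follows; string epochs, integer chunk versions, and unsigned timestamps are stand-ins for the server's types.

#include <iostream>
#include <optional>
#include <string>
#include <vector>

struct Task {
    bool dropped = false;
    bool updateMetadataFormat = false;
    std::string epoch;
    std::optional<unsigned> creationTime;
    std::vector<int> changedChunks;
};
struct Merged {
    bool mustPatchUp = false;
    std::string epoch;
    std::optional<unsigned> creationTime;
    std::vector<int> changedChunks;
};

// Mirrors the aggregation rules of getEnqueuedMetadataForTerm.
Merged mergeTasks(const std::vector<Task>& tasks) {
    Merged out;
    for (const auto& t : tasks) {
        if (t.dropped) {
            out = Merged{};  // a drop resets everything
        } else if (t.epoch != out.epoch) {
            out = {false, t.epoch, t.creationTime, t.changedChunks};  // new epoch: start over
        } else if (t.updateMetadataFormat) {
            out.creationTime = t.creationTime;  // only the timestamp changes...
            out.mustPatchUp = true;             // ...and chunks must be re-stamped later
        } else {
            // Same epoch: append, skipping a duplicated leading chunk version.
            auto it = t.changedChunks.begin();
            if (!out.changedChunks.empty() && it != t.changedChunks.end() &&
                out.changedChunks.back() == *it)
                ++it;
            out.changedChunks.insert(out.changedChunks.end(), it, t.changedChunks.end());
        }
    }
    return out;
}

int main() {
    Merged m = mergeTasks({{false, false, "e1", std::nullopt, {1, 2}},
                           {false, true, "e1", 42, {}},  // metadata-format flip
                           {false, false, "e1", 42, {2, 3}}});
    std::cout << m.changedChunks.size() << " chunks, patchUp=" << m.mustPatchUp << '\n';
    // Prints "3 chunks, patchUp=1": the duplicate chunk 2 was skipped and the
    // format change is still pending a patch-up of the chunk versions.
}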
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h
index e507063920d..b729dd2ae1c 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.h
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h
@@ -96,10 +96,10 @@ private:
      * apply a set of updated chunks to the shard persisted metadata store or to drop the persisted
      * metadata for a specific collection.
      */
-    struct collAndChunkTask {
-        collAndChunkTask(const collAndChunkTask&) = delete;
-        collAndChunkTask& operator=(const collAndChunkTask&) = delete;
-        collAndChunkTask(collAndChunkTask&&) = default;
+    struct CollAndChunkTask {
+        CollAndChunkTask(const CollAndChunkTask&) = delete;
+        CollAndChunkTask& operator=(const CollAndChunkTask&) = delete;
+        CollAndChunkTask(CollAndChunkTask&&) = default;
 
         /**
          * Initializes a task for either dropping or updating the persisted metadata for the
@@ -109,15 +109,20 @@ private:
          * Note: statusWithCollectionAndChangedChunks must always be NamespaceNotFound or
          * OK, otherwise the constructor will invariant because there is no task to complete.
          *
+         * If 'metadataFormatChanged' is true, this task updates the persistent
+         * metadata format of the collection and its chunks. This specific kind
+         * of task doesn't have changed chunks.
+         *
          * 'collectionAndChangedChunks' is only initialized if 'dropped' is false.
          * 'minimumQueryVersion' sets 'minQueryVersion'.
          * 'maxQueryVersion' is either set to the highest chunk version in
          * 'collectionAndChangedChunks' or ChunkVersion::UNSHARDED().
          */
-        collAndChunkTask(
+        CollAndChunkTask(
             StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks,
             ChunkVersion minimumQueryVersion,
-            long long currentTerm);
+            long long currentTerm,
+            bool metadataFormatChanged = false);
 
         // Always-incrementing task number to uniquely identify different tasks
         uint64_t taskNum;
@@ -139,10 +144,24 @@ private:
         // Indicates whether the collection metadata must be cleared.
         bool dropped{false};
 
+        // Indicates whether the collection metadata and all its chunks must be updated due to a
+        // metadata format change.
+        bool updateMetadataFormat{false};
+
         // The term in which the loader scheduled this task.
         uint32_t termCreated;
     };
 
+    /* This class represents the results of a _getEnqueuedMetadata call. It contains information
+     * about:
+     * - Whether we must patch up the metadata results that are sent back to the CatalogCache.
+     * - The collection and the changed chunks.
+     */
+    struct EnqueuedMetadataResults {
+        bool mustPatchUpMetadataResults{false};
+        CollectionAndChangedChunks collAndChangedChunks;
+    };
+
     /**
      * A list (work queue) of updates to apply to the shard persisted metadata store for a specific
      * collection. Enforces that tasks that are added to the list are either consistent:
@@ -164,7 +183,7 @@ private:
      * don't waste time applying changes we will just delete. If the one remaining task in the
      * list is already a drop task, the new one isn't added because it is redundant.
      */
-    void addTask(collAndChunkTask task);
+    void addTask(CollAndChunkTask task);
 
     auto& front() {
         invariant(!_tasks.empty());
@@ -218,10 +237,11 @@ private:
     /**
      * Iterates over the task list to retrieve the enqueued metadata. Only collects
     * data from tasks that have terms matching the specified 'term'.
      */
-    CollectionAndChangedChunks getEnqueuedMetadataForTerm(const long long term) const;
+    EnqueuedMetadataResults getEnqueuedMetadataForTerm(const long long term) const;
+
 private:
-    std::list<collAndChunkTask> _tasks{};
+    std::list<CollAndChunkTask> _tasks{};
 
     // Condition variable which will be signaled whenever the active task from the tasks list is
     // completed. Must be used in conjunction with the loader's mutex.
@@ -312,13 +332,6 @@ private:
      */
     void waitForActiveTaskCompletion(stdx::unique_lock<Latch>& lg);
 
-    /**
-     * Checks whether 'term' matches the term of the latest task in the task list. This is
-     * useful to check whether the task list has outdated data that's no longer valid to use in
-     * the current/new term specified by 'term'.
-     */
-    bool hasTasksFromThisTerm(long long term) const;
-
 private:
     std::list<DBTask> _tasks{};
 
@@ -406,7 +419,7 @@ private:
      *
      * Only run on the shard primary.
      */
-    std::pair<bool, CollectionAndChangedChunks> _getEnqueuedMetadata(
+    std::pair<bool, EnqueuedMetadataResults> _getEnqueuedMetadata(
         const NamespaceString& nss,
         const ChunkVersion& catalogCacheSinceVersion,
         const long long term);
@@ -423,7 +436,7 @@ private:
      */
     void _ensureMajorityPrimaryAndScheduleCollAndChunksTask(OperationContext* opCtx,
                                                             const NamespaceString& nss,
-                                                            collAndChunkTask task);
+                                                            CollAndChunkTask task);
 
     void _ensureMajorityPrimaryAndScheduleDbTask(OperationContext* opCtx,
                                                  StringData dbName,
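Annotation: per the constructor doc above, a metadataFormatChanged task keeps the collection info but drops the chunk diffs, and its maxQueryVersion is the minQueryVersion re-stamped with the new timestamp so getHighestVersionEnqueued() already reports the new format. A standalone model of that branch, with simplified stand-in types rather than the server's ChunkVersion/CollectionAndChangedChunks:

#include <cassert>
#include <optional>
#include <utility>
#include <vector>

struct Timestamp { unsigned t = 0; };
struct ChunkVersion {
    int major = 0, minor = 0;
    std::optional<Timestamp> timestamp;  // epoch omitted for brevity
};
struct CollAndChunks {
    std::optional<Timestamp> creationTime;
    std::vector<ChunkVersion> changedChunks;
};

// Models the CollAndChunkTask constructor's two non-dropped branches.
struct Task {
    CollAndChunks coll;
    ChunkVersion minQueryVersion, maxQueryVersion;
    bool updateMetadataFormat;

    Task(CollAndChunks c, ChunkVersion minV, bool metadataFormatChanged)
        : coll(std::move(c)), minQueryVersion(minV), updateMetadataFormat(metadataFormatChanged) {
        if (updateMetadataFormat) {
            coll.changedChunks.clear();  // chunk diffs are handled by a later task
            maxQueryVersion = {minQueryVersion.major, minQueryVersion.minor, coll.creationTime};
        } else {
            maxQueryVersion = coll.changedChunks.back();  // highest enqueued version
        }
    }
};

int main() {
    CollAndChunks c{Timestamp{42}, {{3, 1, Timestamp{42}}}};
    Task t(c, /*minV*/ {3, 1, std::nullopt}, /*metadataFormatChanged*/ true);
    assert(t.coll.changedChunks.empty());                                // diffs dropped
    assert(t.maxQueryVersion.timestamp && t.maxQueryVersion.timestamp->t == 42);
}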
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp
index fbe67ed204f..026348146dd 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp
@@ -29,10 +29,13 @@
 
 #include "mongo/platform/basic.h"
 
+#include <boost/optional/optional_io.hpp>
+
 #include "mongo/db/s/shard_server_catalog_cache_loader.h"
 #include "mongo/db/s/shard_server_test_fixture.h"
 #include "mongo/s/catalog/type_chunk.h"
 #include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/catalog/type_database.h"
 #include "mongo/s/catalog_cache_loader_mock.h"
 
 namespace mongo {
@@ -186,7 +189,8 @@ ShardServerCatalogCacheLoaderTest::makeCombinedOriginalFiveChunksAndThreeNewChun
 
 CollectionType ShardServerCatalogCacheLoaderTest::makeCollectionType(
     const ChunkVersion& collVersion) {
-    CollectionType coll(kNss, collVersion.epoch(), Date_t::now(), UUID::gen());
+    CollectionType coll(
+        kNss, collVersion.epoch(), collVersion.getTimestamp(), Date_t::now(), UUID::gen());
     coll.setKeyPattern(kKeyPattern);
     coll.setUnique(false);
     return coll;
@@ -431,5 +435,76 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindMixedChun
     }
 }
 
+TEST_F(ShardServerCatalogCacheLoaderTest,
+       PrimaryLoadFromShardedAndFindCollAndChunksMetadataFormatChanged) {
+    // First set up the shard chunk loader as sharded.
+    vector<ChunkType> chunks = setUpChunkLoaderWithFiveChunks();
+
+    // Simulating that the config server added timestamps to all chunks
+    {
+        vector<ChunkType> newChunks = chunks;
+        for (auto& chunk : newChunks) {
+            const ChunkVersion v = chunk.getVersion();
+            chunk.setVersion(
+                ChunkVersion(v.majorVersion(), v.minorVersion(), v.epoch(), Timestamp(42)));
+        }
+
+        CollectionType collectionTypeWithNewEpoch =
+            makeCollectionType(newChunks.back().getVersion());
+        _remoteLoaderMock->setCollectionRefreshReturnValue(collectionTypeWithNewEpoch);
+        _remoteLoaderMock->setChunkRefreshReturnValue(newChunks);
+
+        auto collAndChunksRes = _shardLoader->getChunksSince(kNss, newChunks[0].getVersion()).get();
+        ASSERT_EQUALS(collAndChunksRes.epoch, collectionTypeWithNewEpoch.getEpoch());
+        ASSERT_EQUALS(collAndChunksRes.creationTime, Timestamp(42));
+        ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 5UL);
+        for (const auto& changedChunk : collAndChunksRes.changedChunks) {
+            ASSERT_EQUALS(changedChunk.getVersion().getTimestamp(), Timestamp(42));
+            ASSERT_EQUALS(changedChunk.getVersion().epoch(), collAndChunksRes.epoch);
+        }
+    }
+
+    // Simulating that the config server removed timestamps from all chunks
+    {
+        vector<ChunkType> newChunks = chunks;
+        for (auto& chunk : newChunks) {
+            const ChunkVersion v = chunk.getVersion();
+            chunk.setVersion(
+                ChunkVersion(v.majorVersion(), v.minorVersion(), v.epoch(), boost::none));
+        }
+
+        CollectionType collectionTypeWithNewEpoch =
+            makeCollectionType(newChunks.back().getVersion());
+        _remoteLoaderMock->setCollectionRefreshReturnValue(collectionTypeWithNewEpoch);
+        _remoteLoaderMock->setChunkRefreshReturnValue(newChunks);
+
+        auto collAndChunksRes = _shardLoader->getChunksSince(kNss, newChunks[0].getVersion()).get();
+        ASSERT_EQUALS(collAndChunksRes.epoch, collectionTypeWithNewEpoch.getEpoch());
+        ASSERT_EQUALS(collAndChunksRes.creationTime, boost::none);
+        ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 5UL);
+        for (const auto& changedChunk : collAndChunksRes.changedChunks) {
+            ASSERT_EQUALS(changedChunk.getVersion().getTimestamp(), boost::none);
+            ASSERT_EQUALS(changedChunk.getVersion().epoch(), collAndChunksRes.epoch);
+        }
+    }
+}
+
+TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindDbMetadataFormatChanged) {
+    const std::string dbName("dbName");
+    DatabaseVersion version(UUID::gen());
+    DatabaseType dbType(dbName, kShardId, true /* sharded */, version);
+
+    _remoteLoaderMock->setDatabaseRefreshReturnValue(dbType);
+    auto newDbType = _shardLoader->getDatabase(dbName).get();
+    ASSERT_EQUALS(dbType.getVersion().getUuid(), newDbType.getVersion().getUuid());
+    ASSERT_EQUALS(dbType.getVersion().getTimestamp(), newDbType.getVersion().getTimestamp());
+
+    dbType.setVersion(DatabaseVersion(UUID::gen(), Timestamp(42)));
+    _remoteLoaderMock->setDatabaseRefreshReturnValue(dbType);
+    newDbType = _shardLoader->getDatabase(dbName).get();
+    ASSERT_EQUALS(dbType.getVersion().getUuid(), newDbType.getVersion().getUuid());
+    ASSERT_EQUALS(dbType.getVersion().getTimestamp(), newDbType.getVersion().getTimestamp());
+}
+
 }  // namespace
 }  // namespace mongo