author     Sergi Mateo Bellido <sergi.mateo-bellido@mongodb.com>    2021-02-04 16:38:07 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>         2021-02-09 09:34:14 +0000
commit     29a838f9193ba88c7ba8195fd44eb252a80be5f8 (patch)
tree       22ea87eb9722091559408c180257301b5ef06931
parent     194975733a8289247e972a2d6a2d0e8ffd25bad3 (diff)
download   mongo-29a838f9193ba88c7ba8195fd44eb252a80be5f8.tar.gz
SERVER-53104 Ensure all shard-local cache collections of config.collections contain timestamps after upgrade
- Modifying the loader to properly handle metadata format changes in Collections & Databases
-rw-r--r--  jstests/multiVersion/genericChangeStreams/change_streams_read_oplog_after_downgrade.js   14
-rw-r--r--  jstests/multiVersion/upgrade_downgrade_sharded_cluster.js                                18
-rw-r--r--  src/mongo/db/commands/set_feature_compatibility_version_command.cpp                       9
-rw-r--r--  src/mongo/db/s/shard_metadata_util.cpp                                                    80
-rw-r--r--  src/mongo/db/s/shard_metadata_util.h                                                      21
-rw-r--r--  src/mongo/db/s/shard_metadata_util_test.cpp                                               30
-rw-r--r--  src/mongo/db/s/shard_server_catalog_cache_loader.cpp                                     225
-rw-r--r--  src/mongo/db/s/shard_server_catalog_cache_loader.h                                        49
-rw-r--r--  src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp                                 77
9 files changed, 323 insertions(+), 200 deletions(-)
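
For context, the metadata format change this commit handles is the presence or absence of a timestamp in the version of a collection and its chunks. A minimal sketch (not part of the patch), assuming only the four-argument ChunkVersion constructor that the diffs below already use:

    // Hedged illustration: the two metadata formats this commit converts between.
    const OID epoch = OID::gen();
    ChunkVersion timestamped(1, 0, epoch, Timestamp(42));  // new format: timestamp present
    ChunkVersion legacy(1, 0, epoch, boost::none);         // old format: no timestamp

The loader changes below detect a mismatch between these two formats (maxLoaderVersion vs. collAndChunks.creationTime) and schedule a dedicated task to rewrite the persisted metadata.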
diff --git a/jstests/multiVersion/genericChangeStreams/change_streams_read_oplog_after_downgrade.js b/jstests/multiVersion/genericChangeStreams/change_streams_read_oplog_after_downgrade.js
index f546aa8bee5..307a8077fb0 100644
--- a/jstests/multiVersion/genericChangeStreams/change_streams_read_oplog_after_downgrade.js
+++ b/jstests/multiVersion/genericChangeStreams/change_streams_read_oplog_after_downgrade.js
@@ -27,6 +27,20 @@ const st = new ShardingTest({
other: {mongosOptions: {binVersion: "latest"}}
});
+const isFeatureFlagEnabled =
+ assert
+ .commandWorked(st.configRS.getPrimary().adminCommand(
+ {getParameter: 1, featureFlagShardingFullDDLSupportTimestampedVersion: 1}))
+ .featureFlagShardingFullDDLSupportTimestampedVersion.value;
+
+if (isFeatureFlagEnabled) {
+ // TODO SERVER-53104: do not skip this test once this ticket is completed
+ jsTest.log(
+ 'Skipping test since it is not compatible with featureFlagShardingFullDDLSupportTimestampedVersion yet');
+ st.stop();
+ return;
+}
+
let shardedColl = st.s.getDB(dbName)[collName];
assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
st.ensurePrimaryShard(dbName, st.shard0.shardName);
diff --git a/jstests/multiVersion/upgrade_downgrade_sharded_cluster.js b/jstests/multiVersion/upgrade_downgrade_sharded_cluster.js
index 5f5c93b5c94..bbe554d9de4 100644
--- a/jstests/multiVersion/upgrade_downgrade_sharded_cluster.js
+++ b/jstests/multiVersion/upgrade_downgrade_sharded_cluster.js
@@ -259,9 +259,8 @@ function runChecksAfterFCVDowngrade() {
{getParameter: 1, featureFlagShardingFullDDLSupportTimestampedVersion: 1}))
.featureFlagShardingFullDDLSupportTimestampedVersion.value;
- testAllowedMigrationsFieldChecksAfterFCVDowngrade();
-
if (isFeatureFlagEnabled) {
+ testAllowedMigrationsFieldChecksAfterFCVDowngrade();
testTimestampFieldChecksAfterFCVDowngrade();
testChunkCollectionUuidFieldChecksAfterFCVDowngrade();
} else {
@@ -295,6 +294,21 @@ for (let oldVersion of [lastLTSFCV, lastContinuousFCV]) {
// Upgrade the entire cluster to the latest version.
jsTest.log('upgrading cluster');
st.upgradeCluster(latestFCV);
+
+ const isFeatureFlagEnabled =
+ assert
+ .commandWorked(st.configRS.getPrimary().adminCommand(
+ {getParameter: 1, featureFlagShardingFullDDLSupportTimestampedVersion: 1}))
+ .featureFlagShardingFullDDLSupportTimestampedVersion.value;
+
+ if (isFeatureFlagEnabled) {
+ // TODO SERVER-53104: do not skip this test once this ticket is completed
+ jsTest.log(
+ 'Skipping test since it is not compatible with featureFlagShardingFullDDLSupportTimestampedVersion yet');
+ st.stop();
+ return;
+ }
+
assert.commandWorked(st.s.adminCommand({setFeatureCompatibilityVersion: latestFCV}));
// Tests after upgrade
diff --git a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
index 70a3c038e3f..a2c478ef871 100644
--- a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
+++ b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
@@ -56,7 +56,6 @@
#include "mongo/db/s/active_migrations_registry.h"
#include "mongo/db/s/config/sharding_catalog_manager.h"
#include "mongo/db/s/migration_util.h"
-#include "mongo/db/s/shard_metadata_util.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/server_options.h"
#include "mongo/db/views/view_catalog.h"
@@ -423,14 +422,6 @@ public:
if (failDowngrading.shouldFail())
return false;
- if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
- if (requestedVersion < FeatureCompatibilityParams::Version::kVersion49) {
- // SERVER-52632: Remove once 5.0 becomes the LastLTS
- shardmetadatautil::downgradeShardConfigDatabasesEntriesToPre49(opCtx);
- shardmetadatautil::downgradeShardConfigCollectionEntriesToPre49(opCtx);
- }
- }
-
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
// Downgrade shards before config finishes its downgrade.
uassertStatusOK(
diff --git a/src/mongo/db/s/shard_metadata_util.cpp b/src/mongo/db/s/shard_metadata_util.cpp
index dade37ea0e5..37e78540c23 100644
--- a/src/mongo/db/s/shard_metadata_util.cpp
+++ b/src/mongo/db/s/shard_metadata_util.cpp
@@ -401,6 +401,26 @@ Status updateShardChunks(OperationContext* opCtx,
}
}
+void updateTimestampOnShardCollections(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const boost::optional<Timestamp>& timestamp) {
+ write_ops::Update clearFields(NamespaceString::kShardConfigCollectionsNamespace, [&] {
+ write_ops::UpdateOpEntry u;
+ u.setQ(BSON(ShardCollectionType::kNssFieldName << nss.ns()));
+ BSONObj updateOp = (timestamp)
+ ? BSON("$set" << BSON(CollectionType::kTimestampFieldName << *timestamp))
+ : BSON("$unset" << BSON(ShardCollectionType::kPre50CompatibleAllowMigrationsFieldName
+ << "" << CollectionType::kTimestampFieldName << ""));
+ u.setU(write_ops::UpdateModification::parseFromClassicUpdate(updateOp));
+ return std::vector{u};
+ }());
+
+ DBDirectClient client(opCtx);
+ const auto commandResult = client.runCommand(clearFields.serialize({}));
+
+ uassertStatusOK(getStatusFromWriteCommandResponse(commandResult->getCommandReply()));
+}
+
Status dropChunksAndDeleteCollectionsEntry(OperationContext* opCtx, const NamespaceString& nss) {
try {
DBDirectClient client(opCtx);
@@ -487,65 +507,5 @@ Status deleteDatabasesEntry(OperationContext* opCtx, StringData dbName) {
}
}
-void downgradeShardConfigDatabasesEntriesToPre49(OperationContext* opCtx) {
- LOGV2(5258804, "Starting downgrade of config.cache.databases");
- if (feature_flags::gShardingFullDDLSupportTimestampedVersion.isEnabledAndIgnoreFCV()) {
- write_ops::Update clearFields(NamespaceString::kShardConfigDatabasesNamespace, [] {
- write_ops::UpdateOpEntry u;
- u.setQ({});
- u.setU(write_ops::UpdateModification::parseFromClassicUpdate(
- BSON("$unset" << BSON(
- ShardDatabaseType::version() + "." + DatabaseVersion::kTimestampFieldName
- << ""))));
- u.setMulti(true);
- return std::vector{u};
- }());
-
- clearFields.setWriteCommandBase([] {
- write_ops::WriteCommandBase base;
- base.setOrdered(false);
- return base;
- }());
-
- DBDirectClient client(opCtx);
- const auto commandResult = client.runCommand(clearFields.serialize({}));
-
- uassertStatusOK(getStatusFromWriteCommandResponse(commandResult->getCommandReply()));
- LOGV2(5258805, "Successfully downgraded config.cache.databases");
- }
-}
-
-void downgradeShardConfigCollectionEntriesToPre49(OperationContext* opCtx) {
- // Clear the 'allowMigrations' and 'timestamp' fields from config.cache.collections
- LOGV2(5189100, "Starting downgrade of config.cache.collections");
- write_ops::Update clearFields(NamespaceString::kShardConfigCollectionsNamespace, [] {
- BSONObj unsetFields =
- BSON(ShardCollectionType::kPre50CompatibleAllowMigrationsFieldName << "");
- if (feature_flags::gShardingFullDDLSupportTimestampedVersion.isEnabledAndIgnoreFCV()) {
- unsetFields = unsetFields.addFields(BSON(CollectionType::kTimestampFieldName << ""));
- }
-
- write_ops::UpdateOpEntry u;
- u.setQ({});
- u.setU(
- write_ops::UpdateModification::parseFromClassicUpdate(BSON("$unset" << unsetFields)));
- u.setMulti(true);
- return std::vector{u};
- }());
-
- clearFields.setWriteCommandBase([] {
- write_ops::WriteCommandBase base;
- base.setOrdered(false);
- return base;
- }());
-
- DBDirectClient client(opCtx);
- const auto commandResult = client.runCommand(clearFields.serialize({}));
-
- uassertStatusOK(getStatusFromWriteCommandResponse(commandResult->getCommandReply()));
-
- LOGV2(5189101, "Successfully downgraded config.cache.collections");
-}
-
} // namespace shardmetadatautil
} // namespace mongo
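
A hedged usage sketch of the new helper, matching the two BSON branches above ($set when a timestamp is provided; $unset of both the 'timestamp' and the pre-5.0 'allowMigrations' field otherwise):

    // Upgrade path: stamp the cached config.cache.collections entry.
    updateTimestampOnShardCollections(opCtx, nss, Timestamp(42));
    // Downgrade path: strip the timestamp (and the allowMigrations field) again.
    updateTimestampOnShardCollections(opCtx, nss, boost::none);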
diff --git a/src/mongo/db/s/shard_metadata_util.h b/src/mongo/db/s/shard_metadata_util.h
index 45bee780dd0..8d7315e7e6e 100644
--- a/src/mongo/db/s/shard_metadata_util.h
+++ b/src/mongo/db/s/shard_metadata_util.h
@@ -202,6 +202,13 @@ Status updateShardChunks(OperationContext* opCtx,
const OID& currEpoch);
/**
+ * Adds or removes the timestamp of the 'nss' entry in config.cache.collections. When no
+ * timestamp is given, the pre-5.0 compatible 'allowMigrations' field is unset as well.
+ */
+void updateTimestampOnShardCollections(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const boost::optional<Timestamp>& timestamp);
+
+/**
* Deletes locally persisted chunk metadata associated with 'nss': drops the chunks collection
* and removes the collections collection entry.
*
@@ -222,19 +229,5 @@ void dropChunks(OperationContext* opCtx, const NamespaceString& nss);
*/
Status deleteDatabasesEntry(OperationContext* opCtx, StringData dbName);
-/**
- * Downgrades the config.cache.databases entries to prior 4.9 version. More specifically, it removes
- * the 'version.timestamp' field from all the documents in config.cache.databases.
- *
- */
-void downgradeShardConfigDatabasesEntriesToPre49(OperationContext* opCtx);
-
-/**
- * Downgrades the config.cache.collections entries to prior 4.9 version. More specifically, it
- * removes the allowMigrations and timestamp fields from all the documents of
- * config.cache.collections
- */
-void downgradeShardConfigCollectionEntriesToPre49(OperationContext* opCtx);
-
} // namespace shardmetadatautil
} // namespace mongo
diff --git a/src/mongo/db/s/shard_metadata_util_test.cpp b/src/mongo/db/s/shard_metadata_util_test.cpp
index d20ba3be849..4890a09a5d9 100644
--- a/src/mongo/db/s/shard_metadata_util_test.cpp
+++ b/src/mongo/db/s/shard_metadata_util_test.cpp
@@ -326,35 +326,5 @@ TEST_F(ShardMetadataUtilTest, DropChunksAndDeleteCollectionsEntry) {
checkCollectionIsEmpty(NamespaceString::kShardConfigCollectionsNamespace);
}
-TEST_F(ShardMetadataUtilTest, DowngradeShardConfigCollectionEntriesTo44) {
- setUpShardChunkMetadata();
-
- const auto checkShardCollectionAllowMigrationsFlag = [&](bool expectedValue) {
- ShardCollectionType readShardCollectionType =
- assertGet(readShardCollectionsEntry(operationContext(), kNss));
-
- ASSERT_EQUALS(readShardCollectionType.getAllowMigrations(), expectedValue);
- };
-
- // allowMigrations is an optional field. If it is not present -> allowMigrations = true
- checkShardCollectionAllowMigrationsFlag(/* expectedValue */ true);
-
- ASSERT_OK(updateShardCollectionsEntry(
- operationContext(),
- BSON(ShardCollectionType::kNssFieldName << kNss.ns()),
- BSON(ShardCollectionType::kPre50CompatibleAllowMigrationsFieldName << false),
- BSONObj(),
- /* upsert */ false));
-
- // allowMigrations was explicitly defined to false by the previous statement
- checkShardCollectionAllowMigrationsFlag(/* expectedValue */ false);
-
- downgradeShardConfigCollectionEntriesToPre49(operationContext());
-
- // The downgrade process to prior 4.9 removes the allowMigration flag from all collections.
- // Thus, this flag will not be present which implies that its value is true
- checkShardCollectionAllowMigrationsFlag(/* expectedValue */ true);
-}
-
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
index 20132ca3f8e..aa82c7d916c 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
@@ -227,8 +227,12 @@ CollectionAndChangedChunks getPersistedMetadataSinceVersion(OperationContext* op
uassertStatusOK(readShardCollectionsEntry(opCtx, nss));
// If the persisted epoch doesn't match what the CatalogCache requested, read everything.
+ // If the epochs are the same, we can safely take the timestamp from the shard collection entry.
ChunkVersion startingVersion = (shardCollectionEntry.getEpoch() == version.epoch())
- ? version
+ ? ChunkVersion(version.majorVersion(),
+ version.minorVersion(),
+ version.epoch(),
+ shardCollectionEntry.getTimestamp())
: ChunkVersion(0, 0, shardCollectionEntry.getEpoch(), shardCollectionEntry.getTimestamp());
QueryAndSort diff = createShardChunkDiffQuery(startingVersion);
@@ -364,6 +368,22 @@ ChunkVersion getLocalVersion(OperationContext* opCtx, const NamespaceString& nss
return uassertStatusOK(std::move(swRefreshState)).lastRefreshedCollectionVersion;
}
+void patchUpChangedChunksIfNeeded(bool mustPatchUpMetadataResults,
+ CollectionAndChangedChunks& collAndChunks) {
+ if (!mustPatchUpMetadataResults)
+ return;
+
+ const boost::optional<Timestamp> newTimestamp = collAndChunks.creationTime;
+ std::for_each(
+ collAndChunks.changedChunks.begin(),
+ collAndChunks.changedChunks.end(),
+ [&newTimestamp](ChunkType& chunk) {
+ const ChunkVersion version = chunk.getVersion();
+ chunk.setVersion(ChunkVersion(
+ version.majorVersion(), version.minorVersion(), version.epoch(), newTimestamp));
+ });
+}
+
} // namespace
ShardServerCatalogCacheLoader::ShardServerCatalogCacheLoader(
@@ -661,7 +681,7 @@ ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
_ensureMajorityPrimaryAndScheduleCollAndChunksTask(
opCtx,
nss,
- collAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
+ CollAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
LOGV2_FOR_CATALOG_REFRESH(
24107,
@@ -690,12 +710,36 @@ ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
<< "'."};
}
+
+ if (maxLoaderVersion.isSet() &&
+ (maxLoaderVersion.getTimestamp().is_initialized() !=
+ collAndChunks.creationTime.is_initialized())) {
+ // This task will update the metadata format of the collection and all its chunks.
+ // It doesn't apply the changed chunks; those will be handled by the next task.
+ _ensureMajorityPrimaryAndScheduleCollAndChunksTask(
+ opCtx,
+ nss,
+ CollAndChunkTask{swCollectionAndChangedChunks,
+ maxLoaderVersion,
+ termScheduled,
+ true /* metadataFormatChanged */});
+
+ LOGV2_FOR_CATALOG_REFRESH(5310400,
+ 1,
+ "Cache loader update metadata format for collection {namespace}"
+ "{oldTimestamp} and {newTimestamp}",
+ "Cache loader update metadata format for collection",
+ "namespace"_attr = nss,
+ "oldTimestamp"_attr = maxLoaderVersion.getTimestamp(),
+ "newTimestamp"_attr = collAndChunks.creationTime);
+ }
+
if ((collAndChunks.epoch != maxLoaderVersion.epoch()) ||
maxLoaderVersion.isOlderThan(collAndChunks.changedChunks.back().getVersion())) {
_ensureMajorityPrimaryAndScheduleCollAndChunksTask(
opCtx,
nss,
- collAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
+ CollAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
}
LOGV2_FOR_CATALOG_REFRESH(
@@ -788,22 +832,24 @@ StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoader
// Get the enqueued metadata first. Otherwise we could miss data between reading persisted and
// enqueued, if an enqueued task finished after the persisted read but before the enqueued read.
- auto enqueuedRes = _getEnqueuedMetadata(nss, catalogCacheSinceVersion, expectedTerm);
- bool tasksAreEnqueued = std::move(enqueuedRes.first);
- CollectionAndChangedChunks enqueued = std::move(enqueuedRes.second);
+ auto [tasksAreEnqueued, enqueuedMetadata] =
+ _getEnqueuedMetadata(nss, catalogCacheSinceVersion, expectedTerm);
+ CollectionAndChangedChunks& enqueued = enqueuedMetadata.collAndChangedChunks;
auto swPersisted =
getIncompletePersistedMetadataSinceVersion(opCtx, nss, catalogCacheSinceVersion);
CollectionAndChangedChunks persisted;
if (swPersisted == ErrorCodes::NamespaceNotFound) {
- // No persisted metadata found, create an empty object.
- persisted = CollectionAndChangedChunks();
+ // No persisted metadata found
} else if (!swPersisted.isOK()) {
return swPersisted;
} else {
persisted = std::move(swPersisted.getValue());
}
+ bool lastTaskIsADrop = tasksAreEnqueued && enqueued.changedChunks.empty() &&
+ !enqueuedMetadata.mustPatchUpMetadataResults;
+
LOGV2_FOR_CATALOG_REFRESH(
24111,
1,
@@ -812,7 +858,10 @@ StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoader
"Cache loader state since the latest cached version",
"enqueuedTasksDesc"_attr =
(enqueued.changedChunks.empty()
- ? (tasksAreEnqueued ? "a drop enqueued" : "no enqueued metadata")
+ ? (tasksAreEnqueued
+ ? (lastTaskIsADrop ? "a drop is enqueued"
+ : "an update of the metadata format is enqueued")
+ : "no enqueued metadata")
: ("enqueued metadata from " +
enqueued.changedChunks.front().getVersion().toString() + " to " +
enqueued.changedChunks.back().getVersion().toString())),
@@ -827,17 +876,18 @@ StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoader
if (!tasksAreEnqueued) {
// There are no tasks in the queue. Return the persisted metadata.
return persisted;
- } else if (persisted.changedChunks.empty() || enqueued.changedChunks.empty() ||
+ } else if (persisted.changedChunks.empty() || lastTaskIsADrop ||
enqueued.epoch != persisted.epoch) {
- // There is a task queue and:
- // - nothing is persisted.
- // - nothing was returned from enqueued, which means the last task enqueued is a drop task.
- // - the epoch changed in the enqueued metadata, which means there's a drop operation
- // enqueued somewhere.
+ // There is a task in the queue and:
+ // - nothing is persisted, OR
+ // - the last task in the queue was a drop, OR
+ // - the epoch changed in the enqueued metadata.
// Whichever the cause, the persisted metadata is out-dated/non-existent. Return enqueued
// results.
+ patchUpChangedChunksIfNeeded(enqueuedMetadata.mustPatchUpMetadataResults, enqueued);
return enqueued;
} else {
+
// There can be overlap between persisted and enqueued metadata because enqueued work can
// be applied while persisted was read. We must remove this overlap.
@@ -857,6 +907,13 @@ StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoader
persisted.changedChunks.insert(persisted.changedChunks.end(),
enqueued.changedChunks.begin(),
enqueued.changedChunks.end());
+
+ // We may need to patch up the changed chunks because there was a metadata format change
+ patchUpChangedChunksIfNeeded(enqueuedMetadata.mustPatchUpMetadataResults, persisted);
+
+
+ // The collection info in the enqueued metadata may be more recent than the persisted metadata.
+ persisted.creationTime = enqueued.creationTime;
persisted.reshardingFields = std::move(enqueued.reshardingFields);
persisted.allowMigrations = enqueued.allowMigrations;
@@ -864,29 +921,30 @@ StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoader
}
}
-std::pair<bool, CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getEnqueuedMetadata(
- const NamespaceString& nss,
- const ChunkVersion& catalogCacheSinceVersion,
- const long long term) {
+std::pair<bool, ShardServerCatalogCacheLoader::EnqueuedMetadataResults>
+ShardServerCatalogCacheLoader::_getEnqueuedMetadata(const NamespaceString& nss,
+ const ChunkVersion& catalogCacheSinceVersion,
+ const long long term) {
stdx::unique_lock<Latch> lock(_mutex);
auto taskListIt = _collAndChunkTaskLists.find(nss);
if (taskListIt == _collAndChunkTaskLists.end()) {
- return std::make_pair(false, CollectionAndChangedChunks());
+ return std::make_pair(false, EnqueuedMetadataResults());
} else if (!taskListIt->second.hasTasksFromThisTerm(term)) {
// If task list does not have a term that matches, there's no valid task data to collect.
- return std::make_pair(false, CollectionAndChangedChunks());
+ return std::make_pair(false, EnqueuedMetadataResults());
}
// Only return task data of tasks scheduled in the same term as the given 'term': older term
// task data is no longer valid.
- CollectionAndChangedChunks collAndChunks = taskListIt->second.getEnqueuedMetadataForTerm(term);
+ EnqueuedMetadataResults enqueuedMetadata = taskListIt->second.getEnqueuedMetadataForTerm(term);
+ CollectionAndChangedChunks& collAndChunks = enqueuedMetadata.collAndChangedChunks;
// Return all the results if 'catalogCacheSinceVersion's epoch does not match. Otherwise, trim
// the results to be GTE to 'catalogCacheSinceVersion'.
if (collAndChunks.epoch != catalogCacheSinceVersion.epoch()) {
- return std::make_pair(true, collAndChunks);
+ return std::make_pair(true, std::move(enqueuedMetadata));
}
auto changedChunksIt = collAndChunks.changedChunks.begin();
@@ -896,11 +954,11 @@ std::pair<bool, CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getE
}
collAndChunks.changedChunks.erase(collAndChunks.changedChunks.begin(), changedChunksIt);
- return std::make_pair(true, collAndChunks);
+ return std::make_pair(true, std::move(enqueuedMetadata));
}
void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleCollAndChunksTask(
- OperationContext* opCtx, const NamespaceString& nss, collAndChunkTask task) {
+ OperationContext* opCtx, const NamespaceString& nss, CollAndChunkTask task) {
{
stdx::lock_guard<Latch> lock(_mutex);
@@ -1107,8 +1165,9 @@ void ShardServerCatalogCacheLoader::_updatePersistedCollAndChunksMetadata(
OperationContext* opCtx, const NamespaceString& nss) {
stdx::unique_lock<Latch> lock(_mutex);
- const collAndChunkTask& task = _collAndChunkTaskLists[nss].front();
- invariant(task.dropped || !task.collectionAndChangedChunks->changedChunks.empty());
+ const CollAndChunkTask& task = _collAndChunkTaskLists[nss].front();
+ invariant(task.dropped || task.updateMetadataFormat ||
+ !task.collectionAndChangedChunks->changedChunks.empty());
// If this task is from an old term and no longer valid, do not execute and return true so that
// the task gets removed from the task list
@@ -1128,6 +1187,12 @@ void ShardServerCatalogCacheLoader::_updatePersistedCollAndChunksMetadata(
return;
}
+ if (task.updateMetadataFormat) {
+ updateTimestampOnShardCollections(
+ opCtx, nss, task.collectionAndChangedChunks->creationTime);
+ return;
+ }
+
uassertStatusOKWithContext(
persistCollectionAndChangedChunks(
opCtx, nss, *task.collectionAndChangedChunks, task.minQueryVersion),
@@ -1225,17 +1290,35 @@ ShardServerCatalogCacheLoader::_getCompletePersistedMetadataForSecondarySinceVer
}
}
-ShardServerCatalogCacheLoader::collAndChunkTask::collAndChunkTask(
+ShardServerCatalogCacheLoader::CollAndChunkTask::CollAndChunkTask(
StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks,
ChunkVersion minimumQueryVersion,
- long long currentTerm)
+ long long currentTerm,
+ bool metadataFormatChanged)
: taskNum(taskIdGenerator.fetchAndAdd(1)),
- minQueryVersion(minimumQueryVersion),
+ minQueryVersion(std::move(minimumQueryVersion)),
+ updateMetadataFormat(metadataFormatChanged),
termCreated(currentTerm) {
if (statusWithCollectionAndChangedChunks.isOK()) {
- collectionAndChangedChunks = statusWithCollectionAndChangedChunks.getValue();
- invariant(!collectionAndChangedChunks->changedChunks.empty());
- maxQueryVersion = collectionAndChangedChunks->changedChunks.back().getVersion();
+ if (updateMetadataFormat) {
+ // A metadata format update doesn't handle the changed chunks: if
+ // there are any and they are newer than what we have locally, they
+ // will be handled by another task. Despite that, we still need a
+ // valid CollectionAndChangedChunks object to get the collection
+ // information.
+ collectionAndChangedChunks = std::move(statusWithCollectionAndChangedChunks.getValue());
+ collectionAndChangedChunks->changedChunks.clear();
+ // Propagate the Timestamp to 'maxQueryVersion' so that 'getHighestVersionEnqueued'
+ // returns a version in the new format.
+ maxQueryVersion = ChunkVersion(minQueryVersion.majorVersion(),
+ minQueryVersion.minorVersion(),
+ minQueryVersion.epoch(),
+ collectionAndChangedChunks->creationTime);
+ } else {
+ collectionAndChangedChunks = std::move(statusWithCollectionAndChangedChunks.getValue());
+ invariant(!collectionAndChangedChunks->changedChunks.empty());
+ maxQueryVersion = collectionAndChangedChunks->changedChunks.back().getVersion();
+ }
} else {
invariant(statusWithCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound);
dropped = true;
@@ -1259,7 +1342,7 @@ ShardServerCatalogCacheLoader::CollAndChunkTaskList::CollAndChunkTaskList()
ShardServerCatalogCacheLoader::DbTaskList::DbTaskList()
: _activeTaskCompletedCondVar(std::make_shared<stdx::condition_variable>()) {}
-void ShardServerCatalogCacheLoader::CollAndChunkTaskList::addTask(collAndChunkTask task) {
+void ShardServerCatalogCacheLoader::CollAndChunkTaskList::addTask(CollAndChunkTask task) {
if (_tasks.empty()) {
_tasks.emplace_back(std::move(task));
return;
@@ -1343,21 +1426,19 @@ bool ShardServerCatalogCacheLoader::CollAndChunkTaskList::hasTasksFromThisTerm(
return _tasks.back().termCreated == term;
}
-bool ShardServerCatalogCacheLoader::DbTaskList::hasTasksFromThisTerm(long long term) const {
- invariant(!_tasks.empty());
- return _tasks.back().termCreated == term;
-}
-
ChunkVersion ShardServerCatalogCacheLoader::CollAndChunkTaskList::getHighestVersionEnqueued()
const {
invariant(!_tasks.empty());
return _tasks.back().maxQueryVersion;
}
-CollectionAndChangedChunks
+ShardServerCatalogCacheLoader::EnqueuedMetadataResults
ShardServerCatalogCacheLoader::CollAndChunkTaskList::getEnqueuedMetadataForTerm(
const long long term) const {
- CollectionAndChangedChunks collAndChunks;
+ EnqueuedMetadataResults enqueuedMetadata;
+
+ CollectionAndChangedChunks& collAndChunks = enqueuedMetadata.collAndChangedChunks;
+ bool& mustPatchUpMetadataResults = enqueuedMetadata.mustPatchUpMetadataResults;
for (const auto& task : _tasks) {
if (task.termCreated != term) {
// Task data is no longer valid. Go on to the next task in the list.
@@ -1365,37 +1446,49 @@ ShardServerCatalogCacheLoader::CollAndChunkTaskList::getEnqueuedMetadataForTerm(
}
if (task.dropped) {
- // A drop task should reset the metadata.
+ // The current task is a drop: the results aggregated so far are no longer
+ // relevant, so we reset the CollAndChangedChunks and clear the flag.
collAndChunks = CollectionAndChangedChunks();
+ mustPatchUpMetadataResults = false;
+ } else if (task.collectionAndChangedChunks->epoch != collAndChunks.epoch) {
+ // The current task has a new epoch (refine shard key or resharding operation): the
+ // results aggregated so far are no longer relevant, so we overwrite them with the
+ // current CollAndChangedChunks and clear the flag.
+ collAndChunks = *task.collectionAndChangedChunks;
+ mustPatchUpMetadataResults = false;
+ } else if (task.updateMetadataFormat) {
+ // The current task is a metadata format update: we only refresh the Timestamp
+ // of the aggregated results and set the flag.
+ collAndChunks.creationTime = task.collectionAndChangedChunks->creationTime;
+ mustPatchUpMetadataResults = true;
} else {
- if (task.collectionAndChangedChunks->epoch != collAndChunks.epoch) {
- // An epoch change should reset the metadata and start from the new.
- collAndChunks = *task.collectionAndChangedChunks;
- } else {
- // Epochs match, so the new results should be appended.
-
- // Make sure we do not append a duplicate chunk. The diff query is GTE, so there can
- // be duplicates of the same exact versioned chunk across tasks. This is no problem
- // for our diff application algorithms, but it can return unpredictable numbers of
- // chunks for testing purposes. Eliminate unpredicatable duplicates for testing
- // stability.
- auto taskCollectionAndChangedChunksIt =
- task.collectionAndChangedChunks->changedChunks.begin();
- if (collAndChunks.changedChunks.back().getVersion() ==
- task.collectionAndChangedChunks->changedChunks.front().getVersion()) {
- ++taskCollectionAndChangedChunksIt;
- }
+ // The current task is neither a drop nor a metadata format update, and the epochs
+ // match: append its results to the aggregated results.
+
+ // Make sure we do not append a duplicate chunk. The diff query is GTE, so there can
+ // be duplicates of the same exact versioned chunk across tasks. This is no problem
+ // for our diff application algorithms, but it can return unpredictable numbers of
+ // chunks for testing purposes. Eliminate unpredictable duplicates for testing
+ // stability.
+ auto taskCollectionAndChangedChunksIt =
+ task.collectionAndChangedChunks->changedChunks.begin();
+ if (!collAndChunks.changedChunks.empty() &&
+ collAndChunks.changedChunks.back().getVersion() ==
+ taskCollectionAndChangedChunksIt->getVersion()) {
+ ++taskCollectionAndChangedChunksIt;
+ }
- collAndChunks.changedChunks.insert(
- collAndChunks.changedChunks.end(),
- taskCollectionAndChangedChunksIt,
- task.collectionAndChangedChunks->changedChunks.end());
+ collAndChunks.changedChunks.insert(
+ collAndChunks.changedChunks.end(),
+ taskCollectionAndChangedChunksIt,
+ task.collectionAndChangedChunks->changedChunks.end());
- collAndChunks.reshardingFields = task.collectionAndChangedChunks->reshardingFields;
- }
+ // Keep the most recent version of these fields
+ collAndChunks.allowMigrations = task.collectionAndChangedChunks->allowMigrations;
+ collAndChunks.reshardingFields = task.collectionAndChangedChunks->reshardingFields;
}
}
- return collAndChunks;
+ return enqueuedMetadata;
}
} // namespace mongo
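
A hedged sketch of how the new pieces fit together on the primary's metadata read path, condensed from _getLoaderMetadata above (names as introduced in this patch):

    // Collect the enqueued metadata; the boolean flag records whether a
    // metadata format change is pending for this collection.
    auto [tasksAreEnqueued, enqueuedMetadata] =
        _getEnqueuedMetadata(nss, catalogCacheSinceVersion, expectedTerm);
    CollectionAndChangedChunks& enqueued = enqueuedMetadata.collAndChangedChunks;

    // Before returning chunks to the CatalogCache, re-stamp their versions with
    // the collection's new creation time if a format change was enqueued.
    patchUpChangedChunksIfNeeded(enqueuedMetadata.mustPatchUpMetadataResults, enqueued);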
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h
index e507063920d..b729dd2ae1c 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.h
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h
@@ -96,10 +96,10 @@ private:
* apply a set of updated chunks to the shard persisted metadata store or to drop the persisted
* metadata for a specific collection.
*/
- struct collAndChunkTask {
- collAndChunkTask(const collAndChunkTask&) = delete;
- collAndChunkTask& operator=(const collAndChunkTask&) = delete;
- collAndChunkTask(collAndChunkTask&&) = default;
+ struct CollAndChunkTask {
+ CollAndChunkTask(const CollAndChunkTask&) = delete;
+ CollAndChunkTask& operator=(const CollAndChunkTask&) = delete;
+ CollAndChunkTask(CollAndChunkTask&&) = default;
/**
* Initializes a task for either dropping or updating the persisted metadata for the
@@ -109,15 +109,20 @@ private:
* Note: statusWithCollectionAndChangedChunks must always be NamespaceNotFound or
* OK, otherwise the constructor will invariant because there is no task to complete.
*
+ * If 'metadataFormatChanged' is true, this task updates the persisted
+ * metadata format of the collection and its chunks. This kind of task
+ * carries no changed chunks.
+ *
* 'collectionAndChangedChunks' is only initialized if 'dropped' is false.
* 'minimumQueryVersion' sets 'minQueryVersion'.
* 'maxQueryVersion' is either set to the highest chunk version in
* 'collectionAndChangedChunks' or ChunkVersion::UNSHARDED().
*/
- collAndChunkTask(
+ CollAndChunkTask(
StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks,
ChunkVersion minimumQueryVersion,
- long long currentTerm);
+ long long currentTerm,
+ bool metadataFormatChanged = false);
// Always-incrementing task number to uniquely identify different tasks
uint64_t taskNum;
@@ -139,10 +144,24 @@ private:
// Indicates whether the collection metadata must be cleared.
bool dropped{false};
+ // Indicates whether the collection metadata and all its chunks must be updated due to a
+ // metadata format change.
+ bool updateMetadataFormat{false};
+
// The term in which the loader scheduled this task.
uint32_t termCreated;
};
+ /* This struct represents the results of a _getEnqueuedMetadata call. It contains:
+  * - Whether we must patch up the metadata results that are sent back to the CatalogCache.
+  * - The collection and the changed chunks.
+  */
+ struct EnqueuedMetadataResults {
+ bool mustPatchUpMetadataResults{false};
+ CollectionAndChangedChunks collAndChangedChunks;
+ };
+
/**
* A list (work queue) of updates to apply to the shard persisted metadata store for a specific
* collection. Enforces that tasks that are added to the list are either consistent:
@@ -164,7 +183,7 @@ private:
* don't waste time applying changes we will just delete. If the one remaining task in the
* list is already a drop task, the new one isn't added because it is redundant.
*/
- void addTask(collAndChunkTask task);
+ void addTask(CollAndChunkTask task);
auto& front() {
invariant(!_tasks.empty());
@@ -218,10 +237,11 @@ private:
* Iterates over the task list to retrieve the enqueued metadata. Only collects
* data from tasks that have terms matching the specified 'term'.
*/
- CollectionAndChangedChunks getEnqueuedMetadataForTerm(const long long term) const;
+ EnqueuedMetadataResults getEnqueuedMetadataForTerm(const long long term) const;
+
private:
- std::list<collAndChunkTask> _tasks{};
+ std::list<CollAndChunkTask> _tasks{};
// Condition variable which will be signaled whenever the active task from the tasks list is
// completed. Must be used in conjunction with the loader's mutex.
@@ -312,13 +332,6 @@ private:
*/
void waitForActiveTaskCompletion(stdx::unique_lock<Latch>& lg);
- /**
- * Checks whether 'term' matches the term of the latest task in the task list. This is
- * useful to check whether the task list has outdated data that's no longer valid to use in
- * the current/new term specified by 'term'.
- */
- bool hasTasksFromThisTerm(long long term) const;
-
private:
std::list<DBTask> _tasks{};
@@ -406,7 +419,7 @@ private:
*
* Only run on the shard primary.
*/
- std::pair<bool, CollectionAndChangedChunks> _getEnqueuedMetadata(
+ std::pair<bool, EnqueuedMetadataResults> _getEnqueuedMetadata(
const NamespaceString& nss,
const ChunkVersion& catalogCacheSinceVersion,
const long long term);
@@ -423,7 +436,7 @@ private:
*/
void _ensureMajorityPrimaryAndScheduleCollAndChunksTask(OperationContext* opCtx,
const NamespaceString& nss,
- collAndChunkTask task);
+ CollAndChunkTask task);
void _ensureMajorityPrimaryAndScheduleDbTask(OperationContext* opCtx,
StringData dbName,
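
For reference, a hedged sketch of how a metadata-format-change task is scheduled with the extended constructor declared above (mirroring the call site in _schedulePrimaryGetChunksSince):

    _ensureMajorityPrimaryAndScheduleCollAndChunksTask(
        opCtx,
        nss,
        CollAndChunkTask{swCollectionAndChangedChunks,
                         maxLoaderVersion,
                         termScheduled,
                         true /* metadataFormatChanged */});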
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp
index fbe67ed204f..026348146dd 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp
@@ -29,10 +29,13 @@
#include "mongo/platform/basic.h"
+#include <boost/optional/optional_io.hpp>
+
#include "mongo/db/s/shard_server_catalog_cache_loader.h"
#include "mongo/db/s/shard_server_test_fixture.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/catalog/type_database.h"
#include "mongo/s/catalog_cache_loader_mock.h"
namespace mongo {
@@ -186,7 +189,8 @@ ShardServerCatalogCacheLoaderTest::makeCombinedOriginalFiveChunksAndThreeNewChun
CollectionType ShardServerCatalogCacheLoaderTest::makeCollectionType(
const ChunkVersion& collVersion) {
- CollectionType coll(kNss, collVersion.epoch(), Date_t::now(), UUID::gen());
+ CollectionType coll(
+ kNss, collVersion.epoch(), collVersion.getTimestamp(), Date_t::now(), UUID::gen());
coll.setKeyPattern(kKeyPattern);
coll.setUnique(false);
return coll;
@@ -431,5 +435,76 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindMixedChun
}
}
+TEST_F(ShardServerCatalogCacheLoaderTest,
+ PrimaryLoadFromShardedAndFindCollAndChunksMetadataFormatChanged) {
+ // First set up the shard chunk loader as sharded.
+ vector<ChunkType> chunks = setUpChunkLoaderWithFiveChunks();
+
+ // Simulating that the config server added timestamps to all chunks
+ {
+ vector<ChunkType> newChunks = chunks;
+ for (auto& chunk : newChunks) {
+ const ChunkVersion v = chunk.getVersion();
+ chunk.setVersion(
+ ChunkVersion(v.majorVersion(), v.minorVersion(), v.epoch(), Timestamp(42)));
+ }
+
+ CollectionType collectionTypeWithTimestamp =
+ makeCollectionType(newChunks.back().getVersion());
+ _remoteLoaderMock->setCollectionRefreshReturnValue(collectionTypeWithTimestamp);
+ _remoteLoaderMock->setChunkRefreshReturnValue(newChunks);
+
+ auto collAndChunksRes = _shardLoader->getChunksSince(kNss, newChunks[0].getVersion()).get();
+ ASSERT_EQUALS(collAndChunksRes.epoch, collectionTypeWithTimestamp.getEpoch());
+ ASSERT_EQUALS(collAndChunksRes.creationTime, Timestamp(42));
+ ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 5UL);
+ for (const auto& changedChunk : collAndChunksRes.changedChunks) {
+ ASSERT_EQUALS(changedChunk.getVersion().getTimestamp(), Timestamp(42));
+ ASSERT_EQUALS(changedChunk.getVersion().epoch(), collAndChunksRes.epoch);
+ }
+ }
+
+ // Simulating that the config server removed timestamps from all chunks
+ {
+ vector<ChunkType> newChunks = chunks;
+ for (auto& chunk : newChunks) {
+ const ChunkVersion v = chunk.getVersion();
+ chunk.setVersion(
+ ChunkVersion(v.majorVersion(), v.minorVersion(), v.epoch(), boost::none));
+ }
+
+ CollectionType collectionTypeWithoutTimestamp =
+ makeCollectionType(newChunks.back().getVersion());
+ _remoteLoaderMock->setCollectionRefreshReturnValue(collectionTypeWithoutTimestamp);
+ _remoteLoaderMock->setChunkRefreshReturnValue(newChunks);
+
+ auto collAndChunksRes = _shardLoader->getChunksSince(kNss, newChunks[0].getVersion()).get();
+ ASSERT_EQUALS(collAndChunksRes.epoch, collectionTypeWithoutTimestamp.getEpoch());
+ ASSERT_EQUALS(collAndChunksRes.creationTime, boost::none);
+ ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 5UL);
+ for (const auto& changedChunk : collAndChunksRes.changedChunks) {
+ ASSERT_EQUALS(changedChunk.getVersion().getTimestamp(), boost::none);
+ ASSERT_EQUALS(changedChunk.getVersion().epoch(), collAndChunksRes.epoch);
+ }
+ }
+}
+
+TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindDbMetadataFormatChanged) {
+ const std::string dbName("dbName");
+ DatabaseVersion version(UUID::gen());
+ DatabaseType dbType(dbName, kShardId, true /* sharded */, version);
+
+ _remoteLoaderMock->setDatabaseRefreshReturnValue(dbType);
+ auto newDbType = _shardLoader->getDatabase(dbName).get();
+ ASSERT_EQUALS(dbType.getVersion().getUuid(), newDbType.getVersion().getUuid());
+ ASSERT_EQUALS(dbType.getVersion().getTimestamp(), newDbType.getVersion().getTimestamp());
+
+ dbType.setVersion(DatabaseVersion(UUID::gen(), Timestamp(42)));
+ _remoteLoaderMock->setDatabaseRefreshReturnValue(dbType);
+ newDbType = _shardLoader->getDatabase(dbName).get();
+ ASSERT_EQUALS(dbType.getVersion().getUuid(), newDbType.getVersion().getUuid());
+ ASSERT_EQUALS(dbType.getVersion().getTimestamp(), newDbType.getVersion().getTimestamp());
+}
+
} // namespace
} // namespace mongo