From 5e0dffb6d2723470e1f1bd93de6565c7a4e0353a Mon Sep 17 00:00:00 2001 From: Arun Banala Date: Mon, 20 Sep 2021 17:03:29 +0100 Subject: SERVER-60144 Handle stale routing info on mongos for sharded time-series collections --- jstests/sharding/timeseries_multiple_mongos.js | 242 +++++++++++++++++++++ src/mongo/db/catalog/coll_mod.cpp | 7 + src/mongo/db/coll_mod.idl | 6 + src/mongo/db/commands/create_indexes.cpp | 5 +- src/mongo/db/commands/dbcommands.cpp | 13 +- src/mongo/db/commands/drop_indexes.cpp | 5 +- src/mongo/db/commands/list_indexes.cpp | 9 +- src/mongo/db/commands/write_commands.cpp | 11 +- src/mongo/db/create_indexes.idl | 6 + src/mongo/db/drop_indexes.idl | 6 + src/mongo/db/list_indexes.idl | 6 + src/mongo/db/s/create_collection_coordinator.cpp | 2 +- src/mongo/db/service_entry_point_common.cpp | 13 +- src/mongo/db/timeseries/catalog_helper.cpp | 5 +- src/mongo/db/timeseries/catalog_helper.h | 3 +- .../timeseries_commands_conversion_helper.cpp | 18 +- .../timeseries_commands_conversion_helper.h | 3 +- src/mongo/s/chunk_manager_targeter.cpp | 74 +++++-- src/mongo/s/chunk_manager_targeter.h | 5 +- src/mongo/s/commands/cluster_coll_stats_cmd.cpp | 3 +- .../s/commands/cluster_collection_mod_cmd.cpp | 11 +- .../s/commands/cluster_create_indexes_cmd.cpp | 10 +- src/mongo/s/commands/cluster_drop_indexes_cmd.cpp | 11 +- src/mongo/s/commands/cluster_list_indexes_cmd.cpp | 17 +- 24 files changed, 427 insertions(+), 64 deletions(-) create mode 100644 jstests/sharding/timeseries_multiple_mongos.js diff --git a/jstests/sharding/timeseries_multiple_mongos.js b/jstests/sharding/timeseries_multiple_mongos.js new file mode 100644 index 00000000000..9450b692d71 --- /dev/null +++ b/jstests/sharding/timeseries_multiple_mongos.js @@ -0,0 +1,242 @@ +/** + * Test various commands on time-series collection in the presence of multiple mongos and collection + * changing from unsharded to sharded and vice versa. 
+ * + * @tags: [ + * requires_fcv_51, + * requires_find_command + * ] + */ + +(function() { +"use strict"; + +load("jstests/core/timeseries/libs/timeseries.js"); // For 'TimeseriesTest' helpers. + +Random.setRandomSeed(); + +const dbName = 'testDB'; +const collName = 'testColl'; +const bucketsCollName = 'system.buckets.' + collName; +const timeField = 'time'; +const metaField = 'hostid'; + +// Connections. +const st = new ShardingTest({mongos: 2, shards: 2, rs: {nodes: 2}}); +const mongos0 = st.s0.getDB(dbName); +const mongos1 = st.s1.getDB(dbName); +const shard0DB = st.shard0.getDB(dbName); +const shard1DB = st.shard1.getDB(dbName); + +if (!TimeseriesTest.shardedtimeseriesCollectionsEnabled(st.shard0)) { + jsTestLog("Skipping test because the sharded time-series collection feature flag is disabled"); + st.stop(); + return; +} + +// Databases and collections. +assert.commandWorked(mongos0.adminCommand({enableSharding: dbName})); + +// Helpers. +let currentId = 0; +function generateId() { + return currentId++; +} + +function generateBatch(size) { + return TimeseriesTest.generateHosts(size).map((host, index) => Object.assign(host, { + _id: generateId(), + [metaField]: index, + [timeField]: ISODate(`20${index}0-01-01`), + })); +} + +/** + * The test runs drop, create and shardCollection commands using mongos0, then validates the given + * command by running against mongos1 with stale config. + */ +function runTest({shardKey, cmdObj, numProfilerEntries}) { + // Insert some dummy data using 'mongos1' as the router, so that the cache is initialized on the + // mongos while the collection is unsharded. + assert.commandWorked(mongos1.getCollection(collName).insert({[timeField]: ISODate()})); + + // Drop and shard the collection with 'mongos0' as the router. 
+ assert(mongos0.getCollection(collName).drop()); + assert.commandWorked(mongos0.createCollection( + collName, {timeseries: {timeField: timeField, metaField: metaField}})); + assert.commandWorked(st.s0.adminCommand({ + shardCollection: `${dbName}.${collName}`, + key: shardKey, + })); + + assert.commandWorked( + mongos0.adminCommand({split: `${dbName}.system.buckets.${collName}`, middle: {meta: 1}})); + + // Move one of the chunks into the second shard. + const primaryShard = st.getPrimaryShard(dbName); + const otherShard = st.getOther(primaryShard); + assert.commandWorked(mongos0.adminCommand({ + movechunk: `${dbName}.system.buckets.${collName}`, + find: {meta: 1}, + to: otherShard.name, + _waitForDelete: true + })); + + // Validate the command by running against 'mongos1' as the router. + function validateCommand(collName, numEntries, unVersioned) { + // Restart profiler. + for (let shardDB of [shard0DB, shard1DB]) { + shardDB.setProfilingLevel(0); + shardDB.system.profile.drop(); + shardDB.setProfilingLevel(2); + } + + const res = mongos1.runCommand(cmdObj); + if (numEntries > 0) { + assert.commandWorked(res); + } else { + assert.commandFailed(res); + } + + const queryField = `command.${Object.keys(cmdObj)[0]}`; + let filter = {[queryField]: collName, "command.shardVersion.0": {$ne: Timestamp(0, 0)}}; + if (unVersioned) { + filter["command.shardVersion.0"] = Timestamp(0, 0); + } + + const shard0Entries = shard0DB.system.profile.find(filter).toArray(); + const shard1Entries = shard1DB.system.profile.find(filter).toArray(); + assert.eq(shard0Entries.length + shard1Entries.length, + numEntries, + {shard0Entries: shard0Entries, shard1Entries: shard1Entries}); + } + validateCommand(bucketsCollName, numProfilerEntries.sharded); + + // Insert dummy data so that the 'mongos1' sees the collection as sharded. 
+ assert.commandWorked(mongos1.getCollection(collName).insert({[timeField]: ISODate()})); + + // Drop and recreate an unsharded collection with 'mongos0' as the router. + assert(mongos0.getCollection(collName).drop()); + assert.commandWorked(mongos0.createCollection(collName, {timeseries: {timeField: timeField}})); + + // When unsharded, the command should be run against the user requested namespace. + validateCommand(cmdObj[Object.keys(cmdObj)[0]] /* coll name specified in the command */, + numProfilerEntries.unsharded, + true); +} + +/** + * Commands on the view namespace. + */ +runTest({ + shardKey: {[metaField]: 1}, + cmdObj: { + createIndexes: collName, + indexes: [{key: {[timeField]: 1}, name: "index_on_time"}], + }, + numProfilerEntries: {sharded: 2, unsharded: 1} +}); + +runTest({ + shardKey: {[metaField]: 1}, + cmdObj: {listIndexes: collName}, + numProfilerEntries: {sharded: 1, unsharded: 1} +}); + +runTest({ + shardKey: {[metaField]: 1}, + cmdObj: { + dropIndexes: collName, + index: {[metaField]: 1}, + }, + numProfilerEntries: + {sharded: 2, unsharded: 0 /* command fails when trying to drop a missing index */} +}); + +runTest({ + shardKey: {[metaField]: 1}, + cmdObj: {collMod: collName, expireAfterSeconds: 3600}, + numProfilerEntries: {sharded: 2, unsharded: 1} +}); + +runTest({ + shardKey: {[metaField]: 1}, + cmdObj: {insert: collName, documents: [{[timeField]: ISODate()}]}, + numProfilerEntries: {sharded: 1, unsharded: 1} +}); + +runTest({ + shardKey: {[metaField]: 1}, + cmdObj: {insert: collName, documents: [{[timeField]: ISODate()}]}, + numProfilerEntries: {sharded: 1, unsharded: 1} +}); + +runTest({ + shardKey: {[metaField]: 1}, + cmdObj: {aggregate: collName, pipeline: [], cursor: {}}, + numProfilerEntries: {sharded: 2, unsharded: 1} +}); + +runTest({ + shardKey: {[metaField]: 1}, + cmdObj: {collStats: collName}, + numProfilerEntries: {sharded: 2, unsharded: 1} +}); + +/** + * On system.buckets namespace + */ +runTest({ + shardKey: {[metaField]: 1}, 
+ cmdObj: { + createIndexes: bucketsCollName, + indexes: [{key: {[timeField]: 1}, name: "index_on_time"}], + }, + numProfilerEntries: {sharded: 2, unsharded: 1}, +}); + +runTest({ + shardKey: {[metaField]: 1}, + cmdObj: {listIndexes: bucketsCollName}, + numProfilerEntries: {sharded: 1, unsharded: 1}, +}); + +runTest({ + shardKey: {[metaField]: 1}, + cmdObj: { + dropIndexes: bucketsCollName, + index: {meta: 1}, + }, + numProfilerEntries: + {sharded: 2, unsharded: 0 /* command fails when trying to drop a missing index */}, +}); + +runTest({ + shardKey: {[metaField]: 1}, + cmdObj: {collMod: bucketsCollName, expireAfterSeconds: 3600}, + numProfilerEntries: {sharded: 2, unsharded: 1}, +}); + +runTest({ + shardKey: {[metaField]: 1}, + cmdObj: {aggregate: bucketsCollName, pipeline: [], cursor: {}}, + numProfilerEntries: {sharded: 2, unsharded: 1}, +}); + +runTest({ + shardKey: {[metaField]: 1}, + cmdObj: { + insert: bucketsCollName, + documents: [{ + _id: ObjectId(), + control: {min: {time: ISODate()}, max: {time: ISODate()}, version: 1}, + data: {} + }] + }, + numProfilerEntries: {sharded: 1, unsharded: 1}, +}); + +// TODO SERVER-59180: Add tests for updates. +// TODO SERVER-59181: Add tests for deletes. +st.stop(); +})(); diff --git a/src/mongo/db/catalog/coll_mod.cpp b/src/mongo/db/catalog/coll_mod.cpp index a683d603d7c..a052e22d81f 100644 --- a/src/mongo/db/catalog/coll_mod.cpp +++ b/src/mongo/db/catalog/coll_mod.cpp @@ -491,8 +491,15 @@ Status _collModInternal(OperationContext* opCtx, .throwIfReshardingInProgress(nss); } + // If db/collection/view does not exist, short circuit and return. if (!db || (!coll && !view)) { + if (nss.isTimeseriesBucketsCollection()) { + // If a sharded time-series collection is dropped, it's possible that a stale mongos + // sends the request on the buckets namespace instead of the view namespace. Ensure that + // the shardVersion is upto date before throwing an error. 
+ CollectionShardingState::get(opCtx, nss)->checkShardVersionOrThrow(opCtx); + } return Status(ErrorCodes::NamespaceNotFound, "ns does not exist"); } diff --git a/src/mongo/db/coll_mod.idl b/src/mongo/db/coll_mod.idl index 0a71b5522d1..e1f193294d3 100644 --- a/src/mongo/db/coll_mod.idl +++ b/src/mongo/db/coll_mod.idl @@ -150,4 +150,10 @@ commands: description: "Adjusts parameters for timeseries collections" optional: true type: CollModTimeseries + isTimeseriesNamespace: + description: "This flag is set to true when the command was originally sent to + mongos on the time-series view, but got rewritten to target + time-series buckets namespace before being sent to shards." + optional: true + type: bool reply_type: CollModReply diff --git a/src/mongo/db/commands/create_indexes.cpp b/src/mongo/db/commands/create_indexes.cpp index 00a74a47efb..50addeb0673 100644 --- a/src/mongo/db/commands/create_indexes.cpp +++ b/src/mongo/db/commands/create_indexes.cpp @@ -652,7 +652,10 @@ public: // If the request namespace refers to a time-series collection, transforms the user // time-series index request to one on the underlying bucket. 
boost::optional timeseriesCmdOwnership; - if (auto options = timeseries::getTimeseriesOptions(opCtx, origCmd.getNamespace())) { + auto isCommandOnTimeseriesBucketNamespace = + origCmd.getIsTimeseriesNamespace() && *origCmd.getIsTimeseriesNamespace(); + if (auto options = timeseries::getTimeseriesOptions( + opCtx, origCmd.getNamespace(), !isCommandOnTimeseriesBucketNamespace)) { timeseriesCmdOwnership = timeseries::makeTimeseriesCreateIndexesCommand(opCtx, origCmd, *options); cmd = &timeseriesCmdOwnership.get(); diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp index fbf3dbaad96..bb14f4e07c2 100644 --- a/src/mongo/db/commands/dbcommands.cpp +++ b/src/mongo/db/commands/dbcommands.cpp @@ -121,7 +121,10 @@ std::unique_ptr makeTimeseriesBucketsCollModCommand(OperationContext* o const CollMod& origCmd) { const auto& origNs = origCmd.getNamespace(); - auto timeseriesOptions = timeseries::getTimeseriesOptions(opCtx, origNs); + auto isCommandOnTimeseriesBucketNamespace = + origCmd.getIsTimeseriesNamespace() && *origCmd.getIsTimeseriesNamespace(); + auto timeseriesOptions = + timeseries::getTimeseriesOptions(opCtx, origNs, !isCommandOnTimeseriesBucketNamespace); // Return early with null if we are not working with a time-series collection. if (!timeseriesOptions) { @@ -141,7 +144,8 @@ std::unique_ptr makeTimeseriesBucketsCollModCommand(OperationContext* o index->setKeyPattern(std::move(bucketsIndexSpecWithStatus.getValue())); } - auto ns = origNs.makeTimeseriesBucketsNamespace(); + auto ns = + isCommandOnTimeseriesBucketNamespace ? 
origNs : origNs.makeTimeseriesBucketsNamespace(); auto cmd = std::make_unique(ns); cmd->setIndex(index); cmd->setValidator(origCmd.getValidator()); @@ -166,7 +170,10 @@ std::unique_ptr makeTimeseriesViewCollModCommand(OperationContext* opCt const CollMod& origCmd) { const auto& ns = origCmd.getNamespace(); - auto timeseriesOptions = timeseries::getTimeseriesOptions(opCtx, ns); + auto isCommandOnTimeseriesBucketNamespace = + origCmd.getIsTimeseriesNamespace() && *origCmd.getIsTimeseriesNamespace(); + auto timeseriesOptions = + timeseries::getTimeseriesOptions(opCtx, ns, !isCommandOnTimeseriesBucketNamespace); // Return early with null if we are not working with a time-series collection. if (!timeseriesOptions) { diff --git a/src/mongo/db/commands/drop_indexes.cpp b/src/mongo/db/commands/drop_indexes.cpp index e06cdaec2f4..ce3bbfbb0fa 100644 --- a/src/mongo/db/commands/drop_indexes.cpp +++ b/src/mongo/db/commands/drop_indexes.cpp @@ -96,7 +96,10 @@ public: Reply typedRun(OperationContext* opCtx) final { // If the request namespace refers to a time-series collection, transform the user // time-series index request to one on the underlying bucket. 
- if (auto options = timeseries::getTimeseriesOptions(opCtx, request().getNamespace())) { + auto isCommandOnTimeseriesBucketNamespace = + request().getIsTimeseriesNamespace() && *request().getIsTimeseriesNamespace(); + if (auto options = timeseries::getTimeseriesOptions( + opCtx, request().getNamespace(), !isCommandOnTimeseriesBucketNamespace)) { auto timeseriesCmd = timeseries::makeTimeseriesDropIndexesCommand(opCtx, request(), *options); return dropIndexes(opCtx, timeseriesCmd.getNamespace(), timeseriesCmd.getIndex()); diff --git a/src/mongo/db/commands/list_indexes.cpp b/src/mongo/db/commands/list_indexes.cpp index 54725eb7fb5..c9a42882944 100644 --- a/src/mongo/db/commands/list_indexes.cpp +++ b/src/mongo/db/commands/list_indexes.cpp @@ -74,8 +74,13 @@ IndexSpecsWithNamespaceString getIndexSpecsWithNamespaceString(OperationContext* // Since time-series collections don't have UUIDs, we skip the time-series lookup // if the target collection is specified as a UUID. if (const auto& origNss = origNssOrUUID.nss()) { - if (auto timeseriesOptions = timeseries::getTimeseriesOptions(opCtx, *origNss)) { - auto bucketsNss = origNss->makeTimeseriesBucketsNamespace(); + auto isCommandOnTimeseriesBucketNamespace = + cmd.getIsTimeseriesNamespace() && *cmd.getIsTimeseriesNamespace(); + if (auto timeseriesOptions = timeseries::getTimeseriesOptions( + opCtx, *origNss, !isCommandOnTimeseriesBucketNamespace)) { + auto bucketsNss = isCommandOnTimeseriesBucketNamespace + ? 
*origNss + : origNss->makeTimeseriesBucketsNamespace(); AutoGetCollectionForReadCommandMaybeLockFree autoColl(opCtx, bucketsNss); const CollectionPtr& coll = autoColl.getCollection(); diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp index 64c8b9d3f7b..d794025b2c0 100644 --- a/src/mongo/db/commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands.cpp @@ -121,6 +121,9 @@ bool isTimeseries(OperationContext* opCtx, const Request& request) { "system.buckets namespace", !request.getIsTimeseriesNamespace() || request.getNamespace().isTimeseriesBucketsCollection()); + const auto bucketNss = request.getIsTimeseriesNamespace() + ? request.getNamespace() + : request.getNamespace().makeTimeseriesBucketsNamespace(); // If the buckets collection exists now, the time-series insert path will check for the // existence of the buckets collection later on with a lock. @@ -128,11 +131,9 @@ bool isTimeseries(OperationContext* opCtx, const Request& request) { // collection does not yet exist, this check may return false unnecessarily. As a result, an // insert attempt into the time-series namespace will either succeed or fail, depending on who // wins the race. 
- return request.getIsTimeseriesNamespace() || - CollectionCatalog::get(opCtx) - ->lookupCollectionByNamespaceForRead( - opCtx, request.getNamespace().makeTimeseriesBucketsNamespace()) - .get(); + return CollectionCatalog::get(opCtx) + ->lookupCollectionByNamespaceForRead(opCtx, bucketNss) + .get(); } NamespaceString makeTimeseriesBucketsNamespace(const NamespaceString& nss) { diff --git a/src/mongo/db/create_indexes.idl b/src/mongo/db/create_indexes.idl index 12f629ed7e2..e7b60e73790 100644 --- a/src/mongo/db/create_indexes.idl +++ b/src/mongo/db/create_indexes.idl @@ -183,3 +183,9 @@ commands: description: 'Commit Quorum options' type: CommitQuorum optional: true + isTimeseriesNamespace: + description: "This flag is set to true when the command was originally sent to + mongos on the time-series view, but got rewritten to target + time-series buckets namespace before being sent to shards." + type: bool + optional: true diff --git a/src/mongo/db/drop_indexes.idl b/src/mongo/db/drop_indexes.idl index 76d17a53e9e..cb4ea9eddb2 100644 --- a/src/mongo/db/drop_indexes.idl +++ b/src/mongo/db/drop_indexes.idl @@ -71,4 +71,10 @@ commands: - string - array - object + isTimeseriesNamespace: + description: "This flag is set to true when the command was originally sent to + mongos on the time-series view, but got rewritten to target + time-series buckets namespace before being sent to shards." + type: bool + optional: true reply_type: DropIndexesReply diff --git a/src/mongo/db/list_indexes.idl b/src/mongo/db/list_indexes.idl index 48900398be6..d4f2caf9d96 100644 --- a/src/mongo/db/list_indexes.idl +++ b/src/mongo/db/list_indexes.idl @@ -179,4 +179,10 @@ commands: type: safeBool optional: true unstable: true + isTimeseriesNamespace: + description: "This flag is set to true when the command was originally sent to + mongos on the time-series view, but got rewritten to target + time-series buckets namespace before being sent to shards." 
+ type: bool + optional: true reply_type: ListIndexesReply diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp index 2b40e7df204..9cc00c593f2 100644 --- a/src/mongo/db/s/create_collection_coordinator.cpp +++ b/src/mongo/db/s/create_collection_coordinator.cpp @@ -623,7 +623,7 @@ void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx str::stream() << "can't shard time-series collection " << nss(), feature_flags::gFeatureFlagShardedTimeSeries.isEnabled( serverGlobalParams.featureCompatibility) || - !timeseries::getTimeseriesOptions(opCtx, nss().getTimeseriesViewNamespace())); + !timeseries::getTimeseriesOptions(opCtx, nss(), false)); } // Ensure the namespace is valid. diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index b0b2a905864..df094434a39 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -1506,7 +1506,18 @@ void ExecCommandDatabase::_initiateCommand() { if (!opCtx->getClient()->isInDirectClient() && readConcernArgs.getLevel() != repl::ReadConcernLevel::kAvailableReadConcern && (iAmPrimary || (readConcernArgs.hasLevel() || readConcernArgs.getArgsAfterClusterTime()))) { - oss.initializeClientRoutingVersionsFromCommand(_invocation->ns(), request.body); + // If a timeseries collection is sharded, only the buckets collection would be sharded. We + // expect all versioned commands to be sent over 'system.buckets' namespace. But it is + // possible that a stale mongos may send the request over a view namespace. In this case, we + // initialize the 'OperationShardingState' with buckets namespace. + auto bucketNss = _invocation->ns().makeTimeseriesBucketsNamespace(); + auto namespaceForSharding = CollectionCatalog::get(opCtx) + ->lookupCollectionByNamespaceForRead(opCtx, bucketNss) + .get() + ? 
bucketNss + : _invocation->ns(); + + oss.initializeClientRoutingVersionsFromCommand(namespaceForSharding, request.body); auto const shardingState = ShardingState::get(opCtx); if (OperationShardingState::isOperationVersioned(opCtx) || oss.hasDbVersion()) { diff --git a/src/mongo/db/timeseries/catalog_helper.cpp b/src/mongo/db/timeseries/catalog_helper.cpp index 5ac55527f1a..4b577d55d30 100644 --- a/src/mongo/db/timeseries/catalog_helper.cpp +++ b/src/mongo/db/timeseries/catalog_helper.cpp @@ -38,8 +38,9 @@ namespace mongo { namespace timeseries { boost::optional getTimeseriesOptions(OperationContext* opCtx, - const NamespaceString& nss) { - auto bucketsNs = nss.makeTimeseriesBucketsNamespace(); + const NamespaceString& nss, + bool convertToBucketsNamespace) { + auto bucketsNs = convertToBucketsNamespace ? nss.makeTimeseriesBucketsNamespace() : nss; auto bucketsColl = CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForRead(opCtx, bucketsNs); if (!bucketsColl) { diff --git a/src/mongo/db/timeseries/catalog_helper.h b/src/mongo/db/timeseries/catalog_helper.h index 969410d1c45..14cc681b4c3 100644 --- a/src/mongo/db/timeseries/catalog_helper.h +++ b/src/mongo/db/timeseries/catalog_helper.h @@ -46,7 +46,8 @@ namespace timeseries { * collection. Otherwise returns boost::none. 
*/ boost::optional getTimeseriesOptions(OperationContext* opCtx, - const NamespaceString& nss); + const NamespaceString& nss, + bool convertToBucketsNamespace); } // namespace timeseries } // namespace mongo diff --git a/src/mongo/db/timeseries/timeseries_commands_conversion_helper.cpp b/src/mongo/db/timeseries/timeseries_commands_conversion_helper.cpp index 233add4ada4..79d8483ab6f 100644 --- a/src/mongo/db/timeseries/timeseries_commands_conversion_helper.cpp +++ b/src/mongo/db/timeseries/timeseries_commands_conversion_helper.cpp @@ -41,9 +41,17 @@ namespace mongo::timeseries { +namespace { +NamespaceString makeTimeseriesBucketsNamespace(const NamespaceString& nss) { + return nss.isTimeseriesBucketsCollection() ? nss : nss.makeTimeseriesBucketsNamespace(); +} +} // namespace + + BSONObj makeTimeseriesCommand(const BSONObj& origCmd, const NamespaceString& ns, - const StringData nsFieldName) { + const StringData nsFieldName, + boost::optional appendTimeSeriesFlag) { // Translate time-series collection view namespace to bucket namespace. 
const auto bucketNs = ns.makeTimeseriesBucketsNamespace(); BSONObjBuilder builder; @@ -54,6 +62,10 @@ BSONObj makeTimeseriesCommand(const BSONObj& origCmd, builder.append(entry); } } + + if (appendTimeSeriesFlag) { + builder.append(*appendTimeSeriesFlag, true); + } return builder.obj(); } @@ -137,7 +149,7 @@ CreateIndexesCommand makeTimeseriesCreateIndexesCommand(OperationContext* opCtx, indexes.push_back(builder.obj()); } - auto ns = origNs.makeTimeseriesBucketsNamespace(); + auto ns = makeTimeseriesBucketsNamespace(origNs); auto cmd = CreateIndexesCommand(ns, std::move(indexes)); cmd.setV(origCmd.getV()); cmd.setIgnoreUnknownIndexOptions(origCmd.getIgnoreUnknownIndexOptions()); @@ -150,7 +162,7 @@ DropIndexes makeTimeseriesDropIndexesCommand(OperationContext* opCtx, const DropIndexes& origCmd, const TimeseriesOptions& options) { const auto& origNs = origCmd.getNamespace(); - auto ns = origNs.makeTimeseriesBucketsNamespace(); + auto ns = makeTimeseriesBucketsNamespace(origNs); const auto& origIndex = origCmd.getIndex(); if (auto keyPtr = stdx::get_if(&origIndex)) { diff --git a/src/mongo/db/timeseries/timeseries_commands_conversion_helper.h b/src/mongo/db/timeseries/timeseries_commands_conversion_helper.h index eef1de61621..d2641a17d65 100644 --- a/src/mongo/db/timeseries/timeseries_commands_conversion_helper.h +++ b/src/mongo/db/timeseries/timeseries_commands_conversion_helper.h @@ -43,7 +43,8 @@ namespace mongo::timeseries { */ BSONObj makeTimeseriesCommand(const BSONObj& origCmd, const NamespaceString& ns, - StringData nsFieldName); + StringData nsFieldName, + boost::optional appendTimeSeriesFlag); /* * Returns a CreateIndexesCommand for creating indexes on the bucket collection. 
diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp index a084d0e211a..f170e835034 100644 --- a/src/mongo/s/chunk_manager_targeter.cpp +++ b/src/mongo/s/chunk_manager_targeter.cpp @@ -235,29 +235,60 @@ bool isMetadataDifferent(const ChunkManager& managerA, const ChunkManager& manag ChunkManagerTargeter::ChunkManagerTargeter(OperationContext* opCtx, const NamespaceString& nss, boost::optional targetEpoch) - : _nss(nss), _targetEpoch(std::move(targetEpoch)), _cm(_init(opCtx)) {} + : _nss(nss), _targetEpoch(std::move(targetEpoch)), _cm(_init(opCtx, false)) {} -ChunkManager ChunkManagerTargeter::_init(OperationContext* opCtx) { +/** + * Initializes and returns the ChunkManager which needs to be used for targeting. + * If 'refresh' is true, additionally fetches the latest routing info from the config servers. + * + * Note: For sharded time-series collections, we use the buckets collection for targeting. If the + * user request is on the view namespace, we implicitly transform the request to the buckets namespace. + */ +ChunkManager ChunkManagerTargeter::_init(OperationContext* opCtx, bool refresh) { cluster::createDatabase(opCtx, _nss.db()); - // Check if we target sharded time-series collection. For such collections we target write - // operations to the underlying buckets collection. - // - // A sharded time-series collection by definition is a view, which has underlying sharded - // buckets collection. We know that 'ChunkManager::isSharded()' is false for all views. Checking - // this condition first can potentially save us extra cache lookup. After that, we lookup - // routing info for the underlying buckets collection. The absense of this routing info means - // that this collection does not exist. Finally, we check if this underlying collection is - // sharded. If all these conditions pass, we are targeting sharded time-series collection. 
+ if (refresh) { + uassertStatusOK( + Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, _nss)); + } auto cm = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, _nss)); - if (!cm.isSharded()) { + + // For a sharded time-series collection, only the underlying buckets collection is stored on the + // config servers. If the user operation is on the time-series view namespace, we should check + // if the buckets namespace is sharded. There are a few cases that we need to take care of, + // 1. The request is on the view namespace. We check if the buckets collection is sharded. If + // it is, we use the buckets collection namespace for the purpose of targeting. Additionally, + // we set the '_isRequestOnTimeseriesViewNamespace' to true for this case. + // 2. If request is on the buckets namespace, we don't need to execute any additional + // time-series logic. We can treat the request as though it was a request on a regular + // collection. + // 3. During a cache refresh, the buckets collection changes from sharded to unsharded. In this + // case, if the original request is on the view namespace, then we should reset the namespace + // back to the view namespace and reset '_isRequestOnTimeseriesViewNamespace'. + if (!cm.isSharded() && !_nss.isTimeseriesBucketsCollection()) { auto bucketsNs = _nss.makeTimeseriesBucketsNamespace(); + if (refresh) { + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh( + opCtx, bucketsNs)); + } auto bucketsRoutingInfo = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, bucketsNs)); if (bucketsRoutingInfo.isSharded()) { _nss = bucketsNs; cm = std::move(bucketsRoutingInfo); + _isRequestOnTimeseriesViewNamespace = true; } + } else if (!cm.isSharded() && _isRequestOnTimeseriesViewNamespace) { + // This can happen if a sharded time-series collection is dropped and re-created. Then we + // need to reset the namespace to the original namespace. 
+ _nss = _nss.getTimeseriesViewNamespace(); + + if (refresh) { + uassertStatusOK( + Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, _nss)); + } + cm = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, _nss)); + _isRequestOnTimeseriesViewNamespace = false; } if (_targetEpoch) { @@ -312,7 +343,7 @@ ShardEndpoint ChunkManagerTargeter::targetInsert(OperationContext* opCtx, if (_cm.isSharded()) { const auto& shardKeyPattern = _cm.getShardKeyPattern(); - if (_nss.isTimeseriesBucketsCollection()) { + if (_isRequestOnTimeseriesViewNamespace) { auto tsFields = _cm.getTimeseriesFields(); tassert(5743701, "Missing timeseriesFields on buckets collection", tsFields); shardKey = extractBucketsShardKeyFromTimeseriesDoc( @@ -559,7 +590,16 @@ void ChunkManagerTargeter::noteStaleShardResponse(OperationContext* opCtx, const StaleConfigInfo& staleInfo) { dassert(!_lastError || _lastError.get() == LastErrorType::kStaleShardVersion); Grid::get(opCtx)->catalogCache()->invalidateShardOrEntireCollectionEntryForShardedCollection( - _nss, staleInfo.getVersionWanted(), endpoint.shardName); + staleInfo.getNss(), staleInfo.getVersionWanted(), endpoint.shardName); + + if (staleInfo.getNss() != _nss) { + // This can happen when a time-series collection becomes sharded. 
+ Grid::get(opCtx) + ->catalogCache() + ->invalidateShardOrEntireCollectionEntryForShardedCollection( + _nss, staleInfo.getVersionWanted(), endpoint.shardName); + } + _lastError = LastErrorType::kStaleShardVersion; } @@ -590,14 +630,12 @@ bool ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx) { // Get the latest metadata information from the cache if there were issues auto lastManager = _cm; - _cm = _init(opCtx); + _cm = _init(opCtx, false); auto metadataChanged = isMetadataDifferent(lastManager, _cm); if (_lastError.get() == LastErrorType::kCouldNotTarget && !metadataChanged) { // If we couldn't target and we dind't already update the metadata we must force a refresh - uassertStatusOK( - Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, _nss)); - _cm = _init(opCtx); + _cm = _init(opCtx, true); metadataChanged = isMetadataDifferent(lastManager, _cm); } diff --git a/src/mongo/s/chunk_manager_targeter.h b/src/mongo/s/chunk_manager_targeter.h index 57e9fc2a55d..0aef3ecbf52 100644 --- a/src/mongo/s/chunk_manager_targeter.h +++ b/src/mongo/s/chunk_manager_targeter.h @@ -122,7 +122,7 @@ public: const TimeseriesOptions& timeseriesOptions); private: - ChunkManager _init(OperationContext* opCtx); + ChunkManager _init(OperationContext* opCtx, bool refresh); /** * Returns a vector of ShardEndpoints for a potentially multi-shard query. @@ -150,6 +150,9 @@ private: // Full namespace of the collection for this targeter NamespaceString _nss; + // Used to identify the original namespace that the user has requested. 
+ bool _isRequestOnTimeseriesViewNamespace = false; + // Stores last error occurred boost::optional _lastError; diff --git a/src/mongo/s/commands/cluster_coll_stats_cmd.cpp b/src/mongo/s/commands/cluster_coll_stats_cmd.cpp index c03c83279ff..f85ae09aeb2 100644 --- a/src/mongo/s/commands/cluster_coll_stats_cmd.cpp +++ b/src/mongo/s/commands/cluster_coll_stats_cmd.cpp @@ -216,7 +216,8 @@ public: // Translate command collection namespace for time-series collection. if (targeter.timeseriesNamespaceNeedsRewrite(nss)) { - cmdObjToSend = timeseries::makeTimeseriesCommand(cmdObjToSend, nss, getName()); + cmdObjToSend = + timeseries::makeTimeseriesCommand(cmdObjToSend, nss, getName(), boost::none); } // Unscaled individual shard results. This is required to apply scaling after summing the diff --git a/src/mongo/s/commands/cluster_collection_mod_cmd.cpp b/src/mongo/s/commands/cluster_collection_mod_cmd.cpp index ac369c6a8ba..e2e56136543 100644 --- a/src/mongo/s/commands/cluster_collection_mod_cmd.cpp +++ b/src/mongo/s/commands/cluster_collection_mod_cmd.cpp @@ -36,6 +36,7 @@ #include "mongo/db/coll_mod_gen.h" #include "mongo/db/coll_mod_reply_validation.h" #include "mongo/db/commands.h" +#include "mongo/db/timeseries/timeseries_commands_conversion_helper.h" #include "mongo/logv2/log.h" #include "mongo/s/chunk_manager_targeter.h" #include "mongo/s/cluster_commands_helpers.h" @@ -94,15 +95,19 @@ public: const auto targeter = ChunkManagerTargeter(opCtx, nss); const auto& routingInfo = targeter.getRoutingInfo(); + auto cmdToBeSent = cmdObj; + if (targeter.timeseriesNamespaceNeedsRewrite(nss)) { + cmdToBeSent = timeseries::makeTimeseriesCommand( + cmdToBeSent, nss, getName(), CollMod::kIsTimeseriesNamespaceFieldName); + } + auto shardResponses = scatterGatherVersionedTargetByRoutingTable( opCtx, cmd.getDbName(), targeter.getNS(), routingInfo, applyReadWriteConcern( - opCtx, - this, - CommandHelpers::filterCommandRequestForPassthrough(cmd.toBSON(BSONObj()))), + opCtx, this, 
CommandHelpers::filterCommandRequestForPassthrough(cmdToBeSent)), ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kNoRetry, BSONObj() /* query */, diff --git a/src/mongo/s/commands/cluster_create_indexes_cmd.cpp b/src/mongo/s/commands/cluster_create_indexes_cmd.cpp index 05cec36eef0..8c4c8af8784 100644 --- a/src/mongo/s/commands/cluster_create_indexes_cmd.cpp +++ b/src/mongo/s/commands/cluster_create_indexes_cmd.cpp @@ -95,14 +95,8 @@ public: auto routingInfo = targeter.getRoutingInfo(); auto cmdToBeSent = cmdObj; if (targeter.timeseriesNamespaceNeedsRewrite(nss)) { - auto timeseriesCmd = timeseries::makeTimeseriesCreateIndexesCommand( - opCtx, - CreateIndexesCommand::parse( - IDLParserErrorContext("createIndexes", - APIParameters::get(opCtx).getAPIStrict().value_or(false)), - cmdObj), - routingInfo.getTimeseriesFields()->getTimeseriesOptions()); - cmdToBeSent = timeseriesCmd.toBSON(cmdObj); + cmdToBeSent = timeseries::makeTimeseriesCommand( + cmdToBeSent, nss, getName(), CreateIndexesCommand::kIsTimeseriesNamespaceFieldName); } auto shardResponses = scatterGatherVersionedTargetByRoutingTable( diff --git a/src/mongo/s/commands/cluster_drop_indexes_cmd.cpp b/src/mongo/s/commands/cluster_drop_indexes_cmd.cpp index d0cd02051fd..24f2f507137 100644 --- a/src/mongo/s/commands/cluster_drop_indexes_cmd.cpp +++ b/src/mongo/s/commands/cluster_drop_indexes_cmd.cpp @@ -146,16 +146,11 @@ public: // chunks for the collection. 
auto targeter = ChunkManagerTargeter(opCtx, nss); auto routingInfo = targeter.getRoutingInfo(); + auto cmdToBeSent = cmdObj; if (targeter.timeseriesNamespaceNeedsRewrite(nss)) { - auto timeseriesCmd = timeseries::makeTimeseriesDropIndexesCommand( - opCtx, - DropIndexes::parse( - IDLParserErrorContext("dropIndexes", - APIParameters::get(opCtx).getAPIStrict().value_or(false)), - cmdObj), - routingInfo.getTimeseriesFields()->getTimeseriesOptions()); - cmdToBeSent = timeseriesCmd.toBSON(cmdObj); + cmdToBeSent = timeseries::makeTimeseriesCommand( + cmdToBeSent, nss, getName(), DropIndexes::kIsTimeseriesNamespaceFieldName); } auto shardResponses = diff --git a/src/mongo/s/commands/cluster_list_indexes_cmd.cpp b/src/mongo/s/commands/cluster_list_indexes_cmd.cpp index 12bfd1e8800..3ae4cea812b 100644 --- a/src/mongo/s/commands/cluster_list_indexes_cmd.cpp +++ b/src/mongo/s/commands/cluster_list_indexes_cmd.cpp @@ -34,6 +34,7 @@ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" #include "mongo/db/list_indexes_gen.h" +#include "mongo/db/timeseries/timeseries_commands_conversion_helper.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/chunk_manager_targeter.h" #include "mongo/s/cluster_commands_helpers.h" @@ -115,14 +116,22 @@ public: // The command's IDL definition permits namespace or UUID, but mongos requires a // namespace. 
- auto targetter = ChunkManagerTargeter(opCtx, ns()); - auto cm = targetter.getRoutingInfo(); + auto targeter = ChunkManagerTargeter(opCtx, ns()); + auto cm = targeter.getRoutingInfo(); + auto cmdToBeSent = request().toBSON({}); + if (targeter.timeseriesNamespaceNeedsRewrite(ns())) { + cmdToBeSent = + timeseries::makeTimeseriesCommand(cmdToBeSent, + ns(), + ListIndexes::kCommandName, + ListIndexes::kIsTimeseriesNamespaceFieldName); + } return cursorCommandPassthroughShardWithMinKeyChunk( opCtx, - targetter.getNS(), + targeter.getNS(), cm, - applyReadWriteConcern(opCtx, this, request().toBSON({})), + applyReadWriteConcern(opCtx, this, cmdToBeSent), {Privilege(ResourcePattern::forExactNamespace(ns()), ActionType::listIndexes)}); } }; -- cgit v1.2.1