diff options
author | Nikita Lapkov <nikita.lapkov@mongodb.com> | 2021-09-21 15:12:04 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-10-01 15:48:51 +0000 |
commit | 4ac4e3d9a41cd5d2dd6cae2669b9c75a457ec144 (patch) | |
tree | 7011779d307b94806513691a856b4839d1f97ea3 | |
parent | b72d07a8b0f1f5d5935ae039196e9cd6531b3406 (diff) | |
download | mongo-4ac4e3d9a41cd5d2dd6cae2669b9c75a457ec144.tar.gz |
SERVER-59181 Support delete operation for sharded time-series collections
-rw-r--r-- | jstests/core/timeseries/libs/timeseries.js | 7 | ||||
-rw-r--r-- | jstests/core/timeseries/timeseries_insert_after_delete.js | 11 | ||||
-rw-r--r-- | jstests/sharding/timeseries_delete.js | 394 | ||||
-rw-r--r-- | jstests/sharding/timeseries_multiple_mongos.js | 71 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.cpp | 29 | ||||
-rw-r--r-- | src/mongo/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/chunk_manager_targeter.cpp | 38 |
7 files changed, 521 insertions, 30 deletions
diff --git a/jstests/core/timeseries/libs/timeseries.js b/jstests/core/timeseries/libs/timeseries.js index 7ec3969e972..a226803058a 100644 --- a/jstests/core/timeseries/libs/timeseries.js +++ b/jstests/core/timeseries/libs/timeseries.js @@ -35,6 +35,13 @@ var TimeseriesTest = class { .featureFlagShardedTimeSeries.value; } + static shardedTimeseriesUpdatesAndDeletesEnabled(conn) { + return assert + .commandWorked( + conn.adminCommand({getParameter: 1, featureFlagShardedTimeSeriesUpdateDelete: 1})) + .featureFlagShardedTimeSeriesUpdateDelete.value; + } + static timeseriesMetricIndexesEnabled(conn) { return assert .commandWorked( diff --git a/jstests/core/timeseries/timeseries_insert_after_delete.js b/jstests/core/timeseries/timeseries_insert_after_delete.js index 4a033f391c1..e1ede952d18 100644 --- a/jstests/core/timeseries/timeseries_insert_after_delete.js +++ b/jstests/core/timeseries/timeseries_insert_after_delete.js @@ -1,7 +1,6 @@ /** * Tests running the delete command on a time-series collection closes the in-memory bucket. * @tags: [ - * assumes_unsharded_collection, # TODO SERVER-59180: Remove this tag. * does_not_support_stepdowns, * does_not_support_transactions, * requires_getmore, @@ -11,13 +10,21 @@ (function() { "use strict"; -load("jstests/core/timeseries/libs/timeseries.js"); +load("jstests/core/timeseries/libs/timeseries.js"); // For 'TimeseriesTest'. +load("jstests/libs/fixture_helpers.js"); // For 'FixtureHelpers'. if (!TimeseriesTest.timeseriesUpdatesAndDeletesEnabled(db.getMongo())) { jsTestLog("Skipping test because the time-series updates and deletes feature flag is disabled"); return; } +if (FixtureHelpers.isMongos(db) && + !TimeseriesTest.shardedTimeseriesUpdatesAndDeletesEnabled(db.getMongo())) { + jsTestLog( + "Skipping test because the sharded time-series updates and deletes feature flag is disabled"); + return; +} + TimeseriesTest.run((insert) => { const testDB = db.getSiblingDB(jsTestName()); assert.commandWorked(testDB.dropDatabase()); diff --git a/jstests/sharding/timeseries_delete.js b/jstests/sharding/timeseries_delete.js new file mode 100644 index 00000000000..d00a61e4b00 --- /dev/null +++ b/jstests/sharding/timeseries_delete.js @@ -0,0 +1,394 @@ +/** + * Test deletes from sharded timeseries collection. + * + * @tags: [ + * requires_fcv_51, + * requires_find_command + * ] + */ + +(function() { +"use strict"; + +load("jstests/core/timeseries/libs/timeseries.js"); // For 'TimeseriesTest' helpers. + +Random.setRandomSeed(); + +const dbName = 'testDB'; +const collName = 'testColl'; +const timeField = 'time'; +const metaField = 'hostid'; + +// Connections. +const st = new ShardingTest({shards: 2, rs: {nodes: 2}}); +const mongos = st.s0; + +// Sanity checks. +if (!TimeseriesTest.timeseriesCollectionsEnabled(st.shard0)) { + jsTestLog("Skipping test because the time-series collection feature flag is disabled"); + st.stop(); + return; +} + +if (!TimeseriesTest.shardedtimeseriesCollectionsEnabled(st.shard0)) { + jsTestLog("Skipping test because the sharded time-series collection feature flag is disabled"); + st.stop(); + return; +} + +const deletesEnabled = TimeseriesTest.shardedTimeseriesUpdatesAndDeletesEnabled(st.shard0); +if (!deletesEnabled) { + jsTestLog( + "Sharded time-series updates and deletes feature flag is disabled, expecting all delete commands to fail."); +} + +// Databases. +assert.commandWorked(mongos.adminCommand({enableSharding: dbName})); +const mainDB = mongos.getDB(dbName); + +function generateTimeValue(index) { + return ISODate(`${2000 + index}-01-01`); +} + +function generateDocsForTestCase(collConfig) { + const documents = TimeseriesTest.generateHosts(collConfig.nDocs); + for (let i = 0; i < collConfig.nDocs; i++) { + documents[i]._id = i; + if (collConfig.metaGenerator) { + documents[i][metaField] = collConfig.metaGenerator(i); + } + documents[i][timeField] = generateTimeValue(i); + } + return documents; +} + +const collectionConfigurations = { + // Shard key only on meta field/subfields. + metaShardKey: { + nDocs: 4, + metaGenerator: (id => id), + shardKey: {[metaField]: 1}, + splitPoint: {meta: 2}, + }, + metaObjectShardKey: { + nDocs: 4, + metaGenerator: (index => ({a: index})), + shardKey: {[metaField]: 1}, + splitPoint: {meta: {a: 2}}, + }, + metaSubFieldShardKey: { + nDocs: 4, + metaGenerator: (index => ({a: index})), + shardKey: {[metaField + '.a']: 1}, + splitPoint: {'meta.a': 2}, + }, + + // Shard key on time field. + timeShardKey: { + nDocs: 4, + shardKey: {[timeField]: 1}, + splitPoint: {[`control.min.${timeField}`]: generateTimeValue(2)}, + }, + + // Shard key on both meta and time field. + metaTimeShardKey: { + nDocs: 4, + metaGenerator: (id => id), + shardKey: {[metaField]: 1, [timeField]: 1}, + splitPoint: {meta: 2, [`control.min.${timeField}`]: generateTimeValue(2)}, + }, + metaObjectTimeShardKey: { + nDocs: 4, + metaGenerator: (index => ({a: index})), + shardKey: {[metaField]: 1, [timeField]: 1}, + splitPoint: {meta: {a: 2}, [`control.min.${timeField}`]: generateTimeValue(2)}, + }, + metaSubFieldTimeShardKey: { + nDocs: 4, + metaGenerator: (index => ({a: index})), + shardKey: {[metaField + '.a']: 1, [timeField]: 1}, + splitPoint: {'meta.a': 1, [`control.min.${timeField}`]: generateTimeValue(2)}, + }, +}; + +const requestConfigurations = { + emptyFilter: { + deleteQuery: {}, + remainingDocumentsIds: [], + reachesShard0: true, + reachesShard1: true, + }, + metaFilterOneShard: { + deletePredicates: [{[metaField]: 2}, {[metaField]: 3}], + remainingDocumentsIds: [0, 1], + reachesShard0: false, + reachesShard1: true, + }, + metaFilterTwoShards: { + deletePredicates: [{[metaField]: 1}, {[metaField]: 2}], + remainingDocumentsIds: [0, 3], + reachesShard0: true, + reachesShard1: true, + }, + metaObjectFilterOneShard: { + deletePredicates: [{[metaField]: {a: 2}}, {[metaField]: {a: 3}}], + remainingDocumentsIds: [0, 1], + reachesShard0: false, + reachesShard1: true, + }, + metaObjectFilterTwoShards: { + deletePredicates: [{[metaField]: {a: 1}}, {[metaField]: {a: 2}}], + remainingDocumentsIds: [0, 3], + reachesShard0: true, + reachesShard1: true, + }, + metaSubFieldFilterOneShard: { + deletePredicates: [{[metaField + '.a']: 2}, {[metaField + '.a']: 3}], + remainingDocumentsIds: [0, 1], + reachesShard0: false, + reachesShard1: true, + }, + metaSubFieldFilterTwoShards: { + deletePredicates: [{[metaField + '.a']: 1}, {[metaField + '.a']: 2}], + remainingDocumentsIds: [0, 3], + reachesShard0: true, + reachesShard1: true, + }, +}; + +const testCases = { + // Shard key only on meta field/subfields. + metaShardKey: ['emptyFilter', 'metaFilterOneShard', 'metaFilterTwoShards'], + metaObjectShardKey: [ + 'emptyFilter', + 'metaObjectFilterOneShard', + 'metaObjectFilterTwoShards', + 'metaSubFieldFilterTwoShards' + ], + metaSubFieldShardKey: [ + 'emptyFilter', + 'metaObjectFilterTwoShards', + 'metaSubFieldFilterOneShard', + 'metaSubFieldFilterTwoShards' + ], + + // Shard key on time field. + timeShardKey: ['emptyFilter'], + + // Shard key on both meta and time field. + metaTimeShardKey: ['emptyFilter', 'metaFilterTwoShards'], + metaObjectTimeShardKey: + ['emptyFilter', 'metaObjectFilterTwoShards', 'metaSubFieldFilterTwoShards'], + metaSubFieldTimeShardKey: + ['emptyFilter', 'metaObjectFilterTwoShards', 'metaSubFieldFilterTwoShards'], +}; + +function runTest(collConfig, reqConfig, insert) { + jsTestLog(`Running a test with configuration: ${tojson({collConfig, reqConfig})}`); + + // Ensure that the collection does not exist. + const coll = mainDB.getCollection(collName); + coll.drop(); + + // Create timeseries collection. + const tsOptions = {timeField: timeField}; + const hasMetaField = !!collConfig.metaGenerator; + if (hasMetaField) { + tsOptions.metaField = metaField; + } + assert.commandWorked(mainDB.createCollection(collName, {timeseries: tsOptions})); + + // Shard timeseries collection. + assert.commandWorked(coll.createIndex(collConfig.shardKey)); + assert.commandWorked(mongos.adminCommand({ + shardCollection: `${dbName}.${collName}`, + key: collConfig.shardKey, + })); + + // Insert initial set of documents. + const documents = generateDocsForTestCase(collConfig); + assert.commandWorked(insert(coll, documents)); + + // Manually split the data into two chunks. + assert.commandWorked(mongos.adminCommand( + {split: `${dbName}.system.buckets.${collName}`, middle: collConfig.splitPoint})); + + // Ensure that currently both chunks reside on the primary shard. + let counts = st.chunkCounts(`system.buckets.${collName}`, dbName); + const primaryShard = st.getPrimaryShard(dbName); + assert.eq(2, counts[primaryShard.shardName], counts); + + // Move one of the chunks into the second shard. + const otherShard = st.getOther(primaryShard); + assert.commandWorked(mongos.adminCommand({ + movechunk: `${dbName}.system.buckets.${collName}`, + find: collConfig.splitPoint, + to: otherShard.name, + _waitForDelete: true + })); + + // Ensure that each shard owns one chunk. + counts = st.chunkCounts(`system.buckets.${collName}`, dbName); + assert.eq(1, counts[primaryShard.shardName], counts); + assert.eq(1, counts[otherShard.shardName], counts); + + const isBulkOperation = !reqConfig.deleteQuery; + if (!isBulkOperation) { + // If sharded updates and deletes feature flag is disabled, we only test that the delete + // command fails. + if (!deletesEnabled) { + assert.throwsWithCode(() => coll.deleteMany(reqConfig.deleteQuery), + ErrorCodes.NotImplemented); + return; + } + + // The 'isTimeseriesNamespace' parameter is not allowed on mongos. + const failingDeleteCommand = { + delete: `system.buckets.${collName}`, + deletes: [ + { + q: reqConfig.deleteQuery, + limit: 0, + }, + ], + isTimeseriesNamespace: true, + }; + assert.commandFailedWithCode(mainDB.runCommand(failingDeleteCommand), 5916401); + + // On a mongod node, 'isTimeseriesNamespace' can only be used on time-series buckets + // namespace. + failingDeleteCommand.delete = collName; + assert.commandFailedWithCode(st.shard0.getDB(dbName).runCommand(failingDeleteCommand), + 5916400); + + // Currently, we do not support queries on non-meta fields for delete commands. + delete failingDeleteCommand.isTimeseriesNamespace; + for (let additionalField of [timeField, 'randomFieldWhichShouldNotBeHere']) { + // JavaScript does not have a reliable way to perform deep copy of an object. So instead + // of copying delete query each time, we just set and unset additional fields in it. See + // https://stackoverflow.com/a/122704 for details. + failingDeleteCommand.deletes[0].q[additionalField] = 1; + assert.commandFailedWithCode(mainDB.runCommand(failingDeleteCommand), + ErrorCodes.InvalidOptions); + delete failingDeleteCommand.deletes[0].q[additionalField]; + } + + // Currently, we support only delete commands with 'limit: 0' for sharded time-series + // collections. + failingDeleteCommand.deletes[0].limit = 1; + assert.commandFailedWithCode(mainDB.runCommand(failingDeleteCommand), + ErrorCodes.IllegalOperation); + } + + // Reset database profiler. + const primaryDB = primaryShard.getDB(dbName); + const otherDB = otherShard.getDB(dbName); + for (let shardDB of [primaryDB, otherDB]) { + shardDB.setProfilingLevel(0); + shardDB.system.profile.drop(); + shardDB.setProfilingLevel(2); + } + + // Perform valid delete. + if (!isBulkOperation) { + assert.commandWorked(coll.deleteMany(reqConfig.deleteQuery)); + } else { + let bulk; + let predicates; + if (reqConfig.unorderedBulkDeletes) { + bulk = coll.initializeUnorderedBulkOp(); + predicates = reqConfig.unorderedBulkDeletes; + } else { + bulk = coll.initializeOrderedBulkOp(); + predicates = reqConfig.orderedBulkDeletes; + } + + for (let predicate of predicates) { + bulk.find(predicate).remove(); + } + if (deletesEnabled) { + assert.commandWorked(bulk.execute()); + } else { + assert.throws(() => bulk.execute()); + return; + } + } + + // Check that the query was routed to the correct shards. + const profilerFilter = { + op: 'remove', + ns: `${dbName}.${collName}`, + // Filter out events recorded because of StaleConfig error. + ok: {$ne: 0}, + }; + const shard0Entries = primaryDB.system.profile.find(profilerFilter).itcount(); + const shard1Entries = otherDB.system.profile.find(profilerFilter).itcount(); + if (reqConfig.reachesShard0) { + assert.gt(shard0Entries, 0); + } else { + assert.eq(shard0Entries, 0); + } + if (reqConfig.reachesShard1) { + assert.gt(shard1Entries, 0); + } else { + assert.eq(shard1Entries, 0); + } + + // Ensure that the collection contains only expected documents. + const remainingIds = coll.find({}, {_id: 1}).sort({_id: 1}).toArray().map(x => x._id); + + reqConfig.remainingDocumentsIds.sort(); + + assert.eq(remainingIds, reqConfig.remainingDocumentsIds, ` + Delete query: ${tojsononeline(reqConfig.deleteQuery)} + Input documents: + Ids: ${tojsononeline(documents.map(x => x._id))} + Meta: ${tojsononeline(documents.map(x => x[metaField]))} + Time: ${tojsononeline(documents.map(x => x[timeField]))} + Remaining ids: ${tojsononeline(remainingIds)} + Expected remaining ids: ${tojsononeline(reqConfig.remainingDocumentsIds)} + `); +} + +TimeseriesTest.run((insert) => { + for (const [collConfigName, reqConfigNames] of Object.entries(testCases)) { + const collConfig = collectionConfigurations[collConfigName]; + for (let reqConfigName of reqConfigNames) { + const reqConfig = requestConfigurations[reqConfigName]; + + try { + // Some request configurations do not support bulk operations. + if (reqConfig.deleteQuery) { + runTest(collConfig, reqConfig, insert); + continue; + } + + const deletePredicates = reqConfig.deletePredicates; + + // Test single 'coll.deleteMany()' call with $or predicate. + reqConfig.deleteQuery = {$or: deletePredicates}; + runTest(collConfig, reqConfig, insert); + delete reqConfig.deleteQuery; + + // Test multiple deletes sent through unordered bulk interface. + reqConfig.unorderedBulkDeletes = deletePredicates; + runTest(collConfig, reqConfig, insert); + delete reqConfig.unorderedBulkDeletes; + + // Test multiple deletes sent through ordered bulk interface. + reqConfig.orderedBulkDeletes = deletePredicates; + runTest(collConfig, reqConfig, insert); + delete reqConfig.orderedBulkDeletes; + } catch (e) { + jsTestLog(`Test case failed. Configurations: + - Collection "${collConfigName}" = ${tojson(collConfig)} + - Request "${reqConfigName}" = ${tojson(reqConfig)} + `); + throw e; + } + } + } +}, mainDB); + +st.stop(); +})(); diff --git a/jstests/sharding/timeseries_multiple_mongos.js b/jstests/sharding/timeseries_multiple_mongos.js index 9450b692d71..4a0d38a3254 100644 --- a/jstests/sharding/timeseries_multiple_mongos.js +++ b/jstests/sharding/timeseries_multiple_mongos.js @@ -56,6 +56,8 @@ function generateBatch(size) { * command by running against mongos1 with stale config. */ function runTest({shardKey, cmdObj, numProfilerEntries}) { + const isDelete = cmdObj["delete"] !== undefined; + // Insert some dummy data using 'mongos1' as the router, so that the cache is initialized on the // mongos while the collection is unsharded. assert.commandWorked(mongos1.getCollection(collName).insert({[timeField]: ISODate()})); @@ -100,7 +102,9 @@ function runTest({shardKey, cmdObj, numProfilerEntries}) { const queryField = `command.${Object.keys(cmdObj)[0]}`; let filter = {[queryField]: collName, "command.shardVersion.0": {$ne: Timestamp(0, 0)}}; - if (unVersioned) { + if (isDelete) { + filter = {"op": "remove", "ns": `${dbName}.${collName}`, "ok": {$ne: 0}}; + } else if (unVersioned) { filter["command.shardVersion.0"] = Timestamp(0, 0); } @@ -110,14 +114,20 @@ function runTest({shardKey, cmdObj, numProfilerEntries}) { numEntries, {shard0Entries: shard0Entries, shard1Entries: shard1Entries}); } - validateCommand(bucketsCollName, numProfilerEntries.sharded); + + let targetShardedCollection = bucketsCollName; + if (isDelete && cmdObj["delete"] !== bucketsCollName) { + targetShardedCollection = collName; + } + validateCommand(targetShardedCollection, numProfilerEntries.sharded); // Insert dummy data so that the 'mongos1' sees the collection as sharded. assert.commandWorked(mongos1.getCollection(collName).insert({[timeField]: ISODate()})); // Drop and recreate an unsharded collection with 'mongos0' as the router. assert(mongos0.getCollection(collName).drop()); - assert.commandWorked(mongos0.createCollection(collName, {timeseries: {timeField: timeField}})); + assert.commandWorked(mongos0.createCollection( + collName, {timeseries: {timeField: timeField, metaField: metaField}})); // When unsharded, the command should be run against the user requested namespace. validateCommand(cmdObj[Object.keys(cmdObj)[0]] /* coll name specified in the command */, @@ -236,7 +246,58 @@ runTest({ numProfilerEntries: {sharded: 1, unsharded: 1}, }); -// TODO SERVER-59180: Add tests for updates. -// TODO SERVER-59181: Add tests for deletes. +if (TimeseriesTest.timeseriesUpdatesAndDeletesEnabled(st.shard0) && + TimeseriesTest.shardedTimeseriesUpdatesAndDeletesEnabled(st.shard0)) { + runTest({ + shardKey: {[metaField]: 1}, + cmdObj: { + delete: collName, + deletes: [{ + q: {}, + limit: 0, + }], + }, + numProfilerEntries: {sharded: 2, unsharded: 1}, + }); + + runTest({ + shardKey: {[metaField]: 1}, + cmdObj: { + delete: collName, + deletes: [{ + q: {[metaField]: 0}, + limit: 0, + }], + }, + numProfilerEntries: {sharded: 1, unsharded: 1}, + }); + + runTest({ + shardKey: {[metaField]: 1}, + cmdObj: { + delete: bucketsCollName, + deletes: [{ + q: {}, + limit: 0, + }], + }, + numProfilerEntries: {sharded: 2, unsharded: 1}, + }); + + runTest({ + shardKey: {[metaField]: 1}, + cmdObj: { + delete: bucketsCollName, + deletes: [{ + q: {meta: 0}, + limit: 0, + }], + }, + numProfilerEntries: {sharded: 1, unsharded: 1}, + }); + + // TODO SERVER-59180: Add tests for updates. +} + st.stop(); })(); diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index e6f1f5219fd..adce5e0ac34 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -1180,16 +1180,21 @@ static SingleWriteResult performSingleDeleteOp(OperationContext* opCtx, WriteResult performDeletes(OperationContext* opCtx, const write_ops::DeleteCommandRequest& wholeOp, OperationSource source) { + auto ns = wholeOp.getNamespace(); + if (source == OperationSource::kTimeseriesDelete && !ns.isTimeseriesBucketsCollection()) { + ns = ns.makeTimeseriesBucketsNamespace(); + } + // Delete performs its own retries, so we should not be in a WriteUnitOfWork unless we are in a // transaction. auto txnParticipant = TransactionParticipant::get(opCtx); invariant(!opCtx->lockState()->inAWriteUnitOfWork() || (txnParticipant && opCtx->inMultiDocumentTransaction())); - uassertStatusOK(userAllowedWriteNS(opCtx, wholeOp.getNamespace())); + uassertStatusOK(userAllowedWriteNS(opCtx, ns)); DisableDocumentSchemaValidationIfTrue docSchemaValidationDisabler( opCtx, wholeOp.getWriteCommandRequestBase().getBypassDocumentValidation()); - LastOpFixer lastOpFixer(opCtx, wholeOp.getNamespace()); + LastOpFixer lastOpFixer(opCtx, ns); bool containsRetry = false; ON_BLOCK_EXIT([&] { updateRetryStats(opCtx, containsRetry); }); @@ -1237,24 +1242,12 @@ WriteResult performDeletes(OperationContext* opCtx, }); try { lastOpFixer.startingOp(); - out.results.push_back( - performSingleDeleteOp(opCtx, - source == OperationSource::kTimeseriesDelete - ? wholeOp.getNamespace().makeTimeseriesBucketsNamespace() - : wholeOp.getNamespace(), - stmtId, - singleOp, - runtimeConstants, - wholeOp.getLet(), - source)); + out.results.push_back(performSingleDeleteOp( + opCtx, ns, stmtId, singleOp, runtimeConstants, wholeOp.getLet(), source)); lastOpFixer.finishedOpSuccessfully(); } catch (const DBException& ex) { - out.canContinue = handleError(opCtx, - ex, - wholeOp.getNamespace(), - wholeOp.getWriteCommandRequestBase(), - false /* multiUpdate */, - &out); + out.canContinue = handleError( + opCtx, ex, ns, wholeOp.getWriteCommandRequestBase(), false /* multiUpdate */, &out); if (!out.canContinue) break; } diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index b078f91ba3b..b9b22e5851c 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -35,6 +35,7 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/db/not_primary_error_tracker', + '$BUILD_DIR/mongo/db/timeseries/timeseries_conversion_util', '$BUILD_DIR/mongo/db/timeseries/timeseries_options', 'query/cluster_query', 'write_ops/cluster_write_ops', diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp index f170e835034..0e61716d42d 100644 --- a/src/mongo/s/chunk_manager_targeter.cpp +++ b/src/mongo/s/chunk_manager_targeter.cpp @@ -57,6 +57,8 @@ #include "mongo/util/str.h" #include "signal.h" +#include "mongo/db/timeseries/timeseries_update_delete_util.h" + namespace mongo { namespace { @@ -480,13 +482,39 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(OperationContext* itemRef.getLet(), itemRef.getLegacyRuntimeConstants()); + BSONObj deleteQuery = deleteOp.getQ(); BSONObj shardKey; if (_cm.isSharded()) { + if (_isRequestOnTimeseriesViewNamespace) { + uassert(ErrorCodes::NotImplemented, + "Deletes on sharded time-series collections feature is not enabled", + feature_flags::gFeatureFlagShardedTimeSeriesUpdateDelete.isEnabled( + serverGlobalParams.featureCompatibility)); + + uassert(ErrorCodes::IllegalOperation, + "Cannot perform a non-multi delete on a time-series collection", + deleteOp.getMulti()); + + auto tsFields = _cm.getTimeseriesFields(); + tassert(5918101, "Missing timeseriesFields on buckets collection", tsFields); + + const auto& metaField = tsFields->getMetaField(); + if (metaField) { + // Translate delete query into the query to the time-series buckets collection. + deleteQuery = timeseries::translateQuery(deleteQuery, *metaField); + } else { + // In case the time-series collection does not have meta field defined, we target + // the request to all shards using empty predicate. Since we allow only delete + // requests with 'limit:0', we will not delete any extra documents. + deleteQuery = BSONObj(); + } + } + // Sharded collections have the following further requirements for targeting: // // Limit-1 deletes must be targeted exactly by shard key *or* exact _id - shardKey = uassertStatusOK( - _cm.getShardKeyPattern().extractShardKeyFromQuery(expCtx, deleteOp.getQ())); + shardKey = + uassertStatusOK(_cm.getShardKeyPattern().extractShardKeyFromQuery(expCtx, deleteQuery)); } // Target the shard key or delete query @@ -501,7 +529,7 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(OperationContext* // Parse delete query. auto findCommand = std::make_unique<FindCommandRequest>(_nss); - findCommand->setFilter(deleteOp.getQ()); + findCommand->setFilter(deleteQuery); if (!collation.isEmpty()) { findCommand->setCollation(collation); } @@ -512,7 +540,7 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(OperationContext* expCtx, ExtensionsCallbackNoop(), MatchExpressionParser::kAllowAllSpecialFeatures), - str::stream() << "Could not parse delete query " << deleteOp.getQ()); + str::stream() << "Could not parse delete query " << deleteQuery); // Single deletes must target a single shard or be exact-ID. uassert(ErrorCodes::ShardKeyNotFound, @@ -523,7 +551,7 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(OperationContext* << ", shard key pattern: " << _cm.getShardKeyPattern().toString(), !_cm.isSharded() || deleteOp.getMulti() || isExactIdQuery(opCtx, *cq, _cm)); - return uassertStatusOK(_targetQuery(expCtx, deleteOp.getQ(), collation)); + return uassertStatusOK(_targetQuery(expCtx, deleteQuery, collation)); } StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::_targetQuery( |