author    | Yoonsoo Kim <yoonsoo.kim@mongodb.com>            | 2023-05-03 00:39:16 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-05-03 01:32:23 +0000
commit    | f0e0b1452ca4395991199ca0d720f97b9f7dbbc8 (patch)
tree      | 8b7197dd6fcb2892d0eda0ec420f14da2b3eb738
parent    | 8037d49bdffb50394fbcb5d6af624d02022eb656 (diff)
SERVER-73083 Support findAndModify with remove: true on a timeseries collection
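For context, a minimal mongo-shell sketch of what this commit enables (collection, database, and field names are illustrative, not from the patch; assumes a server with this change at FCV 7.1+ and a session without retryable writes, which are not yet supported per SERVER-76583):

    // Create a timeseries collection and insert one measurement.
    const demoDB = db.getSiblingDB("fam_remove_demo");  // hypothetical database name
    assert.commandWorked(demoDB.createCollection(
        "weather", {timeseries: {timeField: "time", metaField: "tag"}}));
    assert.commandWorked(demoDB.weather.insert({_id: 1, time: ISODate(), tag: "A", f: 101}));

    // findAndModify with remove: true deletes a single measurement and returns it
    // in 'value'; 'lastErrorObject.n' reports the number of measurements deleted.
    const res = assert.commandWorked(
        demoDB.runCommand({findAndModify: "weather", query: {f: 101}, remove: true}));
    assert.eq(1, res.lastErrorObject.n);
    assert.eq(101, res.value.f);

    // A sort option remains unsupported on timeseries collections.
    assert.commandFailedWithCode(
        demoDB.runCommand(
            {findAndModify: "weather", query: {f: {$gt: 100}}, sort: {f: 1}, remove: true}),
        ErrorCodes.InvalidOptions);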
-rw-r--r-- | jstests/core/timeseries/libs/timeseries_writes_util.js       | 258
-rw-r--r-- | jstests/core/timeseries/timeseries_delete_one.js             |  81
-rw-r--r-- | jstests/core/timeseries/timeseries_find_and_modify_remove.js | 227
-rw-r--r-- | src/mongo/db/commands/find_and_modify.cpp                    |   7
-rw-r--r-- | src/mongo/db/commands/write_commands.cpp                     |  54
-rw-r--r-- | src/mongo/db/exec/timeseries_modify.cpp                      |  79
-rw-r--r-- | src/mongo/db/exec/timeseries_modify.h                        |  19
-rw-r--r-- | src/mongo/db/ops/parsed_delete.cpp                           |   2
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.cpp                          |  16
-rw-r--r-- | src/mongo/db/query/plan_executor_impl.cpp                    |  41
-rw-r--r-- | src/mongo/db/timeseries/timeseries_update_delete_util.h      |  61
-rw-r--r-- | src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp |   2
12 files changed, 682 insertions(+), 165 deletions(-)
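Before the per-file diffs: the new test helpers assert how a user-level predicate is split for the TS_MODIFY stage. A sketch of the split the tests expect for the query {f: 101}, taken from the expectations in timeseries_find_and_modify_remove.js:

    // Bucket-level filter: the closed-bucket filter ANDed with range checks on
    // the bucket's control.min/control.max summary of 'f'; buckets failing this
    // check are never unpacked.
    const bucketFilter = {
        $and: [
            {"control.closed": {$not: {$eq: true}}},
            {$and: [
                {"control.min.f": {$_internalExprLte: 101}},
                {"control.max.f": {$_internalExprGte: 101}},
            ]},
        ]
    };
    // Residual filter: re-applied to each measurement after a bucket is unpacked.
    const residualFilter = {f: {$eq: 101}};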
diff --git a/jstests/core/timeseries/libs/timeseries_writes_util.js b/jstests/core/timeseries/libs/timeseries_writes_util.js new file mode 100644 index 00000000000..a05ad6fa379 --- /dev/null +++ b/jstests/core/timeseries/libs/timeseries_writes_util.js @@ -0,0 +1,258 @@ +/** + * Helpers for testing timeseries arbitrary writes. + */ + +load("jstests/libs/analyze_plan.js"); // For getPlanStage() and getExecutionStages(). +load("jstests/libs/fixture_helpers.js"); // For 'isMongos' + +const timeFieldName = "time"; +const metaFieldName = "tag"; +const dateTime = ISODate("2021-07-12T16:00:00Z"); +const collNamePrefix = "coll_"; +const closedBucketFilter = { + "control.closed": {$not: {$eq: true}} +}; + +let testCaseId = 0; +let testDB = null; + +/** + * Composes and returns a bucket-level filter for timeseries arbitrary writes. + * + * The bucket-level filter is composed of the closed bucket filter and the given filter(s) which + * are ANDed together. The closed bucket filter is always the first element of the AND array. + * Zero or more filters can be passed in as arguments. + */ +function makeBucketFilter(...args) { + return {$and: [closedBucketFilter].concat(Array.from(args))}; +} + +function getTestDB() { + if (!testDB) { + testDB = db.getSiblingDB(jsTestName()); + assert.commandWorked(testDB.dropDatabase()); + } + return testDB; +} + +function prepareCollection(initialDocList) { + const testDB = getTestDB(); + const coll = testDB.getCollection(collNamePrefix + testCaseId++); + coll.drop(); + assert.commandWorked(testDB.createCollection( + coll.getName(), {timeseries: {timeField: timeFieldName, metaField: metaFieldName}})); + assert.commandWorked(coll.insert(initialDocList)); + + return coll; +} + +function verifyResultDocs(coll, initialDocList, expectedResultDocs, nDeleted) { + let resultDocs = coll.find().toArray(); + assert.eq(resultDocs.length, initialDocList.length - nDeleted, tojson(resultDocs)); + + // Validate the collection's exact contents if we were given the expected results. We may skip + // this step in some cases, if the delete doesn't pinpoint a specific document. 
+ if (expectedResultDocs) { + assert.eq(expectedResultDocs.length, resultDocs.length, resultDocs); + expectedResultDocs.forEach(expectedDoc => { + assert.docEq( + expectedDoc, + coll.findOne({_id: expectedDoc._id}), + `Expected document (_id = ${expectedDoc._id}) not found in result collection: ${ + tojson(resultDocs)}`); + }); + } +} + +function verifyExplain( + {explain, rootStageName, bucketFilter, residualFilter, nBucketsUnpacked, nReturned}) { + if (!rootStageName) { + rootStageName = "TS_MODIFY"; + } else { + assert.eq("PROJECTION_DEFAULT", rootStageName, "Only PROJECTION_DEFAULT is allowed"); + } + + let foundStage = getPlanStage(explain.queryPlanner.winningPlan, rootStageName); + assert.neq(null, + foundStage, + `The root ${rootStageName} stage not found in the plan: ${tojson(explain)}`); + if (rootStageName === "PROJECTION_DEFAULT") { + assert.eq("TS_MODIFY", + foundStage.inputStage.stage, + `TS_MODIFY is not a child of ${rootStageName} in the plan: ${tojson(explain)}`); + foundStage = foundStage.inputStage; + } + + assert.eq("deleteOne", foundStage.opType, `TS_MODIFY opType is wrong: ${tojson(foundStage)}`); + assert.eq(bucketFilter, + foundStage.bucketFilter, + `TS_MODIFY bucketFilter is wrong: ${tojson(foundStage)}`); + assert.eq(residualFilter, + foundStage.residualFilter, + `TS_MODIFY residualFilter is wrong: ${tojson(foundStage)}`); + + const execStages = getExecutionStages(explain); + assert.eq(rootStageName, execStages[0].stage, `The root stage is wrong: ${tojson(execStages)}`); + let tsModifyStage = execStages[0]; + if (tsModifyStage.stage === "PROJECTION_DEFAULT") { + tsModifyStage = tsModifyStage.inputStage; + } + assert.eq( + "TS_MODIFY", tsModifyStage.stage, `Can't find TS_MODIFY stage: ${tojson(execStages)}`); + assert.eq(nBucketsUnpacked, + tsModifyStage.nBucketsUnpacked, + `Got wrong nBucketsUnpacked ${tojson(tsModifyStage)}`); + assert.eq(nReturned, tsModifyStage.nReturned, `Got wrong nReturned ${tojson(tsModifyStage)}`); +} + +/** + * Confirms that a deleteOne returns the expected set of documents. + * + * - initialDocList: The initial documents in the collection. + * - filter: The filter for the deleteOne command. + * - expectedResultDocs: The expected documents in the collection after the delete. + * - nDeleted: The expected number of documents deleted. + */ +function testDeleteOne({initialDocList, filter, expectedResultDocs, nDeleted}) { + const coll = prepareCollection(initialDocList); + + const res = assert.commandWorked(coll.deleteOne(filter)); + assert.eq(nDeleted, res.deletedCount); + + verifyResultDocs(coll, initialDocList, expectedResultDocs, nDeleted); +} + +/** + * Confirms that a findAndModify with remove: true returns the expected result(s) 'res'. + * + * - initialDocList: The initial documents in the collection. + * - cmd.filter: The filter for the findAndModify command. + * - cmd.fields: The projection for the findAndModify command. + * - cmd.sort: The sort option for the findAndModify command. + * - cmd.collation: The collation option for the findAndModify command. + * - res.errorCode: If errorCode is set, we expect the command to fail with that code and other + * fields of 'res' and 'explain' are ignored. + * - res.expectedResultDocs: The expected documents in the collection after the delete. + * - res.nDeleted: The expected number of documents deleted. + * - res.deletedDoc: The expected document returned by the findAndModify command. + * - res.rootStage: The expected root stage of the explain plan. 
+ * - res.bucketFilter: The expected bucket filter of the TS_MODIFY stage. + * - res.residualFilter: The expected residual filter of the TS_MODIFY stage. + * - res.nBucketsUnpacked: The expected number of buckets unpacked by the TS_MODIFY stage. + * - res.nReturned: The expected number of documents returned by the TS_MODIFY stage. + */ +function testFindOneAndRemove({ + initialDocList, + cmd: {filter, fields, sort, collation}, + res: { + errorCode, + expectedResultDocs, + nDeleted, + deletedDoc, + rootStage, + bucketFilter, + residualFilter, + nBucketsUnpacked, + nReturned, + }, +}) { + const coll = prepareCollection(initialDocList); + + const session = coll.getDB().getSession(); + const shouldRetryWrites = session.getOptions().shouldRetryWrites(); + const findAndModifyCmd = { + findAndModify: coll.getName(), + query: filter, + fields: fields, + sort: sort, + collation: collation, + remove: true + }; + // TODO SERVER-76583: Remove this check and always verify the result or verify the 'errorCode'. + if (!shouldRetryWrites && !errorCode) { + const explainRes = assert.commandWorked( + coll.runCommand({explain: findAndModifyCmd, verbosity: "executionStats"})); + if (bucketFilter) { + verifyExplain({ + explain: explainRes, + rootStageName: rootStage, + bucketFilter: bucketFilter, + residualFilter: residualFilter, + nBucketsUnpacked: nBucketsUnpacked, + nReturned: nReturned, + }); + } + + const res = assert.commandWorked(testDB.runCommand(findAndModifyCmd)); + assert.eq(nDeleted, res.lastErrorObject.n, tojson(res)); + if (deletedDoc) { + assert.docEq(deletedDoc, res.value, tojson(res)); + } else if (nDeleted === 1) { + assert.neq(null, res.value, tojson(res)); + } else if (nDeleted === 0) { + assert.eq(null, res.value, tojson(res)); + } + + verifyResultDocs(coll, initialDocList, expectedResultDocs, nDeleted); + } else if (errorCode) { + assert.commandFailedWithCode(testDB.runCommand(findAndModifyCmd), errorCode); + } else { + // TODO SERVER-76583: Remove this test. + assert.commandFailedWithCode(testDB.runCommand(findAndModifyCmd), 7308305); + } +} + +// Defines sample data set for testing. 
+const doc1_a_nofields = { + _id: 1, + [timeFieldName]: dateTime, + [metaFieldName]: "A", +}; +const doc2_a_f101 = { + _id: 2, + [timeFieldName]: dateTime, + [metaFieldName]: "A", + f: 101 +}; +const doc3_a_f102 = { + _id: 3, + [timeFieldName]: dateTime, + [metaFieldName]: "A", + f: 102 +}; +const doc4_b_f103 = { + _id: 4, + [timeFieldName]: dateTime, + [metaFieldName]: "B", + f: 103 +}; +const doc5_b_f104 = { + _id: 5, + [timeFieldName]: dateTime, + [metaFieldName]: "B", + f: 104 +}; +const doc6_c_f105 = { + _id: 6, + [timeFieldName]: dateTime, + [metaFieldName]: "C", + f: 105 +}; +const doc7_c_f106 = { + _id: 7, + [timeFieldName]: dateTime, + [metaFieldName]: "C", + f: 106, +}; + +function getSampleDataForWrites() { + return [ + doc1_a_nofields, + doc2_a_f101, + doc3_a_f102, + doc4_b_f103, + doc5_b_f104, + doc6_c_f105, + doc7_c_f106, + ]; +} diff --git a/jstests/core/timeseries/timeseries_delete_one.js b/jstests/core/timeseries/timeseries_delete_one.js index 39ad152a7d2..315f611e126 100644 --- a/jstests/core/timeseries/timeseries_delete_one.js +++ b/jstests/core/timeseries/timeseries_delete_one.js @@ -11,86 +11,7 @@ (function() { "use strict"; -const timeFieldName = "time"; -const metaFieldName = "tag"; -const dateTime = ISODate("2021-07-12T16:00:00Z"); -const collNamePrefix = "timeseries_delete_one_"; -let testCaseId = 0; - -const testDB = db.getSiblingDB(jsTestName()); -assert.commandWorked(testDB.dropDatabase()); - -/** - * Confirms that a deleteOne() returns the expected set of documents. - */ -function testDeleteOne({initialDocList, filter, expectedResultDocs, nDeleted}) { - const coll = testDB.getCollection(collNamePrefix + testCaseId++); - assert.commandWorked(testDB.createCollection( - coll.getName(), {timeseries: {timeField: timeFieldName, metaField: metaFieldName}})); - - assert.commandWorked(coll.insert(initialDocList)); - - const res = assert.commandWorked(coll.deleteOne(filter)); - assert.eq(nDeleted, res.deletedCount); - - const resultDocs = coll.find().toArray(); - assert.eq(resultDocs.length, initialDocList.length - nDeleted, tojson(resultDocs)); - - // Validate the collection's exact contents if we were given the expected results. We may skip - // this step in some cases, if the delete doesn't pinpoint a specific document. - if (expectedResultDocs) { - assert.eq(expectedResultDocs.length, resultDocs.length, resultDocs); - expectedResultDocs.forEach(expectedDoc => { - assert.docEq( - expectedDoc, - coll.findOne({_id: expectedDoc._id}), - `Expected document (_id = ${expectedDoc._id}) not found in result collection: ${ - tojson(resultDocs)}`); - }); - } -} - -const doc1_a_nofields = { - _id: 1, - [timeFieldName]: dateTime, - [metaFieldName]: "A", -}; -const doc2_a_f101 = { - _id: 2, - [timeFieldName]: dateTime, - [metaFieldName]: "A", - f: 101 -}; -const doc3_a_f102 = { - _id: 3, - [timeFieldName]: dateTime, - [metaFieldName]: "A", - f: 102 -}; -const doc4_b_f103 = { - _id: 4, - [timeFieldName]: dateTime, - [metaFieldName]: "B", - f: 103 -}; -const doc5_b_f104 = { - _id: 5, - [timeFieldName]: dateTime, - [metaFieldName]: "B", - f: 104 -}; -const doc6_c_f105 = { - _id: 6, - [timeFieldName]: dateTime, - [metaFieldName]: "C", - f: 105 -}; -const doc7_c_f106 = { - _id: 7, - [timeFieldName]: dateTime, - [metaFieldName]: "C", - f: 106, -}; +load("jstests/core/timeseries/libs/timeseries_writes_util.js"); // Query on the 'f' field leads to zero measurement delete. 
(function testZeroMeasurementDelete() { diff --git a/jstests/core/timeseries/timeseries_find_and_modify_remove.js b/jstests/core/timeseries/timeseries_find_and_modify_remove.js new file mode 100644 index 00000000000..53a024f2519 --- /dev/null +++ b/jstests/core/timeseries/timeseries_find_and_modify_remove.js @@ -0,0 +1,227 @@ +/** + * Tests findAndModify with remove: true on a timeseries collection. + * + * @tags: [ + * # We need a timeseries collection. + * requires_timeseries, + * # findAndModify with remove: true on a timeseries collection is supported since 7.1 + * requires_fcv_71, + * # TODO SERVER-76583: Remove following two tags. + * does_not_support_retryable_writes, + * requires_non_retryable_writes, + * # TODO SERVER-76530: Remove the follow tag. + * assumes_unsharded_collection, + * ] + */ + +(function() { +"use strict"; + +load("jstests/core/timeseries/libs/timeseries_writes_util.js"); + +// findAndModify with a sort option is not supported. +(function testSortOptionFails() { + jsTestLog("Running testSortOptionFails()"); + testFindOneAndRemove({ + initialDocList: [doc1_a_nofields, doc4_b_f103, doc6_c_f105], + cmd: {filter: {f: {$gt: 100}}, sort: {f: 1}}, + res: {errorCode: ErrorCodes.InvalidOptions}, + }); +})(); + +// Query on the 'f' field leads to zero measurement delete. +(function testZeroMeasurementDelete() { + jsTestLog("Running testZeroMeasurementDelete()"); + testFindOneAndRemove({ + initialDocList: [doc1_a_nofields, doc4_b_f103, doc6_c_f105], + cmd: {filter: {f: 17}}, + res: { + expectedDocList: [doc1_a_nofields, doc4_b_f103, doc6_c_f105], + nDeleted: 0, + bucketFilter: makeBucketFilter({ + $and: [ + {"control.min.f": {$_internalExprLte: 17}}, + {"control.max.f": {$_internalExprGte: 17}}, + ] + }), + residualFilter: {f: {$eq: 17}}, + nBucketsUnpacked: 0, + nReturned: 0, + }, + }); +})(); + +// Query on the 'f' field leads to a partial bucket delete. +(function testPartialBucketDelete() { + jsTestLog("Running testPartialBucketDelete()"); + testFindOneAndRemove({ + initialDocList: [doc1_a_nofields, doc2_a_f101, doc3_a_f102], + cmd: {filter: {f: 101}}, + res: + {expectedDocList: [doc1_a_nofields, doc3_a_f102], nDeleted: 1, deletedDoc: doc2_a_f101}, + }); +})(); + +// Query on the 'f' field leads to a partial bucket delete and 'fields' project the returned doc. +(function testPartialBucketDeleteWithFields() { + jsTestLog("Running testPartialBucketDeleteWithFields()"); + testFindOneAndRemove({ + initialDocList: [doc1_a_nofields, doc2_a_f101, doc3_a_f102], + cmd: {filter: {f: 102}, fields: {f: 1, [metaFieldName]: 1, _id: 0}}, + res: { + expectedDocList: [doc1_a_nofields, doc2_a_f101], + nDeleted: 1, + deletedDoc: {f: 102, [metaFieldName]: "A"}, + rootStage: "PROJECTION_DEFAULT", + bucketFilter: makeBucketFilter({ + $and: [ + {"control.min.f": {$_internalExprLte: 102}}, + {"control.max.f": {$_internalExprGte: 102}}, + ] + }), + residualFilter: {f: {$eq: 102}}, + nBucketsUnpacked: 1, + nReturned: 1, + }, + }); +})(); + +// Query on the 'f' field leads to a full (single document) bucket delete. 
+(function testFullBucketDelete() { + jsTestLog("Running testFullBucketDelete()"); + testFindOneAndRemove({ + initialDocList: [doc2_a_f101], + cmd: {filter: {f: 101}}, + res: { + expectedDocList: [], + nDeleted: 1, + deletedDoc: doc2_a_f101, + bucketFilter: makeBucketFilter({ + $and: [ + {"control.min.f": {$_internalExprLte: 101}}, + {"control.max.f": {$_internalExprGte: 101}}, + ] + }), + residualFilter: {f: {$eq: 101}}, + nBucketsUnpacked: 1, + nReturned: 1, + }, + }); +})(); + +// Query on the 'tag' field matches all docs and deletes one. +(function testMatchFullBucketOnlyDeletesOne() { + jsTestLog("Running testMatchFullBucketOnlyDeletesOne()"); + testFindOneAndRemove({ + initialDocList: [doc1_a_nofields, doc2_a_f101, doc3_a_f102], + cmd: {filter: {[metaFieldName]: "A"}}, + // Don't validate exact results as we could delete any doc. + res: { + nDeleted: 1, + bucketFilter: makeBucketFilter({meta: {$eq: "A"}}), + residualFilter: {}, + nBucketsUnpacked: 1, + nReturned: 1, + }, + }); +})(); + +// Query on the 'tag' and metric field. +(function testMetaAndMetricFilterOnlyDeletesOne() { + jsTestLog("Running testMetaAndMetricFilterOnlyDeletesOne()"); + testFindOneAndRemove({ + initialDocList: [doc1_a_nofields, doc2_a_f101, doc3_a_f102], + cmd: {filter: {[metaFieldName]: "A", f: {$gt: 101}}}, + res: { + nDeleted: 1, + deletedDoc: doc3_a_f102, + bucketFilter: + makeBucketFilter({meta: {$eq: "A"}}, {"control.max.f": {$_internalExprGt: 101}}), + residualFilter: {f: {$gt: 101}}, + nBucketsUnpacked: 1, + nReturned: 1, + } + }); +})(); + +// Query on the 'f' field matches docs in multiple buckets but only deletes from one. +(function testMatchMultiBucketOnlyDeletesOne() { + jsTestLog("Running testMatchMultiBucketOnlyDeletesOne()"); + testFindOneAndRemove({ + initialDocList: [ + doc1_a_nofields, + doc2_a_f101, + doc3_a_f102, + doc4_b_f103, + doc5_b_f104, + doc6_c_f105, + doc7_c_f106 + ], + cmd: {filter: {f: {$gt: 101}}}, + // Don't validate exact results as we could delete one of a few docs. + res: { + nDeleted: 1, + bucketFilter: makeBucketFilter({"control.max.f": {$_internalExprGt: 101}}), + residualFilter: {f: {$gt: 101}}, + nBucketsUnpacked: 1, + nReturned: 1, + }, + }); +})(); + +// Empty filter matches all docs but only deletes one. +(function testEmptyFilterOnlyDeletesOne() { + jsTestLog("Running testEmptyFilterOnlyDeletesOne()"); + testFindOneAndRemove({ + initialDocList: [ + doc1_a_nofields, + doc2_a_f101, + doc3_a_f102, + doc4_b_f103, + doc5_b_f104, + doc6_c_f105, + doc7_c_f106 + ], + cmd: {filter: {}}, + // Don't validate exact results as we could delete any doc. + res: { + nDeleted: 1, + bucketFilter: makeBucketFilter({}), + residualFilter: {}, + nBucketsUnpacked: 1, + nReturned: 1 + }, + }); +})(); + +// Verifies that the collation is properly propagated to the bucket-level filter when the +// query-level collation overrides the collection default collation. 
+(function testFindAndRemoveWithCollation() { + jsTestLog("Running testFindAndRemoveWithCollation()"); + testFindOneAndRemove({ + initialDocList: [ + doc1_a_nofields, + doc2_a_f101, + doc3_a_f102, + doc4_b_f103, + doc5_b_f104, + doc6_c_f105, + doc7_c_f106 + ], + cmd: { + filter: {[metaFieldName]: "a", f: {$gt: 101}}, + /*caseInsensitive collation*/ + collation: {locale: "en", strength: 2} + }, + res: { + nDeleted: 1, + deletedDoc: doc3_a_f102, + bucketFilter: + makeBucketFilter({meta: {$eq: "a"}}, {"control.max.f": {$_internalExprGt: 101}}), + residualFilter: {f: {$gt: 101}}, + nBucketsUnpacked: 1, + nReturned: 1, + }, + }); +})(); +})(); diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index 64cee94e7f4..581cb065ba1 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -64,6 +64,7 @@ #include "mongo/db/stats/resource_consumption_metrics.h" #include "mongo/db/stats/top.h" #include "mongo/db/storage/duplicate_key_error_info.h" +#include "mongo/db/timeseries/timeseries_update_delete_util.h" #include "mongo/db/transaction/retryable_writes_stats.h" #include "mongo/db/transaction/transaction_participant.h" #include "mongo/db/transaction_validation.h" @@ -349,7 +350,8 @@ void CmdFindAndModify::Invocation::explain(OperationContext* opCtx, }(); auto request = requestAndMsg.first; - const NamespaceString& nss = request.getNamespace(); + auto [isTimeseries, nss] = timeseries::isTimeseries(opCtx, request); + uassertStatusOK(userAllowedWriteNS(opCtx, nss)); auto const curOp = CurOp::get(opCtx); OpDebug* const opDebug = &curOp->debug(); @@ -372,7 +374,8 @@ void CmdFindAndModify::Invocation::explain(OperationContext* opCtx, str::stream() << "database " << dbName.toStringForErrorMsg() << " does not exist", DatabaseHolder::get(opCtx)->getDb(opCtx, nss.dbName())); - ParsedDelete parsedDelete(opCtx, &deleteRequest, collection.getCollectionPtr()); + ParsedDelete parsedDelete( + opCtx, &deleteRequest, collection.getCollectionPtr(), isTimeseries); uassertStatusOK(parsedDelete.parseRequest()); CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, nss) diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp index 27ea2465486..378049c26ee 100644 --- a/src/mongo/db/commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands.cpp @@ -70,6 +70,7 @@ #include "mongo/db/storage/duplicate_key_error_info.h" #include "mongo/db/storage/storage_parameters_gen.h" #include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h" +#include "mongo/db/timeseries/timeseries_update_delete_util.h" #include "mongo/db/transaction/retryable_writes_stats.h" #include "mongo/db/transaction/transaction_participant.h" #include "mongo/db/transaction_validation.h" @@ -113,33 +114,6 @@ bool shouldSkipOutput(OperationContext* opCtx) { } /** - * Returns true if 'ns' is a time-series collection. That is, this namespace is backed by a - * time-series buckets collection. - */ -template <class Request> -bool isTimeseries(OperationContext* opCtx, const Request& request) { - uassert(5916400, - "'isTimeseriesNamespace' parameter can only be set when the request is sent on " - "system.buckets namespace", - !request.getIsTimeseriesNamespace() || - request.getNamespace().isTimeseriesBucketsCollection()); - const auto bucketNss = request.getIsTimeseriesNamespace() - ? 
request.getNamespace() - : request.getNamespace().makeTimeseriesBucketsNamespace(); - - // If the buckets collection exists now, the time-series insert path will check for the - // existence of the buckets collection later on with a lock. - // If this check is concurrent with the creation of a time-series collection and the buckets - // collection does not yet exist, this check may return false unnecessarily. As a result, an - // insert attempt into the time-series namespace will either succeed or fail, depending on who - // wins the race. - // Hold reference to the catalog for collection lookup without locks to be safe. - auto catalog = CollectionCatalog::get(opCtx); - auto coll = catalog->lookupCollectionByNamespace(opCtx, bucketNss); - return (coll && coll->getTimeseriesOptions()); -} - -/** * Contains hooks that are used by 'populateReply' method. */ struct PopulateReplyHooks { @@ -298,7 +272,7 @@ public: } } - if (isTimeseries(opCtx, request())) { + if (auto [isTimeseries, _] = timeseries::isTimeseries(opCtx, request()); isTimeseries) { // Re-throw parsing exceptions to be consistent with CmdInsert::Invocation's // constructor. try { @@ -467,7 +441,7 @@ public: } } - if (isTimeseries(opCtx, request())) { + if (auto [isTimeseries, _] = timeseries::isTimeseries(opCtx, request()); isTimeseries) { uassert(ErrorCodes::OperationNotSupportedInTransaction, str::stream() << "Cannot perform a multi-document transaction on a " "time-series collection: " @@ -558,15 +532,7 @@ public: "explained write batches must be of size 1", request().getUpdates().size() == 1); - auto isRequestToTimeseries = isTimeseries(opCtx, request()); - auto nss = [&] { - auto nss = request().getNamespace(); - if (!isRequestToTimeseries) { - return nss; - } - return nss.isTimeseriesBucketsCollection() ? nss - : nss.makeTimeseriesBucketsNamespace(); - }(); + auto [isRequestToTimeseries, nss] = timeseries::isTimeseries(opCtx, request()); UpdateRequest updateRequest(request().getUpdates()[0]); updateRequest.setNamespaceString(nss); @@ -712,7 +678,7 @@ public: } } - if (isTimeseries(opCtx, request())) { + if (auto [isTimeseries, _] = timeseries::isTimeseries(opCtx, request()); isTimeseries) { source = OperationSource::kTimeseriesDelete; } @@ -756,15 +722,7 @@ public: request().getDeletes().size() == 1); auto deleteRequest = DeleteRequest{}; - auto isRequestToTimeseries = isTimeseries(opCtx, request()); - auto nss = [&] { - auto nss = request().getNamespace(); - if (!isRequestToTimeseries) { - return nss; - } - return nss.isTimeseriesBucketsCollection() ? 
nss - : nss.makeTimeseriesBucketsNamespace(); - }(); + auto [isRequestToTimeseries, nss] = timeseries::isTimeseries(opCtx, request()); deleteRequest.setNsString(nss); deleteRequest.setLegacyRuntimeConstants(request().getLegacyRuntimeConstants().value_or( Variables::generateRuntimeConstants(opCtx))); diff --git a/src/mongo/db/exec/timeseries_modify.cpp b/src/mongo/db/exec/timeseries_modify.cpp index 0aa5fb24535..f34c167ce33 100644 --- a/src/mongo/db/exec/timeseries_modify.cpp +++ b/src/mongo/db/exec/timeseries_modify.cpp @@ -57,6 +57,9 @@ TimeseriesModifyStage::TimeseriesModifyStage(ExpressionContext* expCtx, tassert(7308200, "Multi deletes must have a residual predicate", _isSingletonWrite() || _residualPredicate || _params.isUpdate); + tassert(7308300, + "Can return the deleted measurement only if deleting one", + !_params.returnDeleted || _isSingletonWrite()); _children.emplace_back(std::move(child)); // These three properties are only used for the queryPlanner explain and will not change while @@ -86,7 +89,9 @@ TimeseriesModifyStage::TimeseriesModifyStage(ExpressionContext* expCtx, bool TimeseriesModifyStage::isEOF() { if (_isSingletonWrite() && _specificStats.nMeasurementsModified > 0) { - return true; + // If we have a measurement to return, we should not return EOF so that we can get a chance + // to get called again and return the measurement. + return !_deletedMeasurementToReturn; } return child()->isEOF() && _retryBucketId == WorkingSet::INVALID_ID; } @@ -101,20 +106,24 @@ std::unique_ptr<PlanStageStats> TimeseriesModifyStage::getStats() { return ret; } -PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets( +template <typename F> +std::pair<bool, PlanStage::StageState> TimeseriesModifyStage::_writeToTimeseriesBuckets( + ScopeGuard<F>& bucketFreer, WorkingSetID bucketWsmId, const std::vector<BSONObj>& unchangedMeasurements, const std::vector<BSONObj>& modifiedMeasurements, bool bucketFromMigrate) { + // No measurements needed to be deleted from the bucket document. + if (modifiedMeasurements.empty()) { + return {false, PlanStage::NEED_TIME}; + } + + // We don't actually write anything if we are in explain mode but we still need to update the + // stats and let the caller think as if the write succeeded if there's any deleted measurement. if (_params.isExplain) { _specificStats.nMeasurementsModified += modifiedMeasurements.size(); _specificStats.nMeasurementsMatched += modifiedMeasurements.size(); - return PlanStage::NEED_TIME; - } - - // No measurements needed to be updated or deleted from the bucket document. - if (modifiedMeasurements.empty()) { - return PlanStage::NEED_TIME; + return {true, PlanStage::NEED_TIME}; } handlePlanStageYield( @@ -138,6 +147,8 @@ PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets( "Retrying bucket due to conflict attempting to write out changes", "bucket_rid"_attr = recordId, "status"_attr = status); + // We need to retry the bucket, so we should not free the current bucket. 
+ bucketFreer.dismiss(); _retryBucket(bucketWsmId); return PlanStage::NEED_YIELD; }; @@ -150,7 +161,7 @@ PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets( auto result = timeseries::performAtomicWrites( opCtx(), collection(), recordId, op, {}, bucketFromMigrate, _params.stmtId); if (!result.isOK()) { - return yieldAndRetry(7309300, result); + return {false, yieldAndRetry(7309300, result)}; } } else { auto timeseriesOptions = collection()->getTimeseriesOptions(); @@ -177,7 +188,7 @@ PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets( auto result = timeseries::performAtomicWrites( opCtx(), collection(), recordId, op, {}, bucketFromMigrate, _params.stmtId); if (!result.isOK()) { - return yieldAndRetry(7309301, result); + return {false, yieldAndRetry(7309301, result)}; } } _specificStats.nMeasurementsMatched += modifiedMeasurements.size(); @@ -186,7 +197,7 @@ PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets( // As restoreState may restore (recreate) cursors, cursors are tied to the transaction in which // they are created, and a WriteUnitOfWork is a transaction, make sure to restore the state // outside of the WriteUnitOfWork. - return handlePlanStageYield( + auto status = handlePlanStageYield( expCtx(), "TimeseriesModifyStage restoreState", collection()->ns().ns(), @@ -196,9 +207,12 @@ PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets( }, // yieldHandler // Note we don't need to retry anything in this case since the write already was committed. - // However, we still need to return the affected document (if it was requested). - // TODO SERVER-73089 for findAndModify we need to return the affected doc. + // However, we still need to return the affected measurement (if it was requested). We don't + // need to rely on the storage engine to return the affected document since we already have + // it in memory. [&] { /* noop */ }); + + return {true, status}; } template <typename F> @@ -270,11 +284,30 @@ void TimeseriesModifyStage::_retryBucket(WorkingSetID bucketId) { _retryBucketId = bucketId; } +void TimeseriesModifyStage::_prepareToReturnDeletedMeasurement(WorkingSetID& out, + BSONObj measurement) { + out = _ws->allocate(); + auto member = _ws->get(out); + // The measurement does not have record id. + member->recordId = RecordId{}; + member->doc.value() = Document{std::move(measurement)}; + _ws->transitionToOwnedObj(out); +} + PlanStage::StageState TimeseriesModifyStage::doWork(WorkingSetID* out) { if (isEOF()) { return PlanStage::IS_EOF; } + if (_deletedMeasurementToReturn) { + // If we fall into this case, then we were asked to return the deleted measurement but we + // were not able to do so in the previous call to doWork() because we needed to yield. Now + // that we are back, we can return the deleted measurement. 
+ _prepareToReturnDeletedMeasurement(*out, *_deletedMeasurementToReturn); + _deletedMeasurementToReturn.reset(); + return PlanStage::ADVANCED; + } + tassert(7495500, "Expected bucketUnpacker's current bucket to be exhausted", !_bucketUnpacker.hasNext()); @@ -329,11 +362,25 @@ PlanStage::StageState TimeseriesModifyStage::doWork(WorkingSetID* out) { } } - status = _writeToTimeseriesBuckets( - id, unchangedMeasurements, modifiedMeasurements, bucketFromMigrate); + auto isWriteSuccessful = false; + std::tie(isWriteSuccessful, status) = _writeToTimeseriesBuckets( + bucketFreer, id, unchangedMeasurements, modifiedMeasurements, bucketFromMigrate); if (status != PlanStage::NEED_TIME) { *out = WorkingSet::INVALID_ID; - bucketFreer.dismiss(); + if (_params.returnDeleted && isWriteSuccessful) { + // If asked to return the deleted measurement and the write was successful but we need + // to yield, we need to save the deleted measurement to return it later. See isEOF() for + // more info. + tassert(7308301, + "Can return only one deleted measurement", + modifiedMeasurements.size() == 1); + _deletedMeasurementToReturn = std::move(modifiedMeasurements[0]); + } + } else if (_params.returnDeleted && isWriteSuccessful) { + // If the write was successful and if asked to return the deleted measurement, we return it + // immediately. + _prepareToReturnDeletedMeasurement(*out, modifiedMeasurements[0]); + status = PlanStage::ADVANCED; } return status; } diff --git a/src/mongo/db/exec/timeseries_modify.h b/src/mongo/db/exec/timeseries_modify.h index 6bd3ffd8436..268a7884f7a 100644 --- a/src/mongo/db/exec/timeseries_modify.h +++ b/src/mongo/db/exec/timeseries_modify.h @@ -44,6 +44,7 @@ struct TimeseriesModifyParams { isMulti(deleteParams->isMulti), fromMigrate(deleteParams->fromMigrate), isExplain(deleteParams->isExplain), + returnDeleted(deleteParams->returnDeleted), stmtId(deleteParams->stmtId), canonicalQuery(deleteParams->canonicalQuery) {} @@ -74,6 +75,9 @@ struct TimeseriesModifyParams { // Are we explaining a command rather than actually executing it? bool isExplain; + // Should we return the deleted document? + bool returnDeleted = false; + // The stmtId for this particular command. StmtId stmtId = kUninitializedStmtId; @@ -142,8 +146,12 @@ private: /** * Writes the modifications to a bucket. + * + * Returns the pair of (whether the write was successful, the stage state to propagate). */ - PlanStage::StageState _writeToTimeseriesBuckets( + template <typename F> + std::pair<bool, PlanStage::StageState> _writeToTimeseriesBuckets( + ScopeGuard<F>& bucketFreer, WorkingSetID bucketWsmId, const std::vector<BSONObj>& unchangedMeasurements, const std::vector<BSONObj>& modifiedMeasurements, @@ -164,6 +172,11 @@ private: */ PlanStage::StageState _getNextBucket(WorkingSetID& id); + /** + * Prepares returning a deleted measurement. + */ + void _prepareToReturnDeletedMeasurement(WorkingSetID& out, BSONObj measurement); + TimeseriesModifyParams _params; WorkingSet* _ws; @@ -194,5 +207,9 @@ private: // A pending retry to get to after a NEED_YIELD propagation and a new storage snapshot is // established. This can be set when a write fails or when a fetch fails. WorkingSetID _retryBucketId = WorkingSet::INVALID_ID; + + // Stores the deleted document when a deleteOne with returnDeleted: true is requested and we + // need to yield. 
+ boost::optional<BSONObj> _deletedMeasurementToReturn = boost::none; }; } // namespace mongo diff --git a/src/mongo/db/ops/parsed_delete.cpp b/src/mongo/db/ops/parsed_delete.cpp index 69c030a3a48..0f019189f77 100644 --- a/src/mongo/db/ops/parsed_delete.cpp +++ b/src/mongo/db/ops/parsed_delete.cpp @@ -135,7 +135,7 @@ Status ParsedDelete::parseQueryToCQ() { // TODO: Due to the complexity which is related to the efficient sort support, we don't // support yet findAndModify with a query and sort but it should not be impossible. // This code assumes that in findAndModify code path, the parsed delete constructor should - // be called with source == kTimeseriesDelete for a time-series collection. + // be called with isTimeseriesDelete = true for a time-series collection. uassert(ErrorCodes::InvalidOptions, "Cannot perform a findAndModify with a query and sort on a time-series collection.", !_timeseriesDeleteQueryExprs); diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 6e1eeb3c82a..54092885182 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -806,7 +806,7 @@ UpdateResult writeConflictRetryUpsert(OperationContext* opCtx, } long long writeConflictRetryRemove(OperationContext* opCtx, - const NamespaceString& nsString, + const NamespaceString& nss, DeleteRequest* deleteRequest, CurOp* curOp, OpDebug* opDebug, @@ -815,13 +815,25 @@ long long writeConflictRetryRemove(OperationContext* opCtx, invariant(deleteRequest); + auto [isTimeseriesDelete, nsString] = timeseries::isTimeseries(opCtx, *deleteRequest); + if (isTimeseriesDelete) { + // TODO SERVER-76583: Remove this check. + uassert(7308305, + "Retryable findAndModify on a timeseries is not supported", + !opCtx->isRetryableWrite()); + + if (nss != nsString) { + deleteRequest->setNsString(nsString); + } + } + const auto collection = acquireCollection( opCtx, CollectionAcquisitionRequest::fromOpCtx(opCtx, nsString, AcquisitionPrerequisites::kWrite), MODE_IX); const auto& collectionPtr = collection.getCollectionPtr(); - ParsedDelete parsedDelete(opCtx, deleteRequest, collectionPtr); + ParsedDelete parsedDelete(opCtx, deleteRequest, collectionPtr, isTimeseriesDelete); uassertStatusOK(parsedDelete.parseRequest()); { diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp index 082563180bc..83c475dfd50 100644 --- a/src/mongo/db/query/plan_executor_impl.cpp +++ b/src/mongo/db/query/plan_executor_impl.cpp @@ -637,27 +637,40 @@ long long PlanExecutorImpl::executeDelete() { } // If the collection exists, the delete plan may either have a delete stage at the root, or - // (for findAndModify) a projection stage wrapping a delete stage. - switch (_root->stageType()) { - case StageType::STAGE_PROJECTION_DEFAULT: - case StageType::STAGE_PROJECTION_COVERED: - case StageType::STAGE_PROJECTION_SIMPLE: { - invariant(_root->getChildren().size() == 1U); - invariant(StageType::STAGE_DELETE == _root->child()->stageType()); - const SpecificStats* stats = _root->child()->getSpecificStats(); - return static_cast<const DeleteStats*>(stats)->docsDeleted; + // (for findAndModify) a projection stage wrapping a delete / TS_MODIFY stage. 
+ const auto deleteStage = [&] { + switch (_root->stageType()) { + case StageType::STAGE_PROJECTION_DEFAULT: + case StageType::STAGE_PROJECTION_COVERED: + case StageType::STAGE_PROJECTION_SIMPLE: { + tassert(7308302, + "Unexpected number of children: {}"_format(_root->getChildren().size()), + _root->getChildren().size() == 1U); + auto childStage = _root->child().get(); + tassert(7308303, + "Unexpected child stage type: {}"_format(childStage->stageType()), + StageType::STAGE_DELETE == childStage->stageType() || + StageType::STAGE_TIMESERIES_MODIFY == childStage->stageType()); + return childStage; + } + default: + return _root.get(); } + }(); + switch (deleteStage->stageType()) { case StageType::STAGE_TIMESERIES_MODIFY: { const auto* tsModifyStats = - static_cast<const TimeseriesModifyStats*>(_root->getSpecificStats()); + static_cast<const TimeseriesModifyStats*>(deleteStage->getSpecificStats()); return tsModifyStats->nMeasurementsModified; } - default: { - invariant(StageType::STAGE_DELETE == _root->stageType() || - StageType::STAGE_BATCHED_DELETE == _root->stageType()); - const auto* deleteStats = static_cast<const DeleteStats*>(_root->getSpecificStats()); + case StageType::STAGE_DELETE: + case StageType::STAGE_BATCHED_DELETE: { + const auto* deleteStats = + static_cast<const DeleteStats*>(deleteStage->getSpecificStats()); return deleteStats->docsDeleted; } + default: + MONGO_UNREACHABLE_TASSERT(7308306); } } diff --git a/src/mongo/db/timeseries/timeseries_update_delete_util.h b/src/mongo/db/timeseries/timeseries_update_delete_util.h index 5ddc630e223..734629681d7 100644 --- a/src/mongo/db/timeseries/timeseries_update_delete_util.h +++ b/src/mongo/db/timeseries/timeseries_update_delete_util.h @@ -32,6 +32,7 @@ #include <memory> #include "mongo/bson/bsonobj.h" +#include "mongo/db/catalog/collection_catalog.h" #include "mongo/db/ops/parsed_writes_common.h" #include "mongo/db/ops/write_ops_gen.h" #include "mongo/db/pipeline/expression_context.h" @@ -77,4 +78,64 @@ TimeseriesWritesQueryExprs getMatchExprsForWrites( const boost::intrusive_ptr<ExpressionContext>& expCtx, const TimeseriesOptions& tsOptions, const BSONObj& writeQuery); + +// Type requirement 1 for isTimeseries() +template <typename T> +constexpr bool isRequestableWithTimeseriesBucketNamespace = + std::is_same_v<T, write_ops::InsertCommandRequest> || + std::is_same_v<T, write_ops::UpdateCommandRequest> || + std::is_same_v<T, write_ops::DeleteCommandRequest>; + +// Type requirement 2 for isTimeseries() +template <typename T> +constexpr bool isRequestableOnUserTimeseriesNamespace = + std::is_same_v<T, DeleteRequest> || std::is_same_v<T, write_ops::FindAndModifyCommandRequest>; + +// Disjuction of type requirements for isTimeseries() +template <typename T> +constexpr bool isRequestableOnTimeseries = + isRequestableWithTimeseriesBucketNamespace<T> || isRequestableOnUserTimeseriesNamespace<T>; + +/** + * Returns a pair of (whether 'request' is made on a timeseries collection and the timeseries + * system bucket collection namespace if so). + * + * If the 'request' is not made on a timeseries collection, the second element of the pair is same + * as the namespace of the 'request'. 
+ */ +template <typename T> +requires isRequestableOnTimeseries<T> std::pair<bool, NamespaceString> isTimeseries( + OperationContext* opCtx, const T& request) { + const auto [nss, bucketNss] = [&] { + if constexpr (isRequestableWithTimeseriesBucketNamespace<T>) { + auto nss = request.getNamespace(); + uassert(5916400, + "'isTimeseriesNamespace' parameter can only be set when the request is sent on " + "system.buckets namespace", + !request.getIsTimeseriesNamespace() || nss.isTimeseriesBucketsCollection()); + return request.getIsTimeseriesNamespace() + ? std::pair{nss, nss} + : std::pair{nss, nss.makeTimeseriesBucketsNamespace()}; + } else if constexpr (std::is_same_v<T, write_ops::FindAndModifyCommandRequest>) { + auto nss = request.getNamespace(); + return std::pair{nss, nss.makeTimeseriesBucketsNamespace()}; + } else { + auto nss = request.getNsString(); + return std::pair{nss, nss.makeTimeseriesBucketsNamespace()}; + } + }(); + + // If the buckets collection exists now, the time-series insert path will check for the + // existence of the buckets collection later on with a lock. + // If this check is concurrent with the creation of a time-series collection and the buckets + // collection does not yet exist, this check may return false unnecessarily. As a result, an + // insert attempt into the time-series namespace will either succeed or fail, depending on who + // wins the race. + // Hold reference to the catalog for collection lookup without locks to be safe. + auto catalog = CollectionCatalog::get(opCtx); + auto coll = catalog->lookupCollectionByNamespace(opCtx, bucketNss); + bool isTimeseries = (coll && coll->getTimeseriesOptions()); + + return {isTimeseries, isTimeseries ? bucketNss : nss}; +} } // namespace mongo::timeseries diff --git a/src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp b/src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp index 93e6e0a10c6..196e5769de3 100644 --- a/src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp +++ b/src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp @@ -162,7 +162,7 @@ BSONObj createAggregateCmdObj( } pipeline.emplace_back(BSON(DocumentSourceMatch::kStageName << parsedInfo.query)); if (parsedInfo.sort) { - // TODO (SERVER-73083): skip the sort option for 'findAndModify' calls on time-series + // TODO (SERVER-76530): skip the sort option for 'findAndModify' calls on time-series // collections. pipeline.emplace_back(BSON(DocumentSourceSort::kStageName << *parsedInfo.sort)); } |
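Finally, a sketch of inspecting the new plan shape that verifyExplain() in the test helper checks (continues the hypothetical demo collection from the sketch above; getPlanStage() comes from jstests/libs/analyze_plan.js, as loaded by the helper):

    load("jstests/libs/analyze_plan.js");  // for getPlanStage()

    const demoDB = db.getSiblingDB("fam_remove_demo");  // hypothetical database name
    const explainRes = assert.commandWorked(demoDB.runCommand(
        {explain: {findAndModify: "weather", query: {f: 101}, remove: true},
         verbosity: "executionStats"}));

    // Without a 'fields' projection, TS_MODIFY is the root stage; with one, it
    // sits under PROJECTION_DEFAULT. Its stats include opType, bucketFilter,
    // residualFilter, nBucketsUnpacked, and nReturned.
    const stage = getPlanStage(explainRes.queryPlanner.winningPlan, "TS_MODIFY");
    assert.eq("deleteOne", stage.opType);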