diff options
author | Alya Berciu <alyacarina@gmail.com> | 2021-10-04 11:55:24 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-10-04 12:28:32 +0000 |
commit | 80c8f9c04e2890a65d0f3db64d7b9d9b1e44db73 (patch) | |
tree | 4b623aa347c847fd79767aa055e2b6925a6da8f5 | |
parent | f4aa2b0e25976facd7bda02872e2a46205975dcc (diff) | |
download | mongo-80c8f9c04e2890a65d0f3db64d7b9d9b1e44db73.tar.gz |
SERVER-59180 Implement update rewrite and routing for sharded timeseries collections
16 files changed, 1444 insertions, 58 deletions
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_kill_primary.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_kill_primary.yml index 77c6f236701..ec618e5d03f 100644 --- a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_kill_primary.yml +++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_kill_primary.yml @@ -159,6 +159,7 @@ selector: - jstests/concurrency/fsm_workloads/collmod_separate_collections.js - jstests/concurrency/fsm_workloads/invalidated_cursors.js - jstests/concurrency/fsm_workloads/kill_multicollection_aggregation.js + - jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_updates.js - jstests/concurrency/fsm_workloads/view_catalog.js - jstests/concurrency/fsm_workloads/view_catalog_cycle_with_drop.js - jstests/concurrency/fsm_workloads/view_catalog_direct_system_writes.js diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_terminate_primary.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_terminate_primary.yml index 2ba95613343..0fe5a99e65a 100644 --- a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_terminate_primary.yml +++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_terminate_primary.yml @@ -159,6 +159,7 @@ selector: - jstests/concurrency/fsm_workloads/collmod_separate_collections.js - jstests/concurrency/fsm_workloads/invalidated_cursors.js - jstests/concurrency/fsm_workloads/kill_multicollection_aggregation.js + - jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_updates.js - jstests/concurrency/fsm_workloads/view_catalog.js - jstests/concurrency/fsm_workloads/view_catalog_cycle_with_drop.js - jstests/concurrency/fsm_workloads/view_catalog_direct_system_writes.js diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_stepdowns.yml 
b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_stepdowns.yml index b2de0ac89c4..5b3e6b6bd15 100644 --- a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_stepdowns.yml +++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_stepdowns.yml @@ -142,6 +142,7 @@ selector: - jstests/concurrency/fsm_workloads/collmod_writeconflict.js - jstests/concurrency/fsm_workloads/invalidated_cursors.js - jstests/concurrency/fsm_workloads/kill_multicollection_aggregation.js + - jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_updates.js - jstests/concurrency/fsm_workloads/view_catalog.js - jstests/concurrency/fsm_workloads/view_catalog_cycle_with_drop.js - jstests/concurrency/fsm_workloads/view_catalog_direct_system_writes.js diff --git a/jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_inserts.js b/jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_inserts.js index 4b8e496dd05..fa052f3c730 100644 --- a/jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_inserts.js +++ b/jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_inserts.js @@ -34,6 +34,17 @@ var $config = extendWorkload($config, function($config, $super) { $config.data.bucketPrefix = "system.buckets."; + $config.data.timeField = 't'; + $config.data.metaField = 'm'; + + $config.data.generateMetaFieldValueForInitialInserts = () => { + return Math.floor(Random.rand() * $config.data.numMetaCount); + }; + + $config.data.generateMetaFieldValueForInsertStage = (i) => { + return i % $config.data.numMetaCount; + }; + $config.threadCount = 10; $config.iterations = 40; $config.startState = "init"; @@ -49,8 +60,8 @@ var $config = extendWorkload($config, function($config, $super) { this.startTime + Math.floor(Random.rand() * this.numInitialDocs * this.increment); const doc = { _id: new ObjectId(), - t: new Date(timer), - m: Math.floor(Random.rand() * this.numMetaCount) + [this.timeField]: new 
Date(timer), + [this.metaField]: this.generateMetaFieldValueForInsertStage(this.tid), }; assertAlways.commandWorked(db[collName].insert(doc)); assertAlways.commandWorked(db[this.nonShardCollName].insert(doc)); @@ -121,8 +132,11 @@ var $config = extendWorkload($config, function($config, $super) { // buckets with all documents. const verifyBucketIndex = (bucketIndex) => { const unpackStage = { - "$_internalUnpackBucket": - {"timeField": "t", "metaField": "m", "bucketMaxSpanSeconds": NumberInt(3600)} + "$_internalUnpackBucket": { + "timeField": this.timeField, + "metaField": this.metaField, + "bucketMaxSpanSeconds": NumberInt(3600) + } }; const bucketColl = db.getCollection(`system.buckets.${collName}`); const numDocsInBuckets = @@ -151,8 +165,8 @@ var $config = extendWorkload($config, function($config, $super) { db[collName].drop(); db[this.nonShardCollName].drop(); - assertAlways.commandWorked( - db.createCollection(collName, {timeseries: {timeField: "t", metaField: "m"}})); + assertAlways.commandWorked(db.createCollection( + collName, {timeseries: {timeField: this.timeField, metaField: this.metaField}})); cluster.shardCollection(db[collName], {t: 1}, false); // Create indexes to verify index integrity during the teardown state. 
@@ -171,8 +185,8 @@ var $config = extendWorkload($config, function($config, $super) { const doc = { _id: new ObjectId(), - t: new Date(currentTimeStamp), - m: i % this.numMetaCount + [this.timeField]: new Date(currentTimeStamp), + [this.metaField]: this.generateMetaFieldValueForInitialInserts(i), }; bulk.insert(doc); bulkUnsharded.insert(doc); diff --git a/jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_updates.js b/jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_updates.js new file mode 100644 index 00000000000..e700d90f095 --- /dev/null +++ b/jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_updates.js @@ -0,0 +1,79 @@ +/** + * Tests updates to a sharded time-series collection during a chunk migration. To ensure + * correctness, the test does the same writes into an unsharded collection and verifies that the + * number of documents remains the same at the end. This test also checks that indexes on the + * time-series buckets collection remain consistent after the test run. + * @tags: [ + * requires_sharding, + * assumes_balancer_off, + * requires_non_retryable_writes, + * ] + */ +'use strict'; + +const numValues = 10; + +load("jstests/core/timeseries/libs/timeseries.js"); // For 'TimeseriesTest' helpers. 
+load('jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_inserts.js'); + +var $config = extendWorkload($config, function($config, $super) { + $config.states.init = function(db, collName, connCache) { + if (TimeseriesTest.timeseriesCollectionsEnabled(db) && + TimeseriesTest.shardedtimeseriesCollectionsEnabled(db) && + TimeseriesTest.timeseriesUpdatesAndDeletesEnabled(db) && + TimeseriesTest.shardedTimeseriesUpdatesAndDeletesEnabled(db)) { + this.featureFlagDisabled = false; + } else { + jsTestLog( + "Skipping executing this test as the requisite feature flags are not enabled."); + } + + $super.states.init(db, collName); + }; + + $config.data.generateMetaFieldValueForInitialInserts = () => { + let meta = {}; + // Insert a document with a field for every thread to test concurrent updates on the + // same document. + for (let i = 0; i < $config.threadCount; i++) { + meta["tid" + i] = Random.randInt(numValues); + } + return meta; + }; + + $config.data.generateMetaFieldValueForInsertStage = (tid) => { + return {["tid" + tid]: Random.randInt(numValues)}; + }; + + $config.states.update = function(db, collName, connCache) { + if (this.featureFlagDisabled) { + return; + } + + const shardedColl = db[collName]; + const unshardedColl = db[this.nonShardCollName]; + const updateField = "tid" + this.tid; + const oldValue = Random.randInt(numValues); + + // Updates some measurements along the field owned by this thread in both sharded and + // unsharded ts collections. + jsTestLog("Executing update state on: " + collName + " on field " + updateField); + assertAlways.commandWorked( + shardedColl.update({[this.metaField]: {[updateField]: {$gte: oldValue}}}, + {$inc: {[this.metaField + "." + updateField]: 1}}, + {multi: true})); + assertAlways.commandWorked( + unshardedColl.update({[this.metaField]: {[updateField]: {$gte: oldValue}}}, + {$inc: {[this.metaField + "." 
+ updateField]: 1}}, + {multi: true})); + }; + + $config.transitions = { + init: {insert: 1}, + insert: {insert: 0.4, moveChunk: 0.1, update: 0.5}, + update: {insert: 0.5, moveChunk: 0.1, update: 0.4}, + moveChunk: {insert: 0.4, moveChunk: 0.1, update: 0.5}, + }; + + return $config; +}); diff --git a/jstests/core/timeseries/libs/timeseries.js b/jstests/core/timeseries/libs/timeseries.js index a226803058a..10e2d8b90aa 100644 --- a/jstests/core/timeseries/libs/timeseries.js +++ b/jstests/core/timeseries/libs/timeseries.js @@ -29,6 +29,16 @@ var TimeseriesTest = class { .featureFlagTimeseriesUpdatesAndDeletes.value; } + /** + * Returns whether sharded time-series updates and deletes are supported. + */ + static shardedTimeseriesUpdatesAndDeletesEnabled(conn) { + return assert + .commandWorked( + conn.adminCommand({getParameter: 1, featureFlagShardedTimeSeriesUpdateDelete: 1})) + .featureFlagShardedTimeSeriesUpdateDelete.value; + } + static shardedtimeseriesCollectionsEnabled(conn) { return assert .commandWorked(conn.adminCommand({getParameter: 1, featureFlagShardedTimeSeries: 1})) diff --git a/jstests/core/timeseries/timeseries_insert_after_update.js b/jstests/core/timeseries/timeseries_insert_after_update.js index 37e1df137a3..eec6005720e 100644 --- a/jstests/core/timeseries/timeseries_insert_after_update.js +++ b/jstests/core/timeseries/timeseries_insert_after_update.js @@ -3,7 +3,6 @@ * were updated. * * @tags: [ - * assumes_unsharded_collection, # TODO SERVER-59180: Remove this tag. 
* does_not_support_stepdowns, * does_not_support_transactions, * requires_getmore, @@ -14,12 +13,26 @@ "use strict"; load("jstests/core/timeseries/libs/timeseries.js"); +load("jstests/libs/fixture_helpers.js"); if (!TimeseriesTest.timeseriesUpdatesAndDeletesEnabled(db.getMongo())) { jsTestLog("Skipping test because the time-series updates and deletes feature flag is disabled"); return; } +if (FixtureHelpers.isMongos(db) && + !TimeseriesTest.shardedtimeseriesCollectionsEnabled(db.getMongo())) { + jsTestLog("Skipping test because the time-series updates and deletes feature flag is disabled"); + return; +} + +if (FixtureHelpers.isMongos(db) && + !TimeseriesTest.shardedTimeseriesUpdatesAndDeletesEnabled(db.getMongo())) { + jsTestLog( + "Skipping test because the sharded time-series updates and deletes feature flag is disabled"); + return; +} + TimeseriesTest.run((insert) => { const testDB = db.getSiblingDB(jsTestName()); assert.commandWorked(testDB.dropDatabase()); diff --git a/jstests/core/timeseries/timeseries_update.js b/jstests/core/timeseries/timeseries_update.js index ff5c8e6b7d6..82ae774089a 100644 --- a/jstests/core/timeseries/timeseries_update.js +++ b/jstests/core/timeseries/timeseries_update.js @@ -1,7 +1,6 @@ /** * Tests running the update command on a time-series collection. * @tags: [ - * assumes_unsharded_collection, # TODO SERVER-59180: Remove this tag. 
* does_not_support_stepdowns, * does_not_support_transactions, * requires_getmore, @@ -12,12 +11,26 @@ "use strict"; load("jstests/core/timeseries/libs/timeseries.js"); +load("jstests/libs/fixture_helpers.js"); if (!TimeseriesTest.timeseriesUpdatesAndDeletesEnabled(db.getMongo())) { jsTestLog("Skipping test because the time-series updates and deletes feature flag is disabled"); return; } +if (FixtureHelpers.isMongos(db) && + !TimeseriesTest.shardedtimeseriesCollectionsEnabled(db.getMongo())) { + jsTestLog("Skipping test because the sharded time-series feature flag is disabled"); + return; +} + +if (FixtureHelpers.isMongos(db) && + !TimeseriesTest.shardedTimeseriesUpdatesAndDeletesEnabled(db.getMongo())) { + jsTestLog( + "Skipping test because the sharded time-series updates and deletes feature flag is disabled"); + return; +} + const timeFieldName = "time"; const metaFieldName = "tag"; const dateTime = ISODate("2021-07-12T16:00:00Z"); diff --git a/jstests/core/timeseries/timeseries_update_concurrent.js b/jstests/core/timeseries/timeseries_update_concurrent.js index fdea47f4683..6ca2745b5f5 100644 --- a/jstests/core/timeseries/timeseries_update_concurrent.js +++ b/jstests/core/timeseries/timeseries_update_concurrent.js @@ -2,7 +2,7 @@ * Tests running the update command on a time-series collection with concurrent modifications to the * collection. * @tags: [ - * assumes_unsharded_collection, # TODO SERVER-59180: Remove this tag. + * assumes_unsharded_collection, # TODO SERVER-60233: Remove this tag. * does_not_support_stepdowns, * does_not_support_transactions, * requires_fcv_51, diff --git a/jstests/core/timeseries/timeseries_update_hint.js b/jstests/core/timeseries/timeseries_update_hint.js index b658bd13539..9dbc145a66f 100644 --- a/jstests/core/timeseries/timeseries_update_hint.js +++ b/jstests/core/timeseries/timeseries_update_hint.js @@ -1,7 +1,7 @@ /** * Tests passing a hint to the update command on a time-series collection. 
* @tags: [ - * assumes_unsharded_collection, # TODO SERVER-59180: Remove this tag. + * assumes_unsharded_collection, # TODO SERVER-60233: Remove this tag. * does_not_support_stepdowns, * does_not_support_transactions, * requires_fcv_51, @@ -318,4 +318,4 @@ testUpdateHintFailed({ nModifiedBuckets: 0, failCode: ErrorCodes.BadValue, }); -})();
\ No newline at end of file +})(); diff --git a/jstests/sharding/timeseries_multiple_mongos.js b/jstests/sharding/timeseries_multiple_mongos.js index 4a0d38a3254..bc34225099f 100644 --- a/jstests/sharding/timeseries_multiple_mongos.js +++ b/jstests/sharding/timeseries_multiple_mongos.js @@ -57,6 +57,9 @@ function generateBatch(size) { */ function runTest({shardKey, cmdObj, numProfilerEntries}) { const isDelete = cmdObj["delete"] !== undefined; + const isUpdate = cmdObj["update"] !== undefined; + const cmdCollName = cmdObj[Object.keys(cmdObj)[0]]; + const shardKeyHasMetaField = shardKey[metaField] !== undefined; // Insert some dummy data using 'mongos1' as the router, so that the cache is initialized on the // mongos while the collection is unsharded. @@ -71,21 +74,22 @@ function runTest({shardKey, cmdObj, numProfilerEntries}) { key: shardKey, })); - assert.commandWorked( - mongos0.adminCommand({split: `${dbName}.system.buckets.${collName}`, middle: {meta: 1}})); + // Move one of the chunks into the second shard. Note that we can only do this if the meta field + // is part of the shard key. + const middle = shardKeyHasMetaField ? {meta: 1} : {"meta.a": 1}; + assert.commandWorked(mongos0.adminCommand({split: `${dbName}.${bucketsCollName}`, middle})); - // Move one of the chunks into the second shard. const primaryShard = st.getPrimaryShard(dbName); const otherShard = st.getOther(primaryShard); assert.commandWorked(mongos0.adminCommand({ - movechunk: `${dbName}.system.buckets.${collName}`, - find: {meta: 1}, + movechunk: `${dbName}.${bucketsCollName}`, + find: middle, to: otherShard.name, _waitForDelete: true })); // Validate the command by running against 'mongos1' as the router. - function validateCommand(collName, numEntries, unVersioned) { + function validateCommand(cmdCollName, numEntries, unVersioned) { // Restart profiler. 
for (let shardDB of [shard0DB, shard1DB]) { shardDB.setProfilingLevel(0); @@ -101,9 +105,13 @@ function runTest({shardKey, cmdObj, numProfilerEntries}) { } const queryField = `command.${Object.keys(cmdObj)[0]}`; - let filter = {[queryField]: collName, "command.shardVersion.0": {$ne: Timestamp(0, 0)}}; - if (isDelete) { - filter = {"op": "remove", "ns": `${dbName}.${collName}`, "ok": {$ne: 0}}; + let filter = {[queryField]: cmdCollName, "command.shardVersion.0": {$ne: Timestamp(0, 0)}}; + + // We currently do not log 'shardVersion' for updates. See SERVER-60354 for details. + if (isUpdate) { + filter = {"op": "update", "ns": `${dbName}.${cmdCollName}`, "ok": {$ne: 0}}; + } else if (isDelete) { + filter = {"op": "remove", "ns": `${dbName}.${cmdCollName}`, "ok": {$ne: 0}}; } else if (unVersioned) { filter["command.shardVersion.0"] = Timestamp(0, 0); } @@ -115,11 +123,9 @@ function runTest({shardKey, cmdObj, numProfilerEntries}) { {shard0Entries: shard0Entries, shard1Entries: shard1Entries}); } - let targetShardedCollection = bucketsCollName; - if (isDelete && cmdObj["delete"] !== bucketsCollName) { - targetShardedCollection = collName; - } - validateCommand(targetShardedCollection, numProfilerEntries.sharded); + // The update command is always logged as being on the user-provided namespace. + validateCommand((isUpdate || isDelete) ? cmdCollName : bucketsCollName, + numProfilerEntries.sharded); // Insert dummy data so that the 'mongos1' sees the collection as sharded. assert.commandWorked(mongos1.getCollection(collName).insert({[timeField]: ISODate()})); @@ -130,9 +136,7 @@ function runTest({shardKey, cmdObj, numProfilerEntries}) { collName, {timeseries: {timeField: timeField, metaField: metaField}})); // When unsharded, the command should be run against the user requested namespace. 
- validateCommand(cmdObj[Object.keys(cmdObj)[0]] /* coll name specified in the command */, - numProfilerEntries.unsharded, - true); + validateCommand(cmdCollName, numProfilerEntries.unsharded, true); } /** @@ -248,6 +252,60 @@ runTest({ if (TimeseriesTest.timeseriesUpdatesAndDeletesEnabled(st.shard0) && TimeseriesTest.shardedTimeseriesUpdatesAndDeletesEnabled(st.shard0)) { + // Tests for updates. + runTest({ + shardKey: {[metaField + ".a"]: 1}, + cmdObj: { + update: collName, + updates: [{ + q: {}, + u: {$inc: {[metaField + ".b"]: 1}}, + multi: true, + }] + }, + numProfilerEntries: {sharded: 2, unsharded: 1}, + }); + + runTest({ + shardKey: {[metaField + ".a"]: 1}, + cmdObj: { + update: collName, + updates: [{ + q: {[metaField + ".a"]: 1}, + u: {$inc: {[metaField + ".b"]: -1}}, + multi: true, + }] + }, + numProfilerEntries: {sharded: 1, unsharded: 1}, + }); + + runTest({ + shardKey: {[metaField + ".a"]: 1}, + cmdObj: { + update: bucketsCollName, + updates: [{ + q: {}, + u: {$inc: {["meta.b"]: 1}}, + multi: true, + }] + }, + numProfilerEntries: {sharded: 2, unsharded: 1}, + }); + + runTest({ + shardKey: {[metaField + ".a"]: 1}, + cmdObj: { + update: bucketsCollName, + updates: [{ + q: {["meta.a"]: 1}, + u: {$inc: {["meta.b"]: -1}}, + multi: true, + }] + }, + numProfilerEntries: {sharded: 1, unsharded: 1}, + }); + + // Tests for deletes. runTest({ shardKey: {[metaField]: 1}, cmdObj: { @@ -295,8 +353,6 @@ if (TimeseriesTest.timeseriesUpdatesAndDeletesEnabled(st.shard0) && }, numProfilerEntries: {sharded: 1, unsharded: 1}, }); - - // TODO SERVER-59180: Add tests for updates. } st.stop(); diff --git a/jstests/sharding/timeseries_update.js b/jstests/sharding/timeseries_update.js new file mode 100644 index 00000000000..379da48c672 --- /dev/null +++ b/jstests/sharding/timeseries_update.js @@ -0,0 +1,970 @@ +/** + * Test updates into sharded timeseries collection. 
+ * + * @tags: [ + * requires_fcv_51, + * requires_find_command, + * ] + */ + +(function() { +"use strict"; + +load("jstests/core/timeseries/libs/timeseries.js"); // For 'TimeseriesTest' helpers. + +const st = new ShardingTest({shards: 2, rs: {nodes: 2}}); +const mongos = st.s0; + +// +// Constants used throughout all tests. +// + +const dbName = 'testDB'; +const collName = 'coll'; +const timeField = "time"; +const metaField = "tag"; +const dateTime = ISODate("2021-07-12T16:00:00Z"); + +// +// Checks for feature flags. +// + +if (!TimeseriesTest.timeseriesCollectionsEnabled(st.shard0)) { + jsTestLog("Skipping test because the time-series collection feature flag is disabled"); + st.stop(); + return; +} + +if (!TimeseriesTest.shardedtimeseriesCollectionsEnabled(st.shard0)) { + jsTestLog("Skipping test because the sharded time-series collection feature flag is disabled"); + st.stop(); + return; +} + +if (!TimeseriesTest.timeseriesUpdatesAndDeletesEnabled(st.shard0)) { + jsTestLog( + "Skipping test because the updates and deletes on time-series collection feature flag is disabled"); + st.stop(); + return; +} + +const testDB = mongos.getDB(dbName); +testDB.dropDatabase(); +assert.commandWorked(mongos.adminCommand({enableSharding: dbName})); + +if (!TimeseriesTest.shardedTimeseriesUpdatesAndDeletesEnabled(st.shard0)) { + // Ensure that the feature flag correctly prevents us from running an update on a sharded + // timeseries collection. 
+ assert.commandWorked(testDB.createCollection(collName, {timeseries: {timeField, metaField}})); + const coll = testDB.getCollection(collName); + assert.commandWorked(coll.createIndex({[timeField]: 1})); + assert.commandWorked(mongos.adminCommand({ + shardCollection: `${dbName}.${collName}`, + key: {[timeField]: 1}, + })); + assert.commandFailedWithCode( + testDB.runCommand( + {update: coll.getName(), updates: [{q: {}, u: {[metaField]: 1}, multi: true}]}), + [ErrorCodes.NotImplemented, ErrorCodes.InvalidOptions]); + st.stop(); + return; +} + +const doc1 = { + _id: 1, + [timeField]: dateTime, + [metaField]: {a: "A", b: "B"} +}; +const doc2 = { + _id: 2, + [timeField]: dateTime, + [metaField]: {c: "C", d: 2}, + f: [{"k": "K", "v": "V"}] +}; +const doc3 = { + _id: 3, + [timeField]: dateTime, + f: "F" +}; +const doc4 = { + _id: 4, + [timeField]: dateTime, + [metaField]: {a: "A", b: "B"}, + f: "F" +}; +const doc5 = { + _id: 5, + [timeField]: dateTime, + [metaField]: {a: "A", b: "B", c: "C"} +}; + +// +// Helper functions. +// + +/** + * Confirms that a set of updates returns the expected set of documents. + */ +function testShardedUpdate({ + insert, + shardKey, + updatesMetaFieldInShardKey, + timeseries, + initialDocList, + updates, + resultDocList, + n, + nModified = n, + letDoc = {}, + failCode, + ordered = true, +}) { + assert.commandWorked(mongos.adminCommand({enableSharding: dbName})); + assert.commandWorked(testDB.createCollection(collName, {timeseries})); + + const coll = testDB.getCollection(collName); + assert.commandWorked(coll.createIndex(shardKey)); + assert.commandWorked(insert(coll, initialDocList)); + + assert.commandWorked(mongos.adminCommand({ + shardCollection: `${dbName}.${collName}`, + key: shardKey, + })); + + // Updates on timeseries collections are not allowed if no metaField is defined. 
+ if (!timeseries["metaField"]) { + failCode = [ErrorCodes.InvalidOptions]; + n = 0; + nModified = 0; + resultDocList = initialDocList; + } + + // Updates on sharded timeseries meta fields are only allowed as long as the field updated is + // not in the shard key. + if (updatesMetaFieldInShardKey) { + failCode = [ErrorCodes.InvalidOptions, 31025]; + n = 0; + nModified = 0; + resultDocList = initialDocList; + } + + const updateCommand = {update: coll.getName(), updates, ordered, let : letDoc}; + const res = failCode ? assert.commandFailedWithCode(testDB.runCommand(updateCommand), failCode) + : assert.commandWorked(testDB.runCommand(updateCommand)); + + assert.eq(n, res.n); + assert.eq(nModified, res.nModified); + assert.eq(initialDocList.length, resultDocList.length); + + resultDocList.forEach(resultDoc => { + assert.docEq( + resultDoc, + coll.findOne({_id: resultDoc._id}), + "Expected document not found in result collection:" + tojson(coll.find().toArray())); + }); + + assert(coll.drop()); +} + +/** + * Wrapper on testShardedUpdate to compose tests with a given shardKey/timeseries combo. + */ +function testUpdates({shardKeyTimeField, shardKeyMetaFieldPath, timeseriesOptions, tests}) { + // Set up the shard key for the tests. + let shardKey = {}; + if (shardKeyMetaFieldPath) { + shardKey[shardKeyMetaFieldPath] = 1; + } + if (shardKeyTimeField) { + shardKey[shardKeyTimeField] = 1; + } + + // Run a series of updates. + TimeseriesTest.run((insert) => { + // Checks whether the update touches the shard key; 'pathToMetaFieldBeingUpdated' is: + // - undefined if the test case does not set it, or "" for the top-level meta field. + // - the name of a subfield of meta we are updating. + // - an array of names of subfields of meta we are updating. + const checkUpdatesMetaFieldInShardKey = (pathToMetaFieldBeingUpdated) => { + if ((pathToMetaFieldBeingUpdated === undefined) || !shardKeyMetaFieldPath) { + // If we do not have the meta field in the shard key, we are able to update it. 
+ return false; + } else if ((shardKeyMetaFieldPath === metaField) || + (pathToMetaFieldBeingUpdated === "")) { + // If the top-level meta field is in the shard key, we cannot update it. + return true; + } else if (!Array.isArray(pathToMetaFieldBeingUpdated)) { + pathToMetaFieldBeingUpdated = [pathToMetaFieldBeingUpdated]; + } + for (const e of pathToMetaFieldBeingUpdated) { + if (metaField + "." + e === shardKeyMetaFieldPath) { + return true; + } + } + return false; + }; + + const testUpdate = (fields, additionalFields = {}) => { + let inputs = Object.assign({}, fields, additionalFields); + testShardedUpdate(Object.assign({}, inputs, { + shardKey, + insert, + timeseries: timeseriesOptions, + updatesMetaFieldInShardKey: + checkUpdatesMetaFieldInShardKey(inputs.pathToMetaFieldBeingUpdated) + })); + }; + + tests.forEach(test => test({testUpdate})); + }, testDB); +} + +/** + * Helper function to generate the parameters to pass to 'testUpdates' when an update is expected to + * fail. + */ +function expectFailedUpdate(initialDocList) { + return { + initialDocList, + resultDocList: initialDocList, + n: 0, + failCode: ErrorCodes.InvalidOptions, + }; +} + +// +// Test cases when the update command fails. +// + +function testCaseMultiFalseUpdateFails({testUpdate}) { + testUpdate({updates: [{q: {[metaField]: {b: "B"}}, u: {$set: {[metaField]: {b: "C"}}}}]}, + expectFailedUpdate([doc1])); +} + +function testCaseReplacementAndPipelineUpdateFails({testUpdate}) { + const expectFailedUpdateDoc = expectFailedUpdate([doc2]); + + // Replace a document to have no metaField, which should fail since updates with replacement + // documents are not supported. + testUpdate({ + updates: [{ + q: {[metaField]: {c: "C", d: 2}}, + u: {f2: {e: "E", f: "F"}, f3: 7}, + multi: true, + }] + }, + expectFailedUpdateDoc); + + // Replace a document with an empty document, which should fail since updates with replacement + // documents are not supported. 
+ testUpdate({ + updates: [{ + q: {[metaField]: {c: "C", d: 2}}, + u: {}, + multi: true, + }] + }, + expectFailedUpdateDoc); + + // Modify the metaField, which should fail since pipeline-style updates are not supported. + testUpdate({ + updates: [{ + q: {}, + u: [ + {$addFields: {[metaField + ".c"]: "C", [metaField + ".e"]: "E"}}, + {$unset: metaField + ".e"} + ], + multi: true, + }] + }, + expectFailedUpdateDoc); +} + +function testCaseNoMetaFieldQueryUpdateFails({testUpdate}) { + // Query on a field which is not the (nonexistent) metaField. + testUpdate({ + updates: [{ + q: {f: "F"}, + u: {}, + multi: true, + }] + }, + expectFailedUpdate([doc3])); + + // Query on all documents and update them to be empty documents. + testUpdate({ + updates: [{ + q: {}, + u: {}, + multi: true, + }] + }, + expectFailedUpdate([doc3])); + + // Query on all documents and update them to be non-empty documents. + testUpdate({ + updates: [{ + q: {}, + u: {f: "FF"}, + multi: true, + }] + }, + expectFailedUpdate([doc3])); + + // Query on a field that is not the metaField. + testUpdate({ + updates: [{ + q: {measurement: "cpu"}, + u: {$set: {[metaField]: {c: "C"}}}, + multi: true, + }] + }, + expectFailedUpdate([doc1])); + + // Query on the metaField and a field that is not the metaField. + testUpdate({ + updates: [ + { + q: {[metaField]: {a: "A", b: "B"}, measurement: "cpu"}, + u: {$set: {[metaField]: {c: "C"}}}, + multi: true, + }, + ] + }, + expectFailedUpdate([doc1])); + + // Query on a field that is not the metaField using dot notation and modify the metaField. + testUpdate({ + updates: [{ + q: {"measurement.A": "cpu"}, + u: {$set: {[metaField]: {c: "C"}}}, + multi: true, + }] + }, + expectFailedUpdate([doc1])); +} + +function testCaseIllegalMetaFieldUpdateFails({testUpdate}) { + // Query on the metaField and modify a field that is not the metaField. 
+ testUpdate({ + updates: [{ + q: {[metaField]: {c: "C", d: 2}}, + u: {$set: {f2: "f2"}}, + multi: true, + }] + }, + expectFailedUpdate([doc2])); + + // Query on the metaField and modify the metaField and fields that are not the metaField. + testUpdate({ + updates: [{ + q: {[metaField]: {c: "C", d: 2}}, + u: {$set: {[metaField]: {e: "E"}, f3: "f3"}, $inc: {f2: 3}, $unset: {f1: ""}}, + multi: true, + }] + }, + expectFailedUpdate([doc2])); + + // Rename the metaField. + testUpdate({ + updates: [{ + q: {[metaField + ".a"]: "A"}, + u: {$rename: {[metaField]: "Z"}}, + multi: true, + }] + }, + expectFailedUpdate([doc1, doc2, doc4])); + + // Rename a subfield of the metaField to something not in the metaField. + testUpdate({ + updates: [{ + q: {[metaField + ".a"]: "A"}, + u: {$rename: {[metaField + ".a"]: "notMetaField.a"}}, + multi: true, + }] + }, + expectFailedUpdate([doc1, doc2, doc4])); +} + +// +// Test cases when the update command succeeds. +// + +function testCaseBatchUpdates({testUpdate}) { + // Multiple updates, ordered: query on the metaField and modify the metaField multiple times. + testUpdate({ + initialDocList: [doc2], + updates: [ + { + q: {[metaField]: {c: "C", d: 2}}, + u: {$set: {[metaField]: 1}}, + multi: true, + }, + { + q: {[metaField]: 1}, + u: {$set: {[metaField]: 2}}, + multi: true, + }, + { + q: {[metaField]: 2}, + u: {$set: {[metaField]: 3}}, + multi: true, + } + ], + resultDocList: [{ + _id: 2, + [timeField]: dateTime, + [metaField]: 3, + f: [{"k": "K", "v": "V"}], + }], + n: 3, + pathToMetaFieldBeingUpdated: "", + }); + + // Multiple updates, ordered: query on the metaField and on a field that is not the metaField. 
+ testUpdate({ + initialDocList: [doc1], + updates: [ + { + q: {[metaField]: {a: "A", b: "B"}}, + u: {$set: {[metaField]: {c: "C", d: 1}}}, + multi: true, + }, + { + q: {measurement: "cpu", [metaField + ".d"]: 1}, + u: {$set: {[metaField + ".c"]: "CC"}}, + multi: true, + } + ], + resultDocList: [doc1], + // If the shardKey is on one of the fields being updated, this must fail. + n: 1, + pathToMetaFieldBeingUpdated: "", + failCode: ErrorCodes.InvalidOptions, + }); + + // Multiple updates, ordered: query on the metaField and modify the metaField and a field that + // is not the metaField using dot notation. + testUpdate({ + initialDocList: [doc2], + updates: [ + { + q: {[metaField]: {c: "C", d: 2}}, + u: {$inc: {[metaField + ".d"]: 6}}, + multi: true, + }, + { + q: {[metaField]: {c: "C", d: 8}}, + u: {$set: {"f1.0": "f2"}}, + multi: true, + } + ], + resultDocList: [{ + _id: 2, + [timeField]: dateTime, + [metaField]: {c: "C", d: 8}, + f: [{"k": "K", "v": "V"}], + }], + // If the shardKey is on the field being updated, this must fail to update any docs. + n: 1, + pathToMetaFieldBeingUpdated: "d", + failCode: ErrorCodes.InvalidOptions, + }); + + // Multiple updates, ordered: query on the metaField and modify a field that is not the + // metaField using dot notation. + testUpdate({ + updates: [ + { + q: {[metaField]: {c: "C", d: 2}}, + u: {$set: {"f1.0": "f2"}}, + multi: true, + }, + { + q: {[metaField]: {c: "C", d: 2}}, + u: {$inc: {[metaField + ".d"]: 6}}, + multi: true, + } + ] + }, + expectFailedUpdate([doc2])); + + // Multiple updates, unordered: Modify the metaField, a field that is not the metaField, and the + // metaField. The first and last updates should succeed. 
+ testUpdate({ + initialDocList: [doc2], + updates: [ + { + q: {[metaField]: {c: "C", d: 2}}, + u: {$inc: {[metaField + ".d"]: 6}}, + multi: true, + }, + { + q: {[metaField]: {c: "C", d: 8}}, + u: {$set: {"f1.0": "f2"}}, + multi: true, + }, + { + q: {[metaField]: {c: "C", d: 8}}, + u: {$inc: {[metaField + ".d"]: 7}}, + multi: true, + } + ], + resultDocList: [{ + _id: 2, + [timeField]: dateTime, + [metaField]: {c: "C", d: 15}, + f: [{"k": "K", "v": "V"}], + }], + ordered: false, + n: 2, + pathToMetaFieldBeingUpdated: "d", + failCode: ErrorCodes.InvalidOptions, + }); +} + +function testCaseValidMetaFieldUpdates({testUpdate}) { + // Rename a subfield to the metaField. + testUpdate({ + initialDocList: [doc1, doc2], + updates: [{ + q: {[metaField + ".a"]: "A"}, + u: {$rename: {[metaField + ".a"]: metaField + ".z"}}, + multi: true, + }], + resultDocList: [{_id: 1, [timeField]: dateTime, [metaField]: {z: "A", b: "B"}}, doc2], + n: 1, + pathToMetaFieldBeingUpdated: "a", + }); + + // Query on the metaField and modify the metaField. + testUpdate({ + initialDocList: [doc1], + updates: [{ + q: {[metaField]: {a: "A", b: "B"}}, + u: {$set: {[metaField]: {c: "C"}}}, + multi: true, + }], + resultDocList: [{_id: 1, [timeField]: dateTime, [metaField]: {c: "C"}}], + n: 1, + pathToMetaFieldBeingUpdated: "", + }); + + // Query on the metaField and modify the metaField of 1 matching document. + testUpdate({ + initialDocList: [doc1, doc2, doc4, doc5], + updates: [{ + q: {"$and": [{[metaField + ".c"]: "C"}, {[metaField + ".d"]: 2}]}, + u: {$set: {[metaField + ".c"]: 1}}, + multi: true, + }], + resultDocList: [ + doc1, + {_id: 2, [timeField]: dateTime, [metaField]: {c: 1, d: 2}, f: [{"k": "K", "v": "V"}]}, + doc4, + doc5 + ], + ordered: false, + n: 1, + pathToMetaFieldBeingUpdated: "c", + }); + + // Query on the metaField and update the metaField of multiple matching documents. 
+ testUpdate({ + initialDocList: [doc1, doc2, doc4, doc5], + updates: [{ + q: {[metaField + ".a"]: "A"}, + u: {$unset: {[metaField + ".a"]: ""}}, + multi: true, + }], + resultDocList: [ + {_id: 1, [timeField]: dateTime, [metaField]: {b: "B"}}, + {_id: 2, [timeField]: dateTime, [metaField]: {c: "C", d: 2}, f: [{"k": "K", "v": "V"}]}, + {_id: 4, [timeField]: dateTime, [metaField]: {b: "B"}, f: "F"}, + {_id: 5, [timeField]: dateTime, [metaField]: {b: "B", c: "C"}} + ], + ordered: false, + n: 3, + pathToMetaFieldBeingUpdated: "a", + }); + + // Compound query on the metaField using dot notation and modify the metaField. + testUpdate({ + initialDocList: [doc1], + updates: [{ + q: {"$and": [{[metaField + ".a"]: "A"}, {[metaField + ".b"]: "B"}]}, + u: {$set: {[metaField]: {c: "C"}}}, + multi: true, + }], + resultDocList: [{_id: 1, [timeField]: dateTime, [metaField]: {c: "C"}}], + n: 1, + pathToMetaFieldBeingUpdated: "", + }); + + // Query on the metaField using dot notation and modify the metaField. + testUpdate({ + initialDocList: [doc1], + updates: [{ + q: {[metaField + ".a"]: "A"}, + u: {$set: {[metaField]: {c: "C"}}}, + multi: true, + }], + resultDocList: [{_id: 1, [timeField]: dateTime, [metaField]: {c: "C"}}], + n: 1, + pathToMetaFieldBeingUpdated: "", + }); + + // Query on the metaField using dot notation and modify the metaField. + testUpdate({ + initialDocList: [doc2], + updates: [{ + q: {[metaField + ".c"]: "C"}, + u: {$inc: {[metaField + ".d"]: 10}}, + multi: true, + }], + resultDocList: [ + {_id: 2, [timeField]: dateTime, [metaField]: {c: "C", d: 12}, f: [{"k": "K", "v": "V"}]} + ], + n: 1, + pathToMetaFieldBeingUpdated: "d", + }); + + // Query with an empty document (i.e update all documents in the collection). 
+ testUpdate({ + initialDocList: [doc1, doc2], + updates: [{ + q: {}, + u: {$set: {[metaField]: {z: "Z"}}}, + multi: true, + }], + resultDocList: [ + {_id: 1, [timeField]: dateTime, [metaField]: {z: "Z"}}, + {_id: 2, [timeField]: dateTime, [metaField]: {z: "Z"}, f: [{"k": "K", "v": "V"}]} + ], + n: 2, + pathToMetaFieldBeingUpdated: "", + }); + + // Remove the metaField. + testUpdate({ + initialDocList: [doc1], + updates: + [{q: {[metaField]: {a: "A", b: "B"}}, u: {$unset: {[metaField]: ""}}, multi: true}], + resultDocList: [{_id: 1, [timeField]: dateTime}], + n: 1, + pathToMetaFieldBeingUpdated: "", + }); + + // Update where one of the matching documents is a no-op update. + testUpdate({ + initialDocList: [doc1, doc4, doc5], + updates: [ + { + q: {[metaField + ".a"]: "A"}, + u: {$set: {[metaField + ".c"]: "C"}}, + multi: true, + }, + ], + resultDocList: [ + {_id: 1, [timeField]: dateTime, [metaField]: {a: "A", b: "B", c: "C"}}, + {_id: 4, [timeField]: dateTime, [metaField]: {a: "A", b: "B", c: "C"}, f: "F"}, + {_id: 5, [timeField]: dateTime, [metaField]: {a: "A", b: "B", c: "C"}} + ], + n: 3, + nModified: 2, + pathToMetaFieldBeingUpdated: "c", + }); + + // Query for documents using $jsonSchema with the metaField required. + testUpdate({ + initialDocList: [doc1, doc2, doc3], + updates: [{ + q: {"$jsonSchema": {"required": [metaField]}}, + u: {$set: {[metaField]: "a"}}, + multi: true + }], + resultDocList: [ + {_id: 1, [timeField]: dateTime, [metaField]: "a"}, + {_id: 2, [timeField]: dateTime, [metaField]: "a", f: [{"k": "K", "v": "V"}]}, + doc3 + ], + n: 2, + pathToMetaFieldBeingUpdated: "", + }); + + // Query for documents using $jsonSchema with the metaField in dot notation required. 
+ testUpdate({ + initialDocList: [doc1, doc2, doc3], + updates: [{ + q: {"$jsonSchema": {"required": [metaField + ".a"]}}, + u: {$set: {[metaField]: "a"}}, + multi: true + }], + resultDocList: [{_id: 1, [timeField]: dateTime, [metaField]: "a"}, doc2, doc3], + n: 1, + pathToMetaFieldBeingUpdated: "", + }); + + // Query for documents using $jsonSchema with a field that is not the metaField required. + testUpdate({ + updates: [{ + q: {"$jsonSchema": {"required": [metaField, timeField]}}, + u: {$set: {[metaField]: "a"}}, + multi: true + }], + }, + expectFailedUpdate([doc1, doc2, doc3])); + + const nestedMetaObj = {_id: 6, [timeField]: dateTime, [metaField]: {[metaField]: "A", a: 1}}; + + // Query for documents using $jsonSchema with the metaField required and a required subfield of + // the metaField with the same name as the metaField. + testUpdate({ + initialDocList: [doc1, nestedMetaObj], + updates: [{ + q: { + "$jsonSchema": { + "required": [metaField], + "properties": {[metaField]: {"required": [metaField]}} + } + }, + u: {$set: {[metaField]: "a"}}, + multi: true + }], + resultDocList: [doc1, {_id: 6, [timeField]: dateTime, [metaField]: "a", a: 1}], + n: 1, + pathToMetaFieldBeingUpdated: "", + }); + + // Query for documents using $jsonSchema with the metaField required and an optional field that + // is not the metaField. + testUpdate({ + updates: [{ + q: { + "$jsonSchema": { + "required": [metaField], + "properties": {"measurement": {description: "can be any value"}} + } + }, + u: {$set: {[metaField]: "a"}}, + multi: true + }] + }, + expectFailedUpdate([doc1, nestedMetaObj])); + + // Query for documents on the metaField with the metaField nested within nested operators. 
+ testUpdate({ + initialDocList: [doc1, doc2, doc3], + updates: [{ + q: { + "$and": [ + {"$or": [{[metaField]: {"$ne": "B"}}, {[metaField]: {"a": {"$eq": "B"}}}]}, + {[metaField]: {a: "A", b: "B"}} + ] + }, + u: {$set: {[metaField]: "a"}}, + multi: true + }], + resultDocList: [{_id: 1, [timeField]: dateTime, [metaField]: "a"}, doc2, doc3], + n: 1, + pathToMetaFieldBeingUpdated: "", + }); + + // Updates where upsert:false should not insert a new document when no match is found. In case + // the shard key is on the meta field, this should not update any documents but also not + // report a failure, since no documents were matched. + testUpdate({ + initialDocList: [doc1, doc4, doc5], + updates: [{ + q: {[metaField]: "Z"}, + u: {$set: {[metaField]: 5}}, + multi: true, + }], + resultDocList: [doc1, doc4, doc5], + n: 0, + }); + + // Do the same test case as above but with upsert:true, which should fail. + testUpdate({ + updates: [{ + q: {[metaField]: "Z"}, + u: {$set: {[metaField]: 5}}, + multi: true, + upsert: true, + }] + }, + expectFailedUpdate([doc1, doc4, doc5])); +} + +function testCaseUpdateWithLetDoc({testUpdate}) { + // Use a variable defined in the let option in the query to modify the metaField. + testUpdate({ + initialDocList: [doc1, doc4, doc5], + updates: [{ + q: {$expr: {$eq: ["$" + metaField + ".a", "$$oldVal"]}}, + u: {$set: {[metaField]: "aaa"}}, + multi: true, + }], + letDoc: {oldVal: "A"}, + resultDocList: [ + {_id: 1, [timeField]: dateTime, [metaField]: "aaa"}, + {_id: 4, [timeField]: dateTime, [metaField]: "aaa", f: "F"}, + {_id: 5, [timeField]: dateTime, [metaField]: "aaa"} + ], + n: 3, + pathToMetaFieldBeingUpdated: "", + }); + + // Variables defined in the let option can only be used in the update if the update is a + // pipeline update. Since this update is an update document, the literal name of the variable + // will be used in the update instead of the variable's value.
+ testUpdate({ + initialDocList: [doc1], + updates: [{ + q: {[metaField + ".a"]: "A"}, + u: {$set: {[metaField]: "$$myVar"}}, + multi: true, + }], + letDoc: {myVar: "aaa"}, + resultDocList: [{_id: 1, [timeField]: dateTime, [metaField]: "$$myVar"}], + n: 1, + pathToMetaFieldBeingUpdated: "", + }); + + // Use variables defined in the let option in the query to modify the metaField multiple times. + testUpdate({ + initialDocList: [doc1, doc4, doc5], + updates: [ + { + q: {$expr: {$eq: ["$" + metaField + ".a", "$$val1"]}}, + u: {$set: {[metaField]: "aaa"}}, + multi: true, + }, + { + q: {$expr: {$eq: ["$" + metaField, "$$val2"]}}, + u: {$set: {[metaField]: "bbb"}}, + multi: true, + } + ], + letDoc: {val1: "A", val2: "aaa"}, + resultDocList: [ + {_id: 1, [timeField]: dateTime, [metaField]: "bbb"}, + {_id: 4, [timeField]: dateTime, [metaField]: "bbb", f: "F"}, + {_id: 5, [timeField]: dateTime, [metaField]: "bbb"} + ], + n: 6, + pathToMetaFieldBeingUpdated: "", + }); +} + +function testCaseCollationUpdates({testUpdate}) { + const collationDoc1 = {_id: 1, [timeField]: dateTime, [metaField]: "café"}; + const collationDoc2 = {_id: 2, [timeField]: dateTime, [metaField]: "cafe"}; + const collationDoc3 = {_id: 3, [timeField]: dateTime, [metaField]: "cafE"}; + const initialDocList = [collationDoc1, collationDoc2, collationDoc3]; + + // Query on the metaField and modify the metaField using collation with strength level 1. + testUpdate({ + initialDocList, + updates: [{ + q: {[metaField]: "cafe"}, + u: {$set: {[metaField]: "Updated"}}, + multi: true, + collation: {locale: "fr", strength: 1}, + }], + resultDocList: [ + {_id: 1, [timeField]: dateTime, [metaField]: "Updated"}, + {_id: 2, [timeField]: dateTime, [metaField]: "Updated"}, + {_id: 3, [timeField]: dateTime, [metaField]: "Updated"} + ], + n: 3, + pathToMetaFieldBeingUpdated: "", + }); + + // Query on the metaField and modify the metaField using collation with the default strength + // (level 3). 
+ testUpdate({ + initialDocList, + updates: [{ + q: {[metaField]: "cafe"}, + u: {$set: {[metaField]: "Updated"}}, + multi: true, + collation: {locale: "fr"}, + }], + resultDocList: [ + collationDoc1, + {_id: 2, [timeField]: dateTime, [metaField]: "Updated"}, + collationDoc3, + ], + n: 1, + pathToMetaFieldBeingUpdated: "", + }); +} + +function testCaseNullUpdates({testUpdate}) { + // Assumes shard key is meta.a. + const nullDoc = {_id: 1, [timeField]: dateTime, [metaField]: {a: null, b: 1}}; + const missingDoc1 = {_id: 2, [timeField]: dateTime, [metaField]: {b: 1}}; + const missingDoc2 = {_id: 3, [timeField]: dateTime, [metaField]: "foo"}; + const initialDocList = [nullDoc, missingDoc1, missingDoc2]; + + // Query for documents whose metaField is not null and modify the metaField. + testUpdate({ + initialDocList, + updates: [{ + q: {[metaField]: {$ne: null}}, + u: {$set: {[metaField]: "Updated"}}, + multi: true, + }], + resultDocList: [ + {_id: 1, [timeField]: dateTime, [metaField]: "Updated"}, + {_id: 2, [timeField]: dateTime, [metaField]: "Updated"}, + {_id: 3, [timeField]: dateTime, [metaField]: "Updated"}, + ], + n: 3, + }); +} + +// Run tests with a variety of shardKeys and timeseries configurations. +const timeseriesOptions = { + timeField, + metaField +}; +const tests = [ + testCaseMultiFalseUpdateFails, + testCaseNoMetaFieldQueryUpdateFails, + testCaseIllegalMetaFieldUpdateFails, + testCaseReplacementAndPipelineUpdateFails, + testCaseCollationUpdates, + testCaseUpdateWithLetDoc, + testCaseBatchUpdates, + testCaseValidMetaFieldUpdates, +]; +testUpdates({shardKeyTimeField: timeField, timeseriesOptions: {timeField}, tests}); +testUpdates({shardKeyMetaFieldPath: metaField, timeseriesOptions, tests}); +testUpdates( + {shardKeyTimeField: timeField, shardKeyMetaFieldPath: metaField, timeseriesOptions, tests}); + +// Run a relevant subset of tests in the case when meta.a is the shard key.
+const testsForMetaSubfieldShardKey = [ + testCaseNullUpdates, + testCaseMultiFalseUpdateFails, + testCaseNoMetaFieldQueryUpdateFails, + testCaseIllegalMetaFieldUpdateFails, + testCaseReplacementAndPipelineUpdateFails, + testCaseValidMetaFieldUpdates, +]; +testUpdates({ + shardKeyMetaFieldPath: metaField + ".a", + timeseriesOptions, + tests: testsForMetaSubfieldShardKey, +}); +testUpdates({ + shardKeyMetaFieldPath: metaField + ".a", + shardKeyTimeField: timeField, + timeseriesOptions, + tests: testsForMetaSubfieldShardKey, +}); + +st.stop(); +})(); diff --git a/jstests/sharding/timeseries_update_routing.js b/jstests/sharding/timeseries_update_routing.js new file mode 100644 index 00000000000..49e33faeea2 --- /dev/null +++ b/jstests/sharding/timeseries_update_routing.js @@ -0,0 +1,206 @@ +/** + * Test routing of updates into sharded timeseries collection. + * + * @tags: [ + * requires_fcv_51, + * requires_find_command, + * ] + */ + +(function() { +"use strict"; + +load("jstests/core/timeseries/libs/timeseries.js"); // For 'TimeseriesTest' helpers. + +const st = new ShardingTest({shards: 2, rs: {nodes: 2}}); + +// +// Constants used throughout all tests. +// + +const dbName = 'testDB'; +const collName = 'weather'; +const bucketCollName = `system.buckets.${collName}`; +const bucketCollFullName = `${dbName}.${bucketCollName}`; + +// +// Checks for feature flags. 
+// + +if (!TimeseriesTest.timeseriesCollectionsEnabled(st.shard0)) { + jsTestLog("Skipping test because the time-series collection feature flag is disabled"); + st.stop(); + return; +} + +if (!TimeseriesTest.shardedtimeseriesCollectionsEnabled(st.shard0)) { + jsTestLog("Skipping test because the sharded time-series collection feature flag is disabled"); + st.stop(); + return; +} + +if (!TimeseriesTest.timeseriesUpdatesAndDeletesEnabled(st.shard0)) { + jsTestLog( + "Skipping test because the updates and deletes on time-series collection feature flag is disabled"); + st.stop(); + return; +} + +if (!TimeseriesTest.shardedTimeseriesUpdatesAndDeletesEnabled(st.shard0)) { + jsTestLog( + "Skipping test because the updates and deletes on sharded time-series collection feature flag is disabled"); + st.stop(); + return; +} + +const mongos = st.s; +const testDB = mongos.getDB(dbName); +const primary = st.shard0; +const primaryDB = primary.getDB(dbName); +const otherShard = st.shard1; +const otherShardDB = otherShard.getDB(dbName); + +testDB.dropDatabase(); +assert.commandWorked(mongos.adminCommand({enableSharding: dbName})); +st.ensurePrimaryShard(testDB.getName(), primary.shardName); + +assert.commandWorked(testDB.createCollection( + collName, {timeseries: {timeField: "time", metaField: "location", granularity: "hours"}})); + +const testColl = testDB[collName]; + +// +// Helper functions +// + +function testUpdateRouting({updates, nModified, shardsTargetedCount}) { + // Restart profiling. + for (const db of [primaryDB, otherShardDB]) { + db.setProfilingLevel(0); + db.system.profile.drop(); + db.setProfilingLevel(2); + } + + // Verify output documents. + const updateCommand = {update: testColl.getName(), updates}; + const result = assert.commandWorked(testDB.runCommand(updateCommand)); + + assert.eq(nModified, result.nModified); + + // Verify profiling output. 
+ if (shardsTargetedCount > 0) { + let filter = {"op": "update", "ns": "testDB.weather"}; + let actualCount = 0; + for (const db of [primaryDB, otherShardDB]) { + const expectedEntries = db.system.profile.find(filter).toArray(); + actualCount += expectedEntries.length; + } + assert.eq(actualCount, shardsTargetedCount); + } +} + +(function setUpTestColl() { + assert.commandWorked(testDB.adminCommand( + {shardCollection: testColl.getFullName(), key: {"location.city": 1, time: 1}})); + + const data = [ + // Cork. + { + location: {city: "Cork", coordinates: [-12, 10]}, + time: ISODate("2021-05-18T08:00:00.000Z"), + temperature: 12, + }, + { + location: {city: "Cork", coordinates: [0, 0]}, + time: ISODate("2021-05-18T07:30:00.000Z"), + temperature: 15, + }, + // Dublin. + { + location: {city: "Dublin", coordinates: [25, -43]}, + time: ISODate("2021-05-18T08:00:00.000Z"), + temperature: 12, + }, + { + location: {city: "Dublin", coordinates: [0, 0]}, + time: ISODate("2021-05-18T08:00:00.000Z"), + temperature: 22, + }, + { + location: {city: "Dublin", coordinates: [25, -43]}, + time: ISODate("2021-05-18T08:30:00.000Z"), + temperature: 12.5, + }, + { + location: {city: "Dublin", coordinates: [25, -43]}, + time: ISODate("2021-05-18T09:00:00.000Z"), + temperature: 13, + }, + // Galway. 
+ { + location: {city: "Galway", coordinates: [22, 44]}, + time: ISODate("2021-05-18T08:00:00.000Z"), + temperature: 20, + }, + { + location: {city: "Galway", coordinates: [0, 0]}, + time: ISODate("2021-05-18T09:00:00.000Z"), + temperature: 20, + }, + ]; + assert.commandWorked(testColl.insertMany(data)); +})(); + +(function defineChunks() { + function splitAndMove(city, minTime, destination) { + assert.commandWorked(st.s.adminCommand( + {split: bucketCollFullName, middle: {"meta.city": city, 'control.min.time': minTime}})); + assert.commandWorked(st.s.adminCommand({ + movechunk: bucketCollFullName, + find: {"meta.city": city, 'control.min.time': minTime}, + to: destination.shardName, + _waitForDelete: true + })); + } + + // Place the Dublin buckets on the primary and split the other buckets across both shards. + splitAndMove("Galway", ISODate("2021-05-18T08:00:00.000Z"), otherShard); + splitAndMove("Dublin", MinKey, primary); + splitAndMove("Cork", ISODate("2021-05-18T09:00:00.000Z"), otherShard); +})(); + +// All Dublin documents exist on the primary, so we should only target the one shard. +testUpdateRouting({ + updates: [{ + q: {"location.city": "Dublin"}, + u: {$set: {"location.coordinates": [123, -123]}}, + multi: true, + }], + nModified: 4, + shardsTargetedCount: 1 +}); + +// Galway documents exist on both shards, so we should target both. +testUpdateRouting({ + updates: [{ + q: {"location.city": "Galway"}, + u: {$set: {"location.coordinates": [6, 7]}}, + multi: true, + }], + nModified: 2, + shardsTargetedCount: 2 +}); + +// All shards need to be targeted for an empty query. 
+testUpdateRouting({ + updates: [{ + q: {}, + u: {$set: {"location.coordinates": [222, 111]}}, + multi: true, + }], + nModified: 8, + shardsTargetedCount: 2 +}); + +st.stop(); +})(); diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index adce5e0ac34..eaa1a6aea7e 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -958,16 +958,21 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry( WriteResult performUpdates(OperationContext* opCtx, const write_ops::UpdateCommandRequest& wholeOp, OperationSource source) { + auto ns = wholeOp.getNamespace(); + if (source == OperationSource::kTimeseriesUpdate && !ns.isTimeseriesBucketsCollection()) { + ns = ns.makeTimeseriesBucketsNamespace(); + } + // Update performs its own retries, so we should not be in a WriteUnitOfWork unless run in a // transaction. auto txnParticipant = TransactionParticipant::get(opCtx); invariant(!opCtx->lockState()->inAWriteUnitOfWork() || (txnParticipant && opCtx->inMultiDocumentTransaction())); - uassertStatusOK(userAllowedWriteNS(opCtx, wholeOp.getNamespace())); + uassertStatusOK(userAllowedWriteNS(opCtx, ns)); DisableDocumentSchemaValidationIfTrue docSchemaValidationDisabler( opCtx, wholeOp.getWriteCommandRequestBase().getBypassDocumentValidation()); - LastOpFixer lastOpFixer(opCtx, wholeOp.getNamespace()); + LastOpFixer lastOpFixer(opCtx, ns); bool containsRetry = false; ON_BLOCK_EXIT([&] { updateRetryStats(opCtx, containsRetry); }); @@ -1020,23 +1025,11 @@ WriteResult performUpdates(OperationContext* opCtx, : std::vector<StmtId>{stmtId}; out.results.emplace_back(performSingleUpdateOpWithDupKeyRetry( - opCtx, - source == OperationSource::kTimeseriesUpdate - ? 
wholeOp.getNamespace().makeTimeseriesBucketsNamespace() - : wholeOp.getNamespace(), - stmtIds, - singleOp, - runtimeConstants, - wholeOp.getLet(), - source)); + opCtx, ns, stmtIds, singleOp, runtimeConstants, wholeOp.getLet(), source)); lastOpFixer.finishedOpSuccessfully(); } catch (const DBException& ex) { - out.canContinue = handleError(opCtx, - ex, - wholeOp.getNamespace(), - wholeOp.getWriteCommandRequestBase(), - singleOp.getMulti(), - &out); + out.canContinue = handleError( + opCtx, ex, ns, wholeOp.getWriteCommandRequestBase(), singleOp.getMulti(), &out); if (!out.canContinue) { break; } diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp index 0e61716d42d..f87abb745a9 100644 --- a/src/mongo/s/chunk_manager_targeter.cpp +++ b/src/mongo/s/chunk_manager_targeter.cpp @@ -45,6 +45,7 @@ #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/timeseries/timeseries_constants.h" #include "mongo/db/timeseries/timeseries_options.h" +#include "mongo/db/timeseries/timeseries_update_delete_util.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/logv2/log.h" #include "mongo/s/client/shard_registry.h" @@ -128,12 +129,11 @@ UpdateType getUpdateExprType(const write_ops::UpdateOpEntry& updateDoc) { BSONObj getUpdateExprForTargeting(const boost::intrusive_ptr<ExpressionContext> expCtx, const ShardKeyPattern& shardKeyPattern, UpdateType updateType, - const write_ops::UpdateOpEntry& updateOp) { + const BSONObj& updateQuery, + const write_ops::UpdateModification& updateMod) { // We should never see an invalid update type here. invariant(updateType != UpdateType::kUnknown); - const auto& updateMod = updateOp.getU(); - // If this is not a replacement update, then the update expression remains unchanged. 
if (updateType != UpdateType::kReplacement) { BSONObjBuilder objBuilder; @@ -157,7 +157,7 @@ BSONObj getUpdateExprForTargeting(const boost::intrusive_ptr<ExpressionContext> // This will guarantee that we can target a single shard, but it is not necessarily fatal if no // exact _id can be found. const auto idFromQuery = - uassertStatusOK(kVirtualIdShardKey.extractShardKeyFromQuery(expCtx, updateOp.getQ())); + uassertStatusOK(kVirtualIdShardKey.extractShardKeyFromQuery(expCtx, updateQuery)); if (auto idElt = idFromQuery[kIdFieldName]) { updateExpr = updateExpr.addField(idElt); } @@ -391,8 +391,8 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext* // as if the shard key values are specified as NULL. A replacement document is also allowed // to have a missing '_id', and if the '_id' exists in the query, it will be emplaced in the // replacement document for targeting purposes. + const auto& updateOp = itemRef.getUpdate(); - const auto updateType = getUpdateExprType(updateOp); // If the collection is not sharded, forward the update to the primary shard.
if (!_cm.isSharded()) { @@ -414,10 +414,38 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext* itemRef.getLet(), itemRef.getLegacyRuntimeConstants()); - const auto updateExpr = - getUpdateExprForTargeting(expCtx, shardKeyPattern, updateType, updateOp); const bool isUpsert = updateOp.getUpsert(); - const auto query = updateOp.getQ(); + auto query = updateOp.getQ(); + + if (_isRequestOnTimeseriesViewNamespace) { + uassert(ErrorCodes::NotImplemented, + str::stream() << "Updates are disallowed on sharded timeseries collections.", + feature_flags::gFeatureFlagShardedTimeSeriesUpdateDelete.isEnabledAndIgnoreFCV()); + uassert(ErrorCodes::InvalidOptions, + str::stream() + << "A {multi:false} update on a sharded timeseries collection is disallowed.", + updateOp.getMulti()); + uassert(ErrorCodes::InvalidOptions, + str::stream() + << "An {upsert:true} update on a sharded timeseries collection is disallowed.", + !isUpsert); + + // Since this is a timeseries query, we may need to rename the metaField. + if (auto metaField = _cm.getTimeseriesFields().get().getMetaField()) { + query = timeseries::translateQuery(query, *metaField); + } else { + // We want to avoid targeting the query incorrectly if no metaField is defined on the + // timeseries collection, since we only allow queries on the metaField for timeseries + // updates. Note: any non-empty query should fail to update once it reaches the shards + // because there is no metaField for it to query for, but we don't want to validate this + // during routing. + query = BSONObj(); + } + } + + const auto updateType = getUpdateExprType(updateOp); + const auto updateExpr = + getUpdateExprForTargeting(expCtx, shardKeyPattern, updateType, query, updateOp.getU()); // Utility function to target an update by shard key, and to handle any potential error results. 
auto targetByShardKey = [this, &collation](StatusWith<BSONObj> swShardKey, std::string msg) { diff --git a/src/mongo/s/chunk_manager_targeter.h b/src/mongo/s/chunk_manager_targeter.h index 0aef3ecbf52..5bee40f821f 100644 --- a/src/mongo/s/chunk_manager_targeter.h +++ b/src/mongo/s/chunk_manager_targeter.h @@ -150,7 +150,8 @@ private: // Full namespace of the collection for this targeter NamespaceString _nss; - // Used to identify the original namespace that the user has requested. + // Used to identify the original namespace that the user has requested. Note: this will only be + // true if the buckets namespace is sharded. bool _isRequestOnTimeseriesViewNamespace = false; // Stores last error occurred |