author     Alya Berciu <alyacarina@gmail.com>                2021-10-04 11:55:24 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-10-04 12:28:32 +0000
commit     80c8f9c04e2890a65d0f3db64d7b9d9b1e44db73
tree       4b623aa347c847fd79767aa055e2b6925a6da8f5
parent     f4aa2b0e25976facd7bda02872e2a46205975dcc
SERVER-59180 Implement update rewrite and routing for sharded timeseries collections
-rw-r--r--  buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_kill_primary.yml      |   1
-rw-r--r--  buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_terminate_primary.yml |   1
-rw-r--r--  buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_stepdowns.yml    |   1
-rw-r--r--  jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_inserts.js                   |  30
-rw-r--r--  jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_updates.js                   |  79
-rw-r--r--  jstests/core/timeseries/libs/timeseries.js                                                  |  10
-rw-r--r--  jstests/core/timeseries/timeseries_insert_after_update.js                                  |  15
-rw-r--r--  jstests/core/timeseries/timeseries_update.js                                               |  15
-rw-r--r--  jstests/core/timeseries/timeseries_update_concurrent.js                                    |   2
-rw-r--r--  jstests/core/timeseries/timeseries_update_hint.js                                           |   4
-rw-r--r--  jstests/sharding/timeseries_multiple_mongos.js                                              |  94
-rw-r--r--  jstests/sharding/timeseries_update.js                                                       | 970
-rw-r--r--  jstests/sharding/timeseries_update_routing.js                                               | 206
-rw-r--r--  src/mongo/db/ops/write_ops_exec.cpp                                                         |  27
-rw-r--r--  src/mongo/s/chunk_manager_targeter.cpp                                                      |  44
-rw-r--r--  src/mongo/s/chunk_manager_targeter.h                                                        |   3
16 files changed, 1444 insertions, 58 deletions
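
At a high level, this change lets mongos accept updates issued against a sharded time-series view, translate the user-facing meta field to the buckets collection's internal "meta" field for targeting, and enforce the update restrictions (multi-document updates only, no upserts, no replacement or pipeline updates, no modification of shard-key meta fields). A minimal shell sketch of the resulting behavior, not taken from the patch; the collection and field names ("weather", "tag.city") are illustrative:

    // Assumes featureFlagShardedTimeSeriesUpdateDelete is enabled.
    const testDB = db.getSiblingDB("testDB");
    assert.commandWorked(testDB.createCollection(
        "weather", {timeseries: {timeField: "time", metaField: "tag"}}));
    assert.commandWorked(testDB.weather.createIndex({"tag.city": 1}));
    assert.commandWorked(testDB.adminCommand(
        {shardCollection: "testDB.weather", key: {"tag.city": 1}}));

    // Allowed: a multi:true update that only modifies meta fields outside the
    // shard key. For routing, the query {"tag.city": "Dublin"} is rewritten to
    // {"meta.city": "Dublin"} against testDB.system.buckets.weather.
    assert.commandWorked(testDB.runCommand({
        update: "weather",
        updates: [{q: {"tag.city": "Dublin"}, u: {$set: {"tag.country": "IE"}}, multi: true}]
    }));

    // Rejected: multi:false updates, upserts, and updates that modify a meta
    // field belonging to the shard key.
    assert.commandFailedWithCode(testDB.runCommand({
        update: "weather",
        updates: [{q: {"tag.city": "Dublin"}, u: {$set: {"tag.city": "Cork"}}, multi: true}]
    }), [ErrorCodes.InvalidOptions, 31025]);
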
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_kill_primary.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_kill_primary.yml
index 77c6f236701..ec618e5d03f 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_kill_primary.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_kill_primary.yml
@@ -159,6 +159,7 @@ selector:
- jstests/concurrency/fsm_workloads/collmod_separate_collections.js
- jstests/concurrency/fsm_workloads/invalidated_cursors.js
- jstests/concurrency/fsm_workloads/kill_multicollection_aggregation.js
+ - jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_updates.js
- jstests/concurrency/fsm_workloads/view_catalog.js
- jstests/concurrency/fsm_workloads/view_catalog_cycle_with_drop.js
- jstests/concurrency/fsm_workloads/view_catalog_direct_system_writes.js
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_terminate_primary.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_terminate_primary.yml
index 2ba95613343..0fe5a99e65a 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_terminate_primary.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_terminate_primary.yml
@@ -159,6 +159,7 @@ selector:
- jstests/concurrency/fsm_workloads/collmod_separate_collections.js
- jstests/concurrency/fsm_workloads/invalidated_cursors.js
- jstests/concurrency/fsm_workloads/kill_multicollection_aggregation.js
+ - jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_updates.js
- jstests/concurrency/fsm_workloads/view_catalog.js
- jstests/concurrency/fsm_workloads/view_catalog_cycle_with_drop.js
- jstests/concurrency/fsm_workloads/view_catalog_direct_system_writes.js
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_stepdowns.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_stepdowns.yml
index b2de0ac89c4..5b3e6b6bd15 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_stepdowns.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_stepdowns.yml
@@ -142,6 +142,7 @@ selector:
- jstests/concurrency/fsm_workloads/collmod_writeconflict.js
- jstests/concurrency/fsm_workloads/invalidated_cursors.js
- jstests/concurrency/fsm_workloads/kill_multicollection_aggregation.js
+ - jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_updates.js
- jstests/concurrency/fsm_workloads/view_catalog.js
- jstests/concurrency/fsm_workloads/view_catalog_cycle_with_drop.js
- jstests/concurrency/fsm_workloads/view_catalog_direct_system_writes.js
diff --git a/jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_inserts.js b/jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_inserts.js
index 4b8e496dd05..fa052f3c730 100644
--- a/jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_inserts.js
+++ b/jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_inserts.js
@@ -34,6 +34,17 @@ var $config = extendWorkload($config, function($config, $super) {
$config.data.bucketPrefix = "system.buckets.";
+ $config.data.timeField = 't';
+ $config.data.metaField = 'm';
+
+ $config.data.generateMetaFieldValueForInitialInserts = () => {
+ return Math.floor(Random.rand() * $config.data.numMetaCount);
+ };
+
+ $config.data.generateMetaFieldValueForInsertStage = (i) => {
+ return i % $config.data.numMetaCount;
+ };
+
$config.threadCount = 10;
$config.iterations = 40;
$config.startState = "init";
@@ -49,8 +60,8 @@ var $config = extendWorkload($config, function($config, $super) {
this.startTime + Math.floor(Random.rand() * this.numInitialDocs * this.increment);
const doc = {
_id: new ObjectId(),
- t: new Date(timer),
- m: Math.floor(Random.rand() * this.numMetaCount)
+ [this.timeField]: new Date(timer),
+ [this.metaField]: this.generateMetaFieldValueForInsertStage(this.tid),
};
assertAlways.commandWorked(db[collName].insert(doc));
assertAlways.commandWorked(db[this.nonShardCollName].insert(doc));
@@ -121,8 +132,11 @@ var $config = extendWorkload($config, function($config, $super) {
// buckets with all documents.
const verifyBucketIndex = (bucketIndex) => {
const unpackStage = {
- "$_internalUnpackBucket":
- {"timeField": "t", "metaField": "m", "bucketMaxSpanSeconds": NumberInt(3600)}
+ "$_internalUnpackBucket": {
+ "timeField": this.timeField,
+ "metaField": this.metaField,
+ "bucketMaxSpanSeconds": NumberInt(3600)
+ }
};
const bucketColl = db.getCollection(`system.buckets.${collName}`);
const numDocsInBuckets =
@@ -151,8 +165,8 @@ var $config = extendWorkload($config, function($config, $super) {
db[collName].drop();
db[this.nonShardCollName].drop();
- assertAlways.commandWorked(
- db.createCollection(collName, {timeseries: {timeField: "t", metaField: "m"}}));
+ assertAlways.commandWorked(db.createCollection(
+ collName, {timeseries: {timeField: this.timeField, metaField: this.metaField}}));
cluster.shardCollection(db[collName], {t: 1}, false);
// Create indexes to verify index integrity during the teardown state.
@@ -171,8 +185,8 @@ var $config = extendWorkload($config, function($config, $super) {
const doc = {
_id: new ObjectId(),
- t: new Date(currentTimeStamp),
- m: i % this.numMetaCount
+ [this.timeField]: new Date(currentTimeStamp),
+ [this.metaField]: this.generateMetaFieldValueForInitialInserts(i),
};
bulk.insert(doc);
bulkUnsharded.insert(doc);
diff --git a/jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_updates.js b/jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_updates.js
new file mode 100644
index 00000000000..e700d90f095
--- /dev/null
+++ b/jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_updates.js
@@ -0,0 +1,79 @@
+/**
+ * Tests updates on a sharded time-series collection during chunk migration. To ensure
+ * correctness, the test performs the same writes on an unsharded collection and verifies that
+ * the number of documents remains the same at the end. This test also checks that indexes on
+ * the time-series buckets collection remain consistent after the test run.
+ * @tags: [
+ * requires_sharding,
+ * assumes_balancer_off,
+ * requires_non_retryable_writes,
+ * ]
+ */
+'use strict';
+
+const numValues = 10;
+
+load("jstests/core/timeseries/libs/timeseries.js"); // For 'TimeseriesTest' helpers.
+load('jstests/concurrency/fsm_workloads/random_moveChunk_timeseries_inserts.js');
+
+var $config = extendWorkload($config, function($config, $super) {
+ $config.states.init = function(db, collName, connCache) {
+ if (TimeseriesTest.timeseriesCollectionsEnabled(db) &&
+ TimeseriesTest.shardedtimeseriesCollectionsEnabled(db) &&
+ TimeseriesTest.timeseriesUpdatesAndDeletesEnabled(db) &&
+ TimeseriesTest.shardedTimeseriesUpdatesAndDeletesEnabled(db)) {
+ this.featureFlagDisabled = false;
+ } else {
+ jsTestLog(
+ "Skipping executing this test as the requisite feature flags are not enabled.");
+ }
+
+ $super.states.init(db, collName);
+ };
+
+ $config.data.generateMetaFieldValueForInitialInserts = () => {
+ let meta = {};
+ // Insert a document with a field for every thread to test concurrent updates on the
+ // same document.
+ for (let i = 0; i < $config.threadCount; i++) {
+ meta["tid" + i] = Random.randInt(numValues);
+ }
+ return meta;
+ };
+
+ $config.data.generateMetaFieldValueForInsertStage = (tid) => {
+ return {["tid" + tid]: Random.randInt(numValues)};
+ };
+
+ $config.states.update = function(db, collName, connCache) {
+ if (this.featureFlagDisabled) {
+ return;
+ }
+
+ const shardedColl = db[collName];
+ const unshardedColl = db[this.nonShardCollName];
+ const updateField = "tid" + this.tid;
+ const oldValue = Random.randInt(numValues);
+
+    // Update some measurements on the meta subfield owned by this thread in both the sharded
+    // and unsharded time-series collections.
+ jsTestLog("Executing update state on: " + collName + " on field " + updateField);
+ assertAlways.commandWorked(
+ shardedColl.update({[this.metaField]: {[updateField]: {$gte: oldValue}}},
+ {$inc: {[this.metaField + "." + updateField]: 1}},
+ {multi: true}));
+ assertAlways.commandWorked(
+ unshardedColl.update({[this.metaField]: {[updateField]: {$gte: oldValue}}},
+ {$inc: {[this.metaField + "." + updateField]: 1}},
+ {multi: true}));
+ };
+
+ $config.transitions = {
+ init: {insert: 1},
+ insert: {insert: 0.4, moveChunk: 0.1, update: 0.5},
+ update: {insert: 0.5, moveChunk: 0.1, update: 0.4},
+ moveChunk: {insert: 0.4, moveChunk: 0.1, update: 0.5},
+ };
+
+ return $config;
+});
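
For readability, here is a hedged sketch (not part of the patch) of the meta shapes this workload generates; each thread owns its own "tid<N>" counter, so concurrent update states never contend on the same meta subfield. The sketch assumes threadCount = 3 for brevity (the workload actually uses 10):

    Random.setRandomSeed();
    const threadCount = 3, numValues = 10;

    // Initial inserts carry one counter per thread in the same meta document:
    let meta = {};
    for (let i = 0; i < threadCount; i++) {
        meta["tid" + i] = Random.randInt(numValues);  // e.g. {tid0: 4, tid1: 9, tid2: 0}
    }

    // Insert-stage documents carry only the inserting thread's counter:
    const tid = 2;
    const insertMeta = {["tid" + tid]: Random.randInt(numValues)};  // e.g. {tid2: 7}

    // Thread 2's update state then increments only its own "m.tid2" path, issuing
    // the same write to the unsharded collection so teardown can compare counts.
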
diff --git a/jstests/core/timeseries/libs/timeseries.js b/jstests/core/timeseries/libs/timeseries.js
index a226803058a..10e2d8b90aa 100644
--- a/jstests/core/timeseries/libs/timeseries.js
+++ b/jstests/core/timeseries/libs/timeseries.js
@@ -29,6 +29,16 @@ var TimeseriesTest = class {
.featureFlagTimeseriesUpdatesAndDeletes.value;
}
+ /**
+ * Returns whether sharded time-series updates and deletes are supported.
+ */
+ static shardedTimeseriesUpdatesAndDeletesEnabled(conn) {
+ return assert
+ .commandWorked(
+ conn.adminCommand({getParameter: 1, featureFlagShardedTimeSeriesUpdateDelete: 1}))
+ .featureFlagShardedTimeSeriesUpdateDelete.value;
+ }
+
static shardedtimeseriesCollectionsEnabled(conn) {
return assert
.commandWorked(conn.adminCommand({getParameter: 1, featureFlagShardedTimeSeries: 1}))
diff --git a/jstests/core/timeseries/timeseries_insert_after_update.js b/jstests/core/timeseries/timeseries_insert_after_update.js
index 37e1df137a3..eec6005720e 100644
--- a/jstests/core/timeseries/timeseries_insert_after_update.js
+++ b/jstests/core/timeseries/timeseries_insert_after_update.js
@@ -3,7 +3,6 @@
* were updated.
*
* @tags: [
- * assumes_unsharded_collection, # TODO SERVER-59180: Remove this tag.
* does_not_support_stepdowns,
* does_not_support_transactions,
* requires_getmore,
@@ -14,12 +13,26 @@
"use strict";
load("jstests/core/timeseries/libs/timeseries.js");
+load("jstests/libs/fixture_helpers.js");
if (!TimeseriesTest.timeseriesUpdatesAndDeletesEnabled(db.getMongo())) {
jsTestLog("Skipping test because the time-series updates and deletes feature flag is disabled");
return;
}
+if (FixtureHelpers.isMongos(db) &&
+ !TimeseriesTest.shardedtimeseriesCollectionsEnabled(db.getMongo())) {
+ jsTestLog("Skipping test because the time-series updates and deletes feature flag is disabled");
+ return;
+}
+
+if (FixtureHelpers.isMongos(db) &&
+ !TimeseriesTest.shardedTimeseriesUpdatesAndDeletesEnabled(db.getMongo())) {
+ jsTestLog(
+ "Skipping test because the sharded time-series updates and deletes feature flag is disabled");
+ return;
+}
+
TimeseriesTest.run((insert) => {
const testDB = db.getSiblingDB(jsTestName());
assert.commandWorked(testDB.dropDatabase());
diff --git a/jstests/core/timeseries/timeseries_update.js b/jstests/core/timeseries/timeseries_update.js
index ff5c8e6b7d6..82ae774089a 100644
--- a/jstests/core/timeseries/timeseries_update.js
+++ b/jstests/core/timeseries/timeseries_update.js
@@ -1,7 +1,6 @@
/**
* Tests running the update command on a time-series collection.
* @tags: [
- * assumes_unsharded_collection, # TODO SERVER-59180: Remove this tag.
* does_not_support_stepdowns,
* does_not_support_transactions,
* requires_getmore,
@@ -12,12 +11,26 @@
"use strict";
load("jstests/core/timeseries/libs/timeseries.js");
+load("jstests/libs/fixture_helpers.js");
if (!TimeseriesTest.timeseriesUpdatesAndDeletesEnabled(db.getMongo())) {
jsTestLog("Skipping test because the time-series updates and deletes feature flag is disabled");
return;
}
+if (FixtureHelpers.isMongos(db) &&
+ !TimeseriesTest.shardedtimeseriesCollectionsEnabled(db.getMongo())) {
+ jsTestLog("Skipping test because the sharded time-series feature flag is disabled");
+ return;
+}
+
+if (FixtureHelpers.isMongos(db) &&
+ !TimeseriesTest.shardedTimeseriesUpdatesAndDeletesEnabled(db.getMongo())) {
+ jsTestLog(
+ "Skipping test because the sharded time-series updates and deletes feature flag is disabled");
+ return;
+}
+
const timeFieldName = "time";
const metaFieldName = "tag";
const dateTime = ISODate("2021-07-12T16:00:00Z");
diff --git a/jstests/core/timeseries/timeseries_update_concurrent.js b/jstests/core/timeseries/timeseries_update_concurrent.js
index fdea47f4683..6ca2745b5f5 100644
--- a/jstests/core/timeseries/timeseries_update_concurrent.js
+++ b/jstests/core/timeseries/timeseries_update_concurrent.js
@@ -2,7 +2,7 @@
* Tests running the update command on a time-series collection with concurrent modifications to the
* collection.
* @tags: [
- * assumes_unsharded_collection, # TODO SERVER-59180: Remove this tag.
+ * assumes_unsharded_collection, # TODO SERVER-60233: Remove this tag.
* does_not_support_stepdowns,
* does_not_support_transactions,
* requires_fcv_51,
diff --git a/jstests/core/timeseries/timeseries_update_hint.js b/jstests/core/timeseries/timeseries_update_hint.js
index b658bd13539..9dbc145a66f 100644
--- a/jstests/core/timeseries/timeseries_update_hint.js
+++ b/jstests/core/timeseries/timeseries_update_hint.js
@@ -1,7 +1,7 @@
/**
* Tests passing a hint to the update command on a time-series collection.
* @tags: [
- * assumes_unsharded_collection, # TODO SERVER-59180: Remove this tag.
+ * assumes_unsharded_collection, # TODO SERVER-60233: Remove this tag.
* does_not_support_stepdowns,
* does_not_support_transactions,
* requires_fcv_51,
@@ -318,4 +318,4 @@ testUpdateHintFailed({
nModifiedBuckets: 0,
failCode: ErrorCodes.BadValue,
});
-})(); \ No newline at end of file
+})();
diff --git a/jstests/sharding/timeseries_multiple_mongos.js b/jstests/sharding/timeseries_multiple_mongos.js
index 4a0d38a3254..bc34225099f 100644
--- a/jstests/sharding/timeseries_multiple_mongos.js
+++ b/jstests/sharding/timeseries_multiple_mongos.js
@@ -57,6 +57,9 @@ function generateBatch(size) {
*/
function runTest({shardKey, cmdObj, numProfilerEntries}) {
const isDelete = cmdObj["delete"] !== undefined;
+ const isUpdate = cmdObj["update"] !== undefined;
+ const cmdCollName = cmdObj[Object.keys(cmdObj)[0]];
+ const shardKeyHasMetaField = shardKey[metaField] !== undefined;
// Insert some dummy data using 'mongos1' as the router, so that the cache is initialized on the
// mongos while the collection is unsharded.
@@ -71,21 +74,22 @@ function runTest({shardKey, cmdObj, numProfilerEntries}) {
key: shardKey,
}));
- assert.commandWorked(
- mongos0.adminCommand({split: `${dbName}.system.buckets.${collName}`, middle: {meta: 1}}));
+    // Split the buckets collection and move one of the chunks to the second shard. Note that we
+    // can only do this if the meta field is part of the shard key.
+ const middle = shardKeyHasMetaField ? {meta: 1} : {"meta.a": 1};
+ assert.commandWorked(mongos0.adminCommand({split: `${dbName}.${bucketsCollName}`, middle}));
- // Move one of the chunks into the second shard.
const primaryShard = st.getPrimaryShard(dbName);
const otherShard = st.getOther(primaryShard);
assert.commandWorked(mongos0.adminCommand({
- movechunk: `${dbName}.system.buckets.${collName}`,
- find: {meta: 1},
+ movechunk: `${dbName}.${bucketsCollName}`,
+ find: middle,
to: otherShard.name,
_waitForDelete: true
}));
// Validate the command by running against 'mongos1' as the router.
- function validateCommand(collName, numEntries, unVersioned) {
+ function validateCommand(cmdCollName, numEntries, unVersioned) {
// Restart profiler.
for (let shardDB of [shard0DB, shard1DB]) {
shardDB.setProfilingLevel(0);
@@ -101,9 +105,13 @@ function runTest({shardKey, cmdObj, numProfilerEntries}) {
}
const queryField = `command.${Object.keys(cmdObj)[0]}`;
- let filter = {[queryField]: collName, "command.shardVersion.0": {$ne: Timestamp(0, 0)}};
- if (isDelete) {
- filter = {"op": "remove", "ns": `${dbName}.${collName}`, "ok": {$ne: 0}};
+ let filter = {[queryField]: cmdCollName, "command.shardVersion.0": {$ne: Timestamp(0, 0)}};
+
+ // We currently do not log 'shardVersion' for updates. See SERVER-60354 for details.
+ if (isUpdate) {
+ filter = {"op": "update", "ns": `${dbName}.${cmdCollName}`, "ok": {$ne: 0}};
+ } else if (isDelete) {
+ filter = {"op": "remove", "ns": `${dbName}.${cmdCollName}`, "ok": {$ne: 0}};
} else if (unVersioned) {
filter["command.shardVersion.0"] = Timestamp(0, 0);
}
@@ -115,11 +123,9 @@ function runTest({shardKey, cmdObj, numProfilerEntries}) {
{shard0Entries: shard0Entries, shard1Entries: shard1Entries});
}
- let targetShardedCollection = bucketsCollName;
- if (isDelete && cmdObj["delete"] !== bucketsCollName) {
- targetShardedCollection = collName;
- }
- validateCommand(targetShardedCollection, numProfilerEntries.sharded);
+ // The update command is always logged as being on the user-provided namespace.
+ validateCommand((isUpdate || isDelete) ? cmdCollName : bucketsCollName,
+ numProfilerEntries.sharded);
// Insert dummy data so that the 'mongos1' sees the collection as sharded.
assert.commandWorked(mongos1.getCollection(collName).insert({[timeField]: ISODate()}));
@@ -130,9 +136,7 @@ function runTest({shardKey, cmdObj, numProfilerEntries}) {
collName, {timeseries: {timeField: timeField, metaField: metaField}}));
// When unsharded, the command should be run against the user requested namespace.
- validateCommand(cmdObj[Object.keys(cmdObj)[0]] /* coll name specified in the command */,
- numProfilerEntries.unsharded,
- true);
+ validateCommand(cmdCollName, numProfilerEntries.unsharded, true);
}
/**
@@ -248,6 +252,60 @@ runTest({
if (TimeseriesTest.timeseriesUpdatesAndDeletesEnabled(st.shard0) &&
TimeseriesTest.shardedTimeseriesUpdatesAndDeletesEnabled(st.shard0)) {
+ // Tests for updates.
+ runTest({
+ shardKey: {[metaField + ".a"]: 1},
+ cmdObj: {
+ update: collName,
+ updates: [{
+ q: {},
+ u: {$inc: {[metaField + ".b"]: 1}},
+ multi: true,
+ }]
+ },
+ numProfilerEntries: {sharded: 2, unsharded: 1},
+ });
+
+ runTest({
+ shardKey: {[metaField + ".a"]: 1},
+ cmdObj: {
+ update: collName,
+ updates: [{
+ q: {[metaField + ".a"]: 1},
+ u: {$inc: {[metaField + ".b"]: -1}},
+ multi: true,
+ }]
+ },
+ numProfilerEntries: {sharded: 1, unsharded: 1},
+ });
+
+ runTest({
+ shardKey: {[metaField + ".a"]: 1},
+ cmdObj: {
+ update: bucketsCollName,
+ updates: [{
+ q: {},
+ u: {$inc: {["meta.b"]: 1}},
+ multi: true,
+ }]
+ },
+ numProfilerEntries: {sharded: 2, unsharded: 1},
+ });
+
+ runTest({
+ shardKey: {[metaField + ".a"]: 1},
+ cmdObj: {
+ update: bucketsCollName,
+ updates: [{
+ q: {["meta.a"]: 1},
+ u: {$inc: {["meta.b"]: -1}},
+ multi: true,
+ }]
+ },
+ numProfilerEntries: {sharded: 1, unsharded: 1},
+ });
+
+ // Tests for deletes.
runTest({
shardKey: {[metaField]: 1},
cmdObj: {
@@ -295,8 +353,6 @@ if (TimeseriesTest.timeseriesUpdatesAndDeletesEnabled(st.shard0) &&
},
numProfilerEntries: {sharded: 1, unsharded: 1},
});
-
- // TODO SERVER-59180: Add tests for updates.
}
st.stop();
diff --git a/jstests/sharding/timeseries_update.js b/jstests/sharding/timeseries_update.js
new file mode 100644
index 00000000000..379da48c672
--- /dev/null
+++ b/jstests/sharding/timeseries_update.js
@@ -0,0 +1,970 @@
+/**
+ * Tests updates on a sharded timeseries collection.
+ *
+ * @tags: [
+ * requires_fcv_51,
+ * requires_find_command,
+ * ]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/core/timeseries/libs/timeseries.js"); // For 'TimeseriesTest' helpers.
+
+const st = new ShardingTest({shards: 2, rs: {nodes: 2}});
+const mongos = st.s0;
+
+//
+// Constants used throughout all tests.
+//
+
+const dbName = 'testDB';
+const collName = 'coll';
+const timeField = "time";
+const metaField = "tag";
+const dateTime = ISODate("2021-07-12T16:00:00Z");
+
+//
+// Checks for feature flags.
+//
+
+if (!TimeseriesTest.timeseriesCollectionsEnabled(st.shard0)) {
+ jsTestLog("Skipping test because the time-series collection feature flag is disabled");
+ st.stop();
+ return;
+}
+
+if (!TimeseriesTest.shardedtimeseriesCollectionsEnabled(st.shard0)) {
+ jsTestLog("Skipping test because the sharded time-series collection feature flag is disabled");
+ st.stop();
+ return;
+}
+
+if (!TimeseriesTest.timeseriesUpdatesAndDeletesEnabled(st.shard0)) {
+ jsTestLog(
+ "Skipping test because the updates and deletes on time-series collection feature flag is disabled");
+ st.stop();
+ return;
+}
+
+const testDB = mongos.getDB(dbName);
+testDB.dropDatabase();
+assert.commandWorked(mongos.adminCommand({enableSharding: dbName}));
+
+if (!TimeseriesTest.shardedTimeseriesUpdatesAndDeletesEnabled(st.shard0)) {
+ // Ensure that the feature flag correctly prevents us from running an update on a sharded
+ // timeseries collection.
+ assert.commandWorked(testDB.createCollection(collName, {timeseries: {timeField, metaField}}));
+ const coll = testDB.getCollection(collName);
+ assert.commandWorked(coll.createIndex({[timeField]: 1}));
+ assert.commandWorked(mongos.adminCommand({
+ shardCollection: `${dbName}.${collName}`,
+ key: {[timeField]: 1},
+ }));
+ assert.commandFailedWithCode(
+ testDB.runCommand(
+ {update: coll.getName(), updates: [{q: {}, u: {[metaField]: 1}, multi: true}]}),
+ [ErrorCodes.NotImplemented, ErrorCodes.InvalidOptions]);
+ st.stop();
+ return;
+}
+
+const doc1 = {
+ _id: 1,
+ [timeField]: dateTime,
+ [metaField]: {a: "A", b: "B"}
+};
+const doc2 = {
+ _id: 2,
+ [timeField]: dateTime,
+ [metaField]: {c: "C", d: 2},
+ f: [{"k": "K", "v": "V"}]
+};
+const doc3 = {
+ _id: 3,
+ [timeField]: dateTime,
+ f: "F"
+};
+const doc4 = {
+ _id: 4,
+ [timeField]: dateTime,
+ [metaField]: {a: "A", b: "B"},
+ f: "F"
+};
+const doc5 = {
+ _id: 5,
+ [timeField]: dateTime,
+ [metaField]: {a: "A", b: "B", c: "C"}
+};
+
+//
+// Helper functions.
+//
+
+/**
+ * Confirms that a set of updates returns the expected set of documents.
+ */
+function testShardedUpdate({
+ insert,
+ shardKey,
+ updatesMetaFieldInShardKey,
+ timeseries,
+ initialDocList,
+ updates,
+ resultDocList,
+ n,
+ nModified = n,
+ letDoc = {},
+ failCode,
+ ordered = true,
+}) {
+ assert.commandWorked(mongos.adminCommand({enableSharding: dbName}));
+ assert.commandWorked(testDB.createCollection(collName, {timeseries}));
+
+ const coll = testDB.getCollection(collName);
+ assert.commandWorked(coll.createIndex(shardKey));
+ assert.commandWorked(insert(coll, initialDocList));
+
+ assert.commandWorked(mongos.adminCommand({
+ shardCollection: `${dbName}.${collName}`,
+ key: shardKey,
+ }));
+
+ // Updates on timeseries collections are not allowed if no metaField is defined.
+ if (!timeseries["metaField"]) {
+ failCode = [ErrorCodes.InvalidOptions];
+ n = 0;
+ nModified = 0;
+ resultDocList = initialDocList;
+ }
+
+ // Updates on sharded timeseries meta fields are only allowed as long as the field updated is
+ // not in the shard key.
+ if (updatesMetaFieldInShardKey) {
+ failCode = [ErrorCodes.InvalidOptions, 31025];
+ n = 0;
+ nModified = 0;
+ resultDocList = initialDocList;
+ }
+
+ const updateCommand = {update: coll.getName(), updates, ordered, let : letDoc};
+ const res = failCode ? assert.commandFailedWithCode(testDB.runCommand(updateCommand), failCode)
+ : assert.commandWorked(testDB.runCommand(updateCommand));
+
+ assert.eq(n, res.n);
+ assert.eq(nModified, res.nModified);
+ assert.eq(initialDocList.length, resultDocList.length);
+
+ resultDocList.forEach(resultDoc => {
+ assert.docEq(
+ resultDoc,
+ coll.findOne({_id: resultDoc._id}),
+ "Expected document not found in result collection:" + tojson(coll.find().toArray()));
+ });
+
+ assert(coll.drop());
+}
+
+/**
+ * Wrapper on testShardedUpdate to compose tests with a given shardKey/timeseries combo.
+ */
+function testUpdates({shardKeyTimeField, shardKeyMetaFieldPath, timeseriesOptions, tests}) {
+ // Set up the shard key for the tests.
+ let shardKey = {};
+ if (shardKeyMetaFieldPath) {
+ shardKey[shardKeyMetaFieldPath] = 1;
+ }
+ if (shardKeyTimeField) {
+ shardKey[shardKeyTimeField] = 1;
+ }
+
+ // Run a series of updates.
+ TimeseriesTest.run((insert) => {
+        // Helper lambda which, given 'pathToMetaFieldBeingUpdated', returns whether the update
+        // modifies part of the shard key. The argument is:
+        // - undefined if we are not updating the meta field.
+        // - "" if we are updating the top-level meta field.
+        // - the name of a subfield of meta we are updating.
+        // - an array of names of subfields of meta we are updating.
+ const checkUpdatesMetaFieldInShardKey = (pathToMetaFieldBeingUpdated) => {
+ if ((pathToMetaFieldBeingUpdated === undefined) || !shardKeyMetaFieldPath) {
+ // If we do not have the meta field in the shard key, we are able to update it.
+ return false;
+ } else if ((shardKeyMetaFieldPath === metaField) ||
+ (pathToMetaFieldBeingUpdated === "")) {
+ // If the top-level meta field is in the shard key, we cannot update it.
+ return true;
+ } else if (!Array.isArray(pathToMetaFieldBeingUpdated)) {
+ pathToMetaFieldBeingUpdated = [pathToMetaFieldBeingUpdated];
+ }
+ for (const e of pathToMetaFieldBeingUpdated) {
+ if (metaField + "." + e === shardKeyMetaFieldPath) {
+ return true;
+ }
+ }
+ return false;
+ };
+
+ const testUpdate = (fields, additionalFields = {}) => {
+ let inputs = Object.assign({}, fields, additionalFields);
+ testShardedUpdate(Object.assign({}, inputs, {
+ shardKey,
+ insert,
+ timeseries: timeseriesOptions,
+ updatesMetaFieldInShardKey:
+ checkUpdatesMetaFieldInShardKey(inputs.pathToMetaFieldBeingUpdated)
+ }));
+ };
+
+ tests.forEach(test => test({testUpdate}));
+ }, testDB);
+}
+
+/**
+ * Helper function to generate the parameters to pass to 'testUpdate' when an update is expected
+ * to fail.
+ */
+function expectFailedUpdate(initialDocList) {
+ return {
+ initialDocList,
+ resultDocList: initialDocList,
+ n: 0,
+ failCode: ErrorCodes.InvalidOptions,
+ };
+}
+
+//
+// Test cases when the update command fails.
+//
+
+function testCaseMultiFalseUpdateFails({testUpdate}) {
+ testUpdate({updates: [{q: {[metaField]: {b: "B"}}, u: {$set: {[metaField]: {b: "C"}}}}]},
+ expectFailedUpdate([doc1]));
+}
+
+function testCaseReplacementAndPipelineUpdateFails({testUpdate}) {
+ const expectFailedUpdateDoc = expectFailedUpdate([doc2]);
+
+ // Replace a document to have no metaField, which should fail since updates with replacement
+ // documents are not supported.
+ testUpdate({
+ updates: [{
+ q: {[metaField]: {c: "C", d: 2}},
+ u: {f2: {e: "E", f: "F"}, f3: 7},
+ multi: true,
+ }]
+ },
+ expectFailedUpdateDoc);
+
+ // Replace a document with an empty document, which should fail since updates with replacement
+ // documents are not supported.
+ testUpdate({
+ updates: [{
+ q: {[metaField]: {c: "C", d: 2}},
+ u: {},
+ multi: true,
+ }]
+ },
+ expectFailedUpdateDoc);
+
+ // Modify the metaField, which should fail since pipeline-style updates are not supported.
+ testUpdate({
+ updates: [{
+ q: {},
+ u: [
+ {$addFields: {[metaField + ".c"]: "C", [metaField + ".e"]: "E"}},
+ {$unset: metaField + ".e"}
+ ],
+ multi: true,
+ }]
+ },
+ expectFailedUpdateDoc);
+}
+
+function testCaseNoMetaFieldQueryUpdateFails({testUpdate}) {
+ // Query on a field which is not the (nonexistent) metaField.
+ testUpdate({
+ updates: [{
+ q: {f: "F"},
+ u: {},
+ multi: true,
+ }]
+ },
+ expectFailedUpdate([doc3]));
+
+ // Query on all documents and update them to be empty documents.
+ testUpdate({
+ updates: [{
+ q: {},
+ u: {},
+ multi: true,
+ }]
+ },
+ expectFailedUpdate([doc3]));
+
+ // Query on all documents and update them to be non-empty documents.
+ testUpdate({
+ updates: [{
+ q: {},
+ u: {f: "FF"},
+ multi: true,
+ }]
+ },
+ expectFailedUpdate([doc3]));
+
+ // Query on a field that is not the metaField.
+ testUpdate({
+ updates: [{
+ q: {measurement: "cpu"},
+ u: {$set: {[metaField]: {c: "C"}}},
+ multi: true,
+ }]
+ },
+ expectFailedUpdate([doc1]));
+
+ // Query on the metaField and a field that is not the metaField.
+ testUpdate({
+ updates: [
+ {
+ q: {[metaField]: {a: "A", b: "B"}, measurement: "cpu"},
+ u: {$set: {[metaField]: {c: "C"}}},
+ multi: true,
+ },
+ ]
+ },
+ expectFailedUpdate([doc1]));
+
+ // Query on a field that is not the metaField using dot notation and modify the metaField.
+ testUpdate({
+ updates: [{
+ q: {"measurement.A": "cpu"},
+ u: {$set: {[metaField]: {c: "C"}}},
+ multi: true,
+ }]
+ },
+ expectFailedUpdate([doc1]));
+}
+
+function testCaseIllegalMetaFieldUpdateFails({testUpdate}) {
+ // Query on the metaField and modify a field that is not the metaField.
+ testUpdate({
+ updates: [{
+ q: {[metaField]: {c: "C", d: 2}},
+ u: {$set: {f2: "f2"}},
+ multi: true,
+ }]
+ },
+ expectFailedUpdate([doc2]));
+
+ // Query on the metaField and modify the metaField and fields that are not the metaField.
+ testUpdate({
+ updates: [{
+ q: {[metaField]: {c: "C", d: 2}},
+ u: {$set: {[metaField]: {e: "E"}, f3: "f3"}, $inc: {f2: 3}, $unset: {f1: ""}},
+ multi: true,
+ }]
+ },
+ expectFailedUpdate([doc2]));
+
+ // Rename the metaField.
+ testUpdate({
+ updates: [{
+ q: {[metaField + ".a"]: "A"},
+ u: {$rename: {[metaField]: "Z"}},
+ multi: true,
+ }]
+ },
+ expectFailedUpdate([doc1, doc2, doc4]));
+
+ // Rename a subfield of the metaField to something not in the metaField.
+ testUpdate({
+ updates: [{
+ q: {[metaField + ".a"]: "A"},
+ u: {$rename: {[metaField + ".a"]: "notMetaField.a"}},
+ multi: true,
+ }]
+ },
+ expectFailedUpdate([doc1, doc2, doc4]));
+}
+
+//
+// Test cases when the update command succeeds.
+//
+
+function testCaseBatchUpdates({testUpdate}) {
+ // Multiple updates, ordered: query on the metaField and modify the metaField multiple times.
+ testUpdate({
+ initialDocList: [doc2],
+ updates: [
+ {
+ q: {[metaField]: {c: "C", d: 2}},
+ u: {$set: {[metaField]: 1}},
+ multi: true,
+ },
+ {
+ q: {[metaField]: 1},
+ u: {$set: {[metaField]: 2}},
+ multi: true,
+ },
+ {
+ q: {[metaField]: 2},
+ u: {$set: {[metaField]: 3}},
+ multi: true,
+ }
+ ],
+ resultDocList: [{
+ _id: 2,
+ [timeField]: dateTime,
+ [metaField]: 3,
+ f: [{"k": "K", "v": "V"}],
+ }],
+ n: 3,
+ pathToMetaFieldBeingUpdated: "",
+ });
+
+ // Multiple updates, ordered: query on the metaField and on a field that is not the metaField.
+ testUpdate({
+ initialDocList: [doc1],
+ updates: [
+ {
+ q: {[metaField]: {a: "A", b: "B"}},
+ u: {$set: {[metaField]: {c: "C", d: 1}}},
+ multi: true,
+ },
+ {
+ q: {measurement: "cpu", [metaField + ".d"]: 1},
+ u: {$set: {[metaField + ".c"]: "CC"}},
+ multi: true,
+ }
+ ],
+ resultDocList: [doc1],
+ // If the shardKey is on one of the fields being updated, this must fail.
+ n: 1,
+ pathToMetaFieldBeingUpdated: "",
+ failCode: ErrorCodes.InvalidOptions,
+ });
+
+ // Multiple updates, ordered: query on the metaField and modify the metaField and a field that
+ // is not the metaField using dot notation.
+ testUpdate({
+ initialDocList: [doc2],
+ updates: [
+ {
+ q: {[metaField]: {c: "C", d: 2}},
+ u: {$inc: {[metaField + ".d"]: 6}},
+ multi: true,
+ },
+ {
+ q: {[metaField]: {c: "C", d: 8}},
+ u: {$set: {"f1.0": "f2"}},
+ multi: true,
+ }
+ ],
+ resultDocList: [{
+ _id: 2,
+ [timeField]: dateTime,
+ [metaField]: {c: "C", d: 8},
+ f: [{"k": "K", "v": "V"}],
+ }],
+ // If the shardKey is on the field being updated, this must fail to update any docs.
+ n: 1,
+ pathToMetaFieldBeingUpdated: "d",
+ failCode: ErrorCodes.InvalidOptions,
+ });
+
+ // Multiple updates, ordered: query on the metaField and modify a field that is not the
+ // metaField using dot notation.
+ testUpdate({
+ updates: [
+ {
+ q: {[metaField]: {c: "C", d: 2}},
+ u: {$set: {"f1.0": "f2"}},
+ multi: true,
+ },
+ {
+ q: {[metaField]: {c: "C", d: 2}},
+ u: {$inc: {[metaField + ".d"]: 6}},
+ multi: true,
+ }
+ ]
+ },
+ expectFailedUpdate([doc2]));
+
+ // Multiple updates, unordered: Modify the metaField, a field that is not the metaField, and the
+ // metaField. The first and last updates should succeed.
+ testUpdate({
+ initialDocList: [doc2],
+ updates: [
+ {
+ q: {[metaField]: {c: "C", d: 2}},
+ u: {$inc: {[metaField + ".d"]: 6}},
+ multi: true,
+ },
+ {
+ q: {[metaField]: {c: "C", d: 8}},
+ u: {$set: {"f1.0": "f2"}},
+ multi: true,
+ },
+ {
+ q: {[metaField]: {c: "C", d: 8}},
+ u: {$inc: {[metaField + ".d"]: 7}},
+ multi: true,
+ }
+ ],
+ resultDocList: [{
+ _id: 2,
+ [timeField]: dateTime,
+ [metaField]: {c: "C", d: 15},
+ f: [{"k": "K", "v": "V"}],
+ }],
+ ordered: false,
+ n: 2,
+ pathToMetaFieldBeingUpdated: "d",
+ failCode: ErrorCodes.InvalidOptions,
+ });
+}
+
+function testCaseValidMetaFieldUpdates({testUpdate}) {
+ // Rename a subfield to the metaField.
+ testUpdate({
+ initialDocList: [doc1, doc2],
+ updates: [{
+ q: {[metaField + ".a"]: "A"},
+ u: {$rename: {[metaField + ".a"]: metaField + ".z"}},
+ multi: true,
+ }],
+ resultDocList: [{_id: 1, [timeField]: dateTime, [metaField]: {z: "A", b: "B"}}, doc2],
+ n: 1,
+ pathToMetaFieldBeingUpdated: "a",
+ });
+
+ // Query on the metaField and modify the metaField.
+ testUpdate({
+ initialDocList: [doc1],
+ updates: [{
+ q: {[metaField]: {a: "A", b: "B"}},
+ u: {$set: {[metaField]: {c: "C"}}},
+ multi: true,
+ }],
+ resultDocList: [{_id: 1, [timeField]: dateTime, [metaField]: {c: "C"}}],
+ n: 1,
+ pathToMetaFieldBeingUpdated: "",
+ });
+
+ // Query on the metaField and modify the metaField of 1 matching document.
+ testUpdate({
+ initialDocList: [doc1, doc2, doc4, doc5],
+ updates: [{
+ q: {"$and": [{[metaField + ".c"]: "C"}, {[metaField + ".d"]: 2}]},
+ u: {$set: {[metaField + ".c"]: 1}},
+ multi: true,
+ }],
+ resultDocList: [
+ doc1,
+ {_id: 2, [timeField]: dateTime, [metaField]: {c: 1, d: 2}, f: [{"k": "K", "v": "V"}]},
+ doc4,
+ doc5
+ ],
+ ordered: false,
+ n: 1,
+ pathToMetaFieldBeingUpdated: "c",
+ });
+
+ // Query on the metaField and update the metaField of multiple matching documents.
+ testUpdate({
+ initialDocList: [doc1, doc2, doc4, doc5],
+ updates: [{
+ q: {[metaField + ".a"]: "A"},
+ u: {$unset: {[metaField + ".a"]: ""}},
+ multi: true,
+ }],
+ resultDocList: [
+ {_id: 1, [timeField]: dateTime, [metaField]: {b: "B"}},
+ {_id: 2, [timeField]: dateTime, [metaField]: {c: "C", d: 2}, f: [{"k": "K", "v": "V"}]},
+ {_id: 4, [timeField]: dateTime, [metaField]: {b: "B"}, f: "F"},
+ {_id: 5, [timeField]: dateTime, [metaField]: {b: "B", c: "C"}}
+ ],
+ ordered: false,
+ n: 3,
+ pathToMetaFieldBeingUpdated: "a",
+ });
+
+ // Compound query on the metaField using dot notation and modify the metaField.
+ testUpdate({
+ initialDocList: [doc1],
+ updates: [{
+ q: {"$and": [{[metaField + ".a"]: "A"}, {[metaField + ".b"]: "B"}]},
+ u: {$set: {[metaField]: {c: "C"}}},
+ multi: true,
+ }],
+ resultDocList: [{_id: 1, [timeField]: dateTime, [metaField]: {c: "C"}}],
+ n: 1,
+ pathToMetaFieldBeingUpdated: "",
+ });
+
+ // Query on the metaField using dot notation and modify the metaField.
+ testUpdate({
+ initialDocList: [doc1],
+ updates: [{
+ q: {[metaField + ".a"]: "A"},
+ u: {$set: {[metaField]: {c: "C"}}},
+ multi: true,
+ }],
+ resultDocList: [{_id: 1, [timeField]: dateTime, [metaField]: {c: "C"}}],
+ n: 1,
+ pathToMetaFieldBeingUpdated: "",
+ });
+
+ // Query on the metaField using dot notation and modify the metaField.
+ testUpdate({
+ initialDocList: [doc2],
+ updates: [{
+ q: {[metaField + ".c"]: "C"},
+ u: {$inc: {[metaField + ".d"]: 10}},
+ multi: true,
+ }],
+ resultDocList: [
+ {_id: 2, [timeField]: dateTime, [metaField]: {c: "C", d: 12}, f: [{"k": "K", "v": "V"}]}
+ ],
+ n: 1,
+ pathToMetaFieldBeingUpdated: "d",
+ });
+
+    // Query with an empty document (i.e., update all documents in the collection).
+ testUpdate({
+ initialDocList: [doc1, doc2],
+ updates: [{
+ q: {},
+ u: {$set: {[metaField]: {z: "Z"}}},
+ multi: true,
+ }],
+ resultDocList: [
+ {_id: 1, [timeField]: dateTime, [metaField]: {z: "Z"}},
+ {_id: 2, [timeField]: dateTime, [metaField]: {z: "Z"}, f: [{"k": "K", "v": "V"}]}
+ ],
+ n: 2,
+ pathToMetaFieldBeingUpdated: "",
+ });
+
+ // Remove the metaField.
+ testUpdate({
+ initialDocList: [doc1],
+ updates:
+ [{q: {[metaField]: {a: "A", b: "B"}}, u: {$unset: {[metaField]: ""}}, multi: true}],
+ resultDocList: [{_id: 1, [timeField]: dateTime}],
+ n: 1,
+ pathToMetaFieldBeingUpdated: "",
+ });
+
+ // Update where one of the matching documents is a no-op update.
+ testUpdate({
+ initialDocList: [doc1, doc4, doc5],
+ updates: [
+ {
+ q: {[metaField + ".a"]: "A"},
+ u: {$set: {[metaField + ".c"]: "C"}},
+ multi: true,
+ },
+ ],
+ resultDocList: [
+ {_id: 1, [timeField]: dateTime, [metaField]: {a: "A", b: "B", c: "C"}},
+ {_id: 4, [timeField]: dateTime, [metaField]: {a: "A", b: "B", c: "C"}, f: "F"},
+ {_id: 5, [timeField]: dateTime, [metaField]: {a: "A", b: "B", c: "C"}}
+ ],
+ n: 3,
+ nModified: 2,
+ pathToMetaFieldBeingUpdated: "c",
+ });
+
+ // Query for documents using $jsonSchema with the metaField required.
+ testUpdate({
+ initialDocList: [doc1, doc2, doc3],
+ updates: [{
+ q: {"$jsonSchema": {"required": [metaField]}},
+ u: {$set: {[metaField]: "a"}},
+ multi: true
+ }],
+ resultDocList: [
+ {_id: 1, [timeField]: dateTime, [metaField]: "a"},
+ {_id: 2, [timeField]: dateTime, [metaField]: "a", f: [{"k": "K", "v": "V"}]},
+ doc3
+ ],
+ n: 2,
+ pathToMetaFieldBeingUpdated: "",
+ });
+
+ // Query for documents using $jsonSchema with the metaField in dot notation required.
+ testUpdate({
+ initialDocList: [doc1, doc2, doc3],
+ updates: [{
+ q: {"$jsonSchema": {"required": [metaField + ".a"]}},
+ u: {$set: {[metaField]: "a"}},
+ multi: true
+ }],
+ resultDocList: [{_id: 1, [timeField]: dateTime, [metaField]: "a"}, doc2, doc3],
+ n: 1,
+ pathToMetaFieldBeingUpdated: "",
+ });
+
+ // Query for documents using $jsonSchema with a field that is not the metaField required.
+ testUpdate({
+ updates: [{
+ q: {"$jsonSchema": {"required": [metaField, timeField]}},
+ u: {$set: {[metaField]: "a"}},
+ multi: true
+ }],
+ },
+ expectFailedUpdate([doc1, doc2, doc3]));
+
+ const nestedMetaObj = {_id: 6, [timeField]: dateTime, [metaField]: {[metaField]: "A", a: 1}};
+
+ // Query for documents using $jsonSchema with the metaField required and a required subfield of
+ // the metaField with the same name as the metaField.
+ testUpdate({
+ initialDocList: [doc1, nestedMetaObj],
+ updates: [{
+ q: {
+ "$jsonSchema": {
+ "required": [metaField],
+ "properties": {[metaField]: {"required": [metaField]}}
+ }
+ },
+ u: {$set: {[metaField]: "a"}},
+ multi: true
+ }],
+ resultDocList: [doc1, {_id: 6, [timeField]: dateTime, [metaField]: "a", a: 1}],
+ n: 1,
+ pathToMetaFieldBeingUpdated: "",
+ });
+
+ // Query for documents using $jsonSchema with the metaField required and an optional field that
+ // is not the metaField.
+ testUpdate({
+ updates: [{
+ q: {
+ "$jsonSchema": {
+ "required": [metaField],
+ "properties": {"measurement": {description: "can be any value"}}
+ }
+ },
+ u: {$set: {[metaField]: "a"}},
+ multi: true
+ }]
+ },
+ expectFailedUpdate([doc1, nestedMetaObj]));
+
+ // Query for documents on the metaField with the metaField nested within nested operators.
+ testUpdate({
+ initialDocList: [doc1, doc2, doc3],
+ updates: [{
+ q: {
+ "$and": [
+ {"$or": [{[metaField]: {"$ne": "B"}}, {[metaField]: {"a": {"$eq": "B"}}}]},
+ {[metaField]: {a: "A", b: "B"}}
+ ]
+ },
+ u: {$set: {[metaField]: "a"}},
+ multi: true
+ }],
+ resultDocList: [{_id: 1, [timeField]: dateTime, [metaField]: "a"}, doc2, doc3],
+ n: 1,
+ pathToMetaFieldBeingUpdated: "",
+ });
+
+    // Updates with upsert:false should not insert a new document when no match is found. When
+    // the shard key is on the meta field, this should not update any documents, but should also
+    // not report a failure, since no documents were matched.
+ testUpdate({
+ initialDocList: [doc1, doc4, doc5],
+ updates: [{
+ q: {[metaField]: "Z"},
+ u: {$set: {[metaField]: 5}},
+ multi: true,
+ }],
+ resultDocList: [doc1, doc4, doc5],
+ n: 0,
+ });
+
+ // Do the same test case as above but with upsert:true, which should fail.
+ testUpdate({
+ updates: [{
+ q: {[metaField]: "Z"},
+ u: {$set: {[metaField]: 5}},
+ multi: true,
+ upsert: true,
+ }]
+ },
+ expectFailedUpdate([doc1, doc4, doc5]));
+}
+
+function testCaseUpdateWithLetDoc({testUpdate}) {
+ // Use a variable defined in the let option in the query to modify the metaField.
+ testUpdate({
+ initialDocList: [doc1, doc4, doc5],
+ updates: [{
+ q: {$expr: {$eq: ["$" + metaField + ".a", "$$oldVal"]}},
+ u: {$set: {[metaField]: "aaa"}},
+ multi: true,
+ }],
+ letDoc: {oldVal: "A"},
+ resultDocList: [
+ {_id: 1, [timeField]: dateTime, [metaField]: "aaa"},
+ {_id: 4, [timeField]: dateTime, [metaField]: "aaa", f: "F"},
+ {_id: 5, [timeField]: dateTime, [metaField]: "aaa"}
+ ],
+ n: 3,
+ pathToMetaFieldBeingUpdated: "",
+ });
+
+    // Variables defined in the let option can only be used in the update if the update is a
+    // pipeline update. Since this update uses an update document, the literal name of the
+    // variable will be used in the update instead of the variable's value.
+ testUpdate({
+ initialDocList: [doc1],
+ updates: [{
+ q: {[metaField + ".a"]: "A"},
+ u: {$set: {[metaField]: "$$myVar"}},
+ multi: true,
+ }],
+ letDoc: {myVar: "aaa"},
+ resultDocList: [{_id: 1, [timeField]: dateTime, [metaField]: "$$myVar"}],
+ n: 1,
+ pathToMetaFieldBeingUpdated: "",
+ });
+
+ // Use variables defined in the let option in the query to modify the metaField multiple times.
+ testUpdate({
+ initialDocList: [doc1, doc4, doc5],
+ updates: [
+ {
+ q: {$expr: {$eq: ["$" + metaField + ".a", "$$val1"]}},
+ u: {$set: {[metaField]: "aaa"}},
+ multi: true,
+ },
+ {
+ q: {$expr: {$eq: ["$" + metaField, "$$val2"]}},
+ u: {$set: {[metaField]: "bbb"}},
+ multi: true,
+ }
+ ],
+ letDoc: {val1: "A", val2: "aaa"},
+ resultDocList: [
+ {_id: 1, [timeField]: dateTime, [metaField]: "bbb"},
+ {_id: 4, [timeField]: dateTime, [metaField]: "bbb", f: "F"},
+ {_id: 5, [timeField]: dateTime, [metaField]: "bbb"}
+ ],
+ n: 6,
+ pathToMetaFieldBeingUpdated: "",
+ });
+}
+
+function testCaseCollationUpdates({testUpdate}) {
+ const collationDoc1 = {_id: 1, [timeField]: dateTime, [metaField]: "café"};
+ const collationDoc2 = {_id: 2, [timeField]: dateTime, [metaField]: "cafe"};
+ const collationDoc3 = {_id: 3, [timeField]: dateTime, [metaField]: "cafE"};
+ const initialDocList = [collationDoc1, collationDoc2, collationDoc3];
+
+ // Query on the metaField and modify the metaField using collation with strength level 1.
+ testUpdate({
+ initialDocList,
+ updates: [{
+ q: {[metaField]: "cafe"},
+ u: {$set: {[metaField]: "Updated"}},
+ multi: true,
+ collation: {locale: "fr", strength: 1},
+ }],
+ resultDocList: [
+ {_id: 1, [timeField]: dateTime, [metaField]: "Updated"},
+ {_id: 2, [timeField]: dateTime, [metaField]: "Updated"},
+ {_id: 3, [timeField]: dateTime, [metaField]: "Updated"}
+ ],
+ n: 3,
+ pathToMetaFieldBeingUpdated: "",
+ });
+
+ // Query on the metaField and modify the metaField using collation with the default strength
+ // (level 3).
+ testUpdate({
+ initialDocList,
+ updates: [{
+ q: {[metaField]: "cafe"},
+ u: {$set: {[metaField]: "Updated"}},
+ multi: true,
+ collation: {locale: "fr"},
+ }],
+ resultDocList: [
+ collationDoc1,
+ {_id: 2, [timeField]: dateTime, [metaField]: "Updated"},
+ collationDoc3,
+ ],
+ n: 1,
+ pathToMetaFieldBeingUpdated: "",
+ });
+}
+
+function testCaseNullUpdates({testUpdate}) {
+ // Assumes shard key is meta.a.
+ const nullDoc = {_id: 1, [timeField]: dateTime, [metaField]: {a: null, b: 1}};
+ const missingDoc1 = {_id: 2, [timeField]: dateTime, [metaField]: {b: 1}};
+ const missingDoc2 = {_id: 3, [timeField]: dateTime, [metaField]: "foo"};
+ const initialDocList = [nullDoc, missingDoc1, missingDoc2];
+
+    // Query on the metaField for values that are not null and modify the metaField.
+ testUpdate({
+ initialDocList,
+ updates: [{
+ q: {[metaField]: {$ne: null}},
+ u: {$set: {[metaField]: "Updated"}},
+ multi: true,
+ }],
+ resultDocList: [
+ {_id: 1, [timeField]: dateTime, [metaField]: "Updated"},
+ {_id: 2, [timeField]: dateTime, [metaField]: "Updated"},
+ {_id: 3, [timeField]: dateTime, [metaField]: "Updated"},
+ ],
+ n: 3,
+ });
+}
+
+// Run tests with a variety of shardKeys and timeseries configurations.
+const timeseriesOptions = {
+ timeField,
+ metaField
+};
+const tests = [
+ testCaseMultiFalseUpdateFails,
+ testCaseNoMetaFieldQueryUpdateFails,
+ testCaseIllegalMetaFieldUpdateFails,
+ testCaseReplacementAndPipelineUpdateFails,
+ testCaseCollationUpdates,
+ testCaseUpdateWithLetDoc,
+ testCaseBatchUpdates,
+ testCaseValidMetaFieldUpdates,
+];
+testUpdates({shardKeyTimeField: timeField, timeseriesOptions: {timeField}, tests});
+testUpdates({shardKeyMetaFieldPath: metaField, timeseriesOptions, tests});
+testUpdates(
+ {shardKeyTimeField: timeField, shardKeyMetaFieldPath: metaField, timeseriesOptions, tests});
+
+// Run a relevant subset of tests in the case when meta.a is the shard key.
+const testsForMetaSubfieldShardKey = [
+ testCaseNullUpdates,
+ testCaseMultiFalseUpdateFails,
+ testCaseNoMetaFieldQueryUpdateFails,
+ testCaseIllegalMetaFieldUpdateFails,
+ testCaseReplacementAndPipelineUpdateFails,
+ testCaseValidMetaFieldUpdates,
+];
+testUpdates({
+ shardKeyMetaFieldPath: metaField + ".a",
+ timeseriesOptions,
+ tests: testsForMetaSubfieldShardKey,
+});
+testUpdates({
+ shardKeyMetaFieldPath: metaField + ".a",
+ shardKeyTimeField: timeField,
+ timeseriesOptions,
+ tests: testsForMetaSubfieldShardKey,
+});
+
+st.stop();
+})();
diff --git a/jstests/sharding/timeseries_update_routing.js b/jstests/sharding/timeseries_update_routing.js
new file mode 100644
index 00000000000..49e33faeea2
--- /dev/null
+++ b/jstests/sharding/timeseries_update_routing.js
@@ -0,0 +1,206 @@
+/**
+ * Tests routing of updates on a sharded timeseries collection.
+ *
+ * @tags: [
+ * requires_fcv_51,
+ * requires_find_command,
+ * ]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/core/timeseries/libs/timeseries.js"); // For 'TimeseriesTest' helpers.
+
+const st = new ShardingTest({shards: 2, rs: {nodes: 2}});
+
+//
+// Constants used throughout all tests.
+//
+
+const dbName = 'testDB';
+const collName = 'weather';
+const bucketCollName = `system.buckets.${collName}`;
+const bucketCollFullName = `${dbName}.${bucketCollName}`;
+
+//
+// Checks for feature flags.
+//
+
+if (!TimeseriesTest.timeseriesCollectionsEnabled(st.shard0)) {
+ jsTestLog("Skipping test because the time-series collection feature flag is disabled");
+ st.stop();
+ return;
+}
+
+if (!TimeseriesTest.shardedtimeseriesCollectionsEnabled(st.shard0)) {
+ jsTestLog("Skipping test because the sharded time-series collection feature flag is disabled");
+ st.stop();
+ return;
+}
+
+if (!TimeseriesTest.timeseriesUpdatesAndDeletesEnabled(st.shard0)) {
+ jsTestLog(
+ "Skipping test because the updates and deletes on time-series collection feature flag is disabled");
+ st.stop();
+ return;
+}
+
+if (!TimeseriesTest.shardedTimeseriesUpdatesAndDeletesEnabled(st.shard0)) {
+ jsTestLog(
+ "Skipping test because the updates and deletes on sharded time-series collection feature flag is disabled");
+ st.stop();
+ return;
+}
+
+const mongos = st.s;
+const testDB = mongos.getDB(dbName);
+const primary = st.shard0;
+const primaryDB = primary.getDB(dbName);
+const otherShard = st.shard1;
+const otherShardDB = otherShard.getDB(dbName);
+
+testDB.dropDatabase();
+assert.commandWorked(mongos.adminCommand({enableSharding: dbName}));
+st.ensurePrimaryShard(testDB.getName(), primary.shardName);
+
+assert.commandWorked(testDB.createCollection(
+ collName, {timeseries: {timeField: "time", metaField: "location", granularity: "hours"}}));
+
+const testColl = testDB[collName];
+
+//
+// Helper functions
+//
+
+function testUpdateRouting({updates, nModified, shardsTargetedCount}) {
+ // Restart profiling.
+ for (const db of [primaryDB, otherShardDB]) {
+ db.setProfilingLevel(0);
+ db.system.profile.drop();
+ db.setProfilingLevel(2);
+ }
+
+ // Verify output documents.
+ const updateCommand = {update: testColl.getName(), updates};
+ const result = assert.commandWorked(testDB.runCommand(updateCommand));
+
+ assert.eq(nModified, result.nModified);
+
+ // Verify profiling output.
+ if (shardsTargetedCount > 0) {
+ let filter = {"op": "update", "ns": "testDB.weather"};
+ let actualCount = 0;
+ for (const db of [primaryDB, otherShardDB]) {
+ const expectedEntries = db.system.profile.find(filter).toArray();
+ actualCount += expectedEntries.length;
+ }
+ assert.eq(actualCount, shardsTargetedCount);
+ }
+}
+
+(function setUpTestColl() {
+ assert.commandWorked(testDB.adminCommand(
+ {shardCollection: testColl.getFullName(), key: {"location.city": 1, time: 1}}));
+
+ const data = [
+ // Cork.
+ {
+ location: {city: "Cork", coordinates: [-12, 10]},
+ time: ISODate("2021-05-18T08:00:00.000Z"),
+ temperature: 12,
+ },
+ {
+ location: {city: "Cork", coordinates: [0, 0]},
+ time: ISODate("2021-05-18T07:30:00.000Z"),
+ temperature: 15,
+ },
+ // Dublin.
+ {
+ location: {city: "Dublin", coordinates: [25, -43]},
+ time: ISODate("2021-05-18T08:00:00.000Z"),
+ temperature: 12,
+ },
+ {
+ location: {city: "Dublin", coordinates: [0, 0]},
+ time: ISODate("2021-05-18T08:00:00.000Z"),
+ temperature: 22,
+ },
+ {
+ location: {city: "Dublin", coordinates: [25, -43]},
+ time: ISODate("2021-05-18T08:30:00.000Z"),
+ temperature: 12.5,
+ },
+ {
+ location: {city: "Dublin", coordinates: [25, -43]},
+ time: ISODate("2021-05-18T09:00:00.000Z"),
+ temperature: 13,
+ },
+ // Galway.
+ {
+ location: {city: "Galway", coordinates: [22, 44]},
+ time: ISODate("2021-05-18T08:00:00.000Z"),
+ temperature: 20,
+ },
+ {
+ location: {city: "Galway", coordinates: [0, 0]},
+ time: ISODate("2021-05-18T09:00:00.000Z"),
+ temperature: 20,
+ },
+ ];
+ assert.commandWorked(testColl.insertMany(data));
+})();
+
+(function defineChunks() {
+ function splitAndMove(city, minTime, destination) {
+ assert.commandWorked(st.s.adminCommand(
+ {split: bucketCollFullName, middle: {"meta.city": city, 'control.min.time': minTime}}));
+ assert.commandWorked(st.s.adminCommand({
+ movechunk: bucketCollFullName,
+ find: {"meta.city": city, 'control.min.time': minTime},
+ to: destination.shardName,
+ _waitForDelete: true
+ }));
+ }
+
+ // Place the Dublin buckets on the primary and split the other buckets across both shards.
+ splitAndMove("Galway", ISODate("2021-05-18T08:00:00.000Z"), otherShard);
+ splitAndMove("Dublin", MinKey, primary);
+ splitAndMove("Cork", ISODate("2021-05-18T09:00:00.000Z"), otherShard);
+})();
+
+// All Dublin documents exist on the primary, so we should only target the one shard.
+testUpdateRouting({
+ updates: [{
+ q: {"location.city": "Dublin"},
+ u: {$set: {"location.coordinates": [123, -123]}},
+ multi: true,
+ }],
+ nModified: 4,
+ shardsTargetedCount: 1
+});
+
+// Galway documents exist on both shards, so we should target both.
+testUpdateRouting({
+ updates: [{
+ q: {"location.city": "Galway"},
+ u: {$set: {"location.coordinates": [6, 7]}},
+ multi: true,
+ }],
+ nModified: 2,
+ shardsTargetedCount: 2
+});
+
+// All shards need to be targeted for an empty query.
+testUpdateRouting({
+ updates: [{
+ q: {},
+ u: {$set: {"location.coordinates": [222, 111]}},
+ multi: true,
+ }],
+ nModified: 8,
+ shardsTargetedCount: 2
+});
+
+st.stop();
+})();
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index adce5e0ac34..eaa1a6aea7e 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -958,16 +958,21 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(
WriteResult performUpdates(OperationContext* opCtx,
const write_ops::UpdateCommandRequest& wholeOp,
OperationSource source) {
+ auto ns = wholeOp.getNamespace();
+ if (source == OperationSource::kTimeseriesUpdate && !ns.isTimeseriesBucketsCollection()) {
+ ns = ns.makeTimeseriesBucketsNamespace();
+ }
+
// Update performs its own retries, so we should not be in a WriteUnitOfWork unless run in a
// transaction.
auto txnParticipant = TransactionParticipant::get(opCtx);
invariant(!opCtx->lockState()->inAWriteUnitOfWork() ||
(txnParticipant && opCtx->inMultiDocumentTransaction()));
- uassertStatusOK(userAllowedWriteNS(opCtx, wholeOp.getNamespace()));
+ uassertStatusOK(userAllowedWriteNS(opCtx, ns));
DisableDocumentSchemaValidationIfTrue docSchemaValidationDisabler(
opCtx, wholeOp.getWriteCommandRequestBase().getBypassDocumentValidation());
- LastOpFixer lastOpFixer(opCtx, wholeOp.getNamespace());
+ LastOpFixer lastOpFixer(opCtx, ns);
bool containsRetry = false;
ON_BLOCK_EXIT([&] { updateRetryStats(opCtx, containsRetry); });
@@ -1020,23 +1025,11 @@ WriteResult performUpdates(OperationContext* opCtx,
: std::vector<StmtId>{stmtId};
out.results.emplace_back(performSingleUpdateOpWithDupKeyRetry(
- opCtx,
- source == OperationSource::kTimeseriesUpdate
- ? wholeOp.getNamespace().makeTimeseriesBucketsNamespace()
- : wholeOp.getNamespace(),
- stmtIds,
- singleOp,
- runtimeConstants,
- wholeOp.getLet(),
- source));
+ opCtx, ns, stmtIds, singleOp, runtimeConstants, wholeOp.getLet(), source));
lastOpFixer.finishedOpSuccessfully();
} catch (const DBException& ex) {
- out.canContinue = handleError(opCtx,
- ex,
- wholeOp.getNamespace(),
- wholeOp.getWriteCommandRequestBase(),
- singleOp.getMulti(),
- &out);
+ out.canContinue = handleError(
+ opCtx, ex, ns, wholeOp.getWriteCommandRequestBase(), singleOp.getMulti(), &out);
if (!out.canContinue) {
break;
}
diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp
index 0e61716d42d..f87abb745a9 100644
--- a/src/mongo/s/chunk_manager_targeter.cpp
+++ b/src/mongo/s/chunk_manager_targeter.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/timeseries/timeseries_constants.h"
#include "mongo/db/timeseries/timeseries_options.h"
+#include "mongo/db/timeseries/timeseries_update_delete_util.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/logv2/log.h"
#include "mongo/s/client/shard_registry.h"
@@ -128,12 +129,11 @@ UpdateType getUpdateExprType(const write_ops::UpdateOpEntry& updateDoc) {
BSONObj getUpdateExprForTargeting(const boost::intrusive_ptr<ExpressionContext> expCtx,
const ShardKeyPattern& shardKeyPattern,
UpdateType updateType,
- const write_ops::UpdateOpEntry& updateOp) {
+ const BSONObj& updateQuery,
+ const write_ops::UpdateModification& updateMod) {
// We should never see an invalid update type here.
invariant(updateType != UpdateType::kUnknown);
- const auto& updateMod = updateOp.getU();
-
// If this is not a replacement update, then the update expression remains unchanged.
if (updateType != UpdateType::kReplacement) {
BSONObjBuilder objBuilder;
@@ -157,7 +157,7 @@ BSONObj getUpdateExprForTargeting(const boost::intrusive_ptr<ExpressionContext>
// This will guarantee that we can target a single shard, but it is not necessarily fatal if no
// exact _id can be found.
const auto idFromQuery =
- uassertStatusOK(kVirtualIdShardKey.extractShardKeyFromQuery(expCtx, updateOp.getQ()));
+ uassertStatusOK(kVirtualIdShardKey.extractShardKeyFromQuery(expCtx, updateQuery));
if (auto idElt = idFromQuery[kIdFieldName]) {
updateExpr = updateExpr.addField(idElt);
}
@@ -391,8 +391,8 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext*
// as if the shard key values are specified as NULL. A replacement document is also allowed
// to have a missing '_id', and if the '_id' exists in the query, it will be emplaced in the
// replacement document for targeting purposes.
+
const auto& updateOp = itemRef.getUpdate();
- const auto updateType = getUpdateExprType(updateOp);
// If the collection is not sharded, forward the update to the primary shard.
if (!_cm.isSharded()) {
@@ -414,10 +414,38 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext*
itemRef.getLet(),
itemRef.getLegacyRuntimeConstants());
- const auto updateExpr =
- getUpdateExprForTargeting(expCtx, shardKeyPattern, updateType, updateOp);
const bool isUpsert = updateOp.getUpsert();
- const auto query = updateOp.getQ();
+ auto query = updateOp.getQ();
+
+ if (_isRequestOnTimeseriesViewNamespace) {
+ uassert(ErrorCodes::NotImplemented,
+ str::stream() << "Updates are disallowed on sharded timeseries collections.",
+ feature_flags::gFeatureFlagShardedTimeSeriesUpdateDelete.isEnabledAndIgnoreFCV());
+ uassert(ErrorCodes::InvalidOptions,
+ str::stream()
+ << "A {multi:false} update on a sharded timeseries collection is disallowed.",
+ updateOp.getMulti());
+ uassert(ErrorCodes::InvalidOptions,
+ str::stream()
+ << "An {upsert:true} update on a sharded timeseries collection is disallowed.",
+ !isUpsert);
+
+ // Since this is a timeseries query, we may need to rename the metaField.
+ if (auto metaField = _cm.getTimeseriesFields().get().getMetaField()) {
+ query = timeseries::translateQuery(query, *metaField);
+ } else {
+ // We want to avoid targeting the query incorrectly if no metaField is defined on the
+ // timeseries collection, since we only allow queries on the metaField for timeseries
+ // updates. Note: any non-empty query should fail to update once it reaches the shards
+ // because there is no metaField for it to query for, but we don't want to validate this
+ // during routing.
+ query = BSONObj();
+ }
+ }
+
+ const auto updateType = getUpdateExprType(updateOp);
+ const auto updateExpr =
+ getUpdateExprForTargeting(expCtx, shardKeyPattern, updateType, query, updateOp.getU());
// Utility function to target an update by shard key, and to handle any potential error results.
auto targetByShardKey = [this, &collation](StatusWith<BSONObj> swShardKey, std::string msg) {
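
In effect, the targeter now derives the routing query itself: for a request on a time-series view it renames the user-facing meta field to the buckets collection's "meta" field (via timeseries::translateQuery), and when the collection has no meta field it deliberately falls back to an empty query, broadcasting the update rather than risking mis-targeting. A toy shell sketch of that rewrite, not taken from the patch, assuming metaField "tag" and a shard key on {"tag.city": 1}; the real translation also handles nested operators such as $and/$or:

    function translateForTargeting(query, metaField) {
        if (metaField === undefined) {
            // No metaField: target with {} (a broadcast); the shards later reject
            // any non-empty query, since only meta-field queries are supported.
            return {};
        }
        // Toy rename of "tag"/"tag.x" paths to "meta"/"meta.x".
        let translated = {};
        for (let path of Object.keys(query)) {
            const newPath = path === metaField
                ? "meta"
                : (path.startsWith(metaField + ".") ? "meta" + path.slice(metaField.length)
                                                    : path);
            translated[newPath] = query[path];
        }
        return translated;
    }

    // {"tag.city": "Dublin"} -> {"meta.city": "Dublin"}, from which the shard key
    // value can be extracted against {"meta.city": 1} to target a single shard.
    printjson(translateForTargeting({"tag.city": "Dublin"}, "tag"));
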
diff --git a/src/mongo/s/chunk_manager_targeter.h b/src/mongo/s/chunk_manager_targeter.h
index 0aef3ecbf52..5bee40f821f 100644
--- a/src/mongo/s/chunk_manager_targeter.h
+++ b/src/mongo/s/chunk_manager_targeter.h
@@ -150,7 +150,8 @@ private:
// Full namespace of the collection for this targeter
NamespaceString _nss;
- // Used to identify the original namespace that the user has requested.
+ // Used to identify the original namespace that the user has requested. Note: this will only be
+ // true if the buckets namespace is sharded.
bool _isRequestOnTimeseriesViewNamespace = false;
// Stores the last error that occurred