summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNikita Lapkov <nikita.lapkov@mongodb.com>2021-06-10 17:02:39 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-10-16 11:20:22 +0000
commit0acd557cb0f788ff82ab3a2e4c437d881c6c5c0a (patch)
treea200123f72432014020958fe24c9f9d0c8e9a387
parent36edc888b6415b7dc37c13bc799a18bcaa4d8236 (diff)
downloadmongo-0acd557cb0f788ff82ab3a2e4c437d881c6c5c0a.tar.gz
SERVER-58175 Round time values before routing inserts to sharded timeseries collections
(cherry picked from commit d644971af8d2315cadf23b2c7addd7fe05ea3c2d)
-rw-r--r--jstests/sharding/timeseries_insert.js (renamed from jstests/sharding/timeseries_sharded_insert.js)0
-rw-r--r--jstests/sharding/timeseries_time_value_rounding.js151
-rw-r--r--src/mongo/db/timeseries/SConscript2
-rw-r--r--src/mongo/db/timeseries/bucket_catalog.cpp10
-rw-r--r--src/mongo/db/timeseries/timeseries_options.cpp66
-rw-r--r--src/mongo/db/timeseries/timeseries_options.h10
-rw-r--r--src/mongo/db/timeseries/timeseries_options_test.cpp63
-rw-r--r--src/mongo/s/SConscript1
-rw-r--r--src/mongo/s/chunk_manager_targeter.cpp7
-rw-r--r--src/mongo/s/chunk_manager_targeter_test.cpp10
10 files changed, 276 insertions, 44 deletions
diff --git a/jstests/sharding/timeseries_sharded_insert.js b/jstests/sharding/timeseries_insert.js
index 082024e8aba..082024e8aba 100644
--- a/jstests/sharding/timeseries_sharded_insert.js
+++ b/jstests/sharding/timeseries_insert.js
diff --git a/jstests/sharding/timeseries_time_value_rounding.js b/jstests/sharding/timeseries_time_value_rounding.js
new file mode 100644
index 00000000000..394658ee2c1
--- /dev/null
+++ b/jstests/sharding/timeseries_time_value_rounding.js
@@ -0,0 +1,151 @@
+/**
+ * Tests the fact that we round time values to the specified granularity before targeting shards.
+ *
+ * @tags: [
+ * requires_fcv_50,
+ * requires_find_command,
+ * ]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/core/timeseries/libs/timeseries.js");
+load("jstests/aggregation/extras/utils.js");
+
+Random.setRandomSeed();
+
+const dbName = 'testDB';
+const collName = 'testColl';
+const timeField = 'time';
+const metaField = 'hostId';
+
+// Connections.
+const st = new ShardingTest({shards: 2, rs: {nodes: 2}});
+const mongos = st.s0;
+
+// Sanity checks.
+if (!TimeseriesTest.timeseriesCollectionsEnabled(st.shard0)) {
+ jsTestLog("Skipping test because the time-series collection feature flag is disabled");
+ st.stop();
+ return;
+}
+
+if (!TimeseriesTest.shardedtimeseriesCollectionsEnabled(st.shard0)) {
+ jsTestLog("Skipping test because the sharded time-series collection feature flag is disabled");
+ st.stop();
+ return;
+}
+
+// Databases.
+const mainDB = mongos.getDB(dbName);
+
+// Helpers.
+// Monotonically increasing _id generator so inserted documents are unique and
+// sortable by insertion order.
+let currentId = 0;
+function generateId() {
+    return currentId++;
+}
+
+// Builds one time-series measurement document with the given meta and time
+// values plus a random data payload.
+function makeDocument(metaValue, timeValue) {
+    return {
+        _id: generateId(),
+        [metaField]: metaValue,
+        [timeField]: timeValue,
+        data: Random.rand(),
+    };
+}
+
+// Reads the raw buckets collection directly on 'shard', unpacks the buckets
+// back into measurements, and returns those matching '_id: id'. This lets the
+// test assert which shard physically owns a given measurement.
+function getDocumentsFromShard(shard, id) {
+    return shard.getDB(dbName)
+        .getCollection(`system.buckets.${collName}`)
+        .aggregate(
+            [{$_unpackBucket: {timeField: timeField, metaField: metaField}}, {$match: {_id: id}}])
+        .toArray();
+}
+
+function runTest() {
+    // End-to-end check that mongos rounds a measurement's time value down to the
+    // bucket boundary implied by the collection's granularity BEFORE choosing a
+    // target shard, so the measurement lands on the shard owning the rounded
+    // control.min time, not the raw time.
+    mainDB.dropDatabase();
+
+    assert.commandWorked(mongos.adminCommand({enableSharding: dbName}));
+
+    // Create timeseries collection.
+    assert.commandWorked(mainDB.createCollection(
+        collName,
+        {timeseries: {timeField: timeField, metaField: metaField, granularity: "hours"}}));
+    const coll = mainDB.getCollection(collName);
+
+    // Shard timeseries collection on the time field.
+    const shardKey = {[timeField]: 1};
+    assert.commandWorked(coll.createIndex(shardKey));
+    assert.commandWorked(mongos.adminCommand({
+        shardCollection: `${dbName}.${collName}`,
+        key: shardKey,
+    }));
+
+    // Insert initial set of documents; two fall on each side of the split point
+    // chosen below.
+    const timeValues = [
+        ISODate("2000-01-01T00:00"),
+        ISODate("2000-01-01T05:00"),
+        ISODate("2000-01-01T15:00"),
+        ISODate("2000-01-01T20:00")
+    ];
+    const documents = Array.from(timeValues, (time, index) => makeDocument(index, time));
+    assert.commandWorked(coll.insert(documents));
+
+    // Manually split the data into two chunks. The buckets collection is keyed
+    // on the rounded control.min time, so the split point targets that field.
+    const splitPoint = {[`control.min.${timeField}`]: ISODate("2000-01-01T10:30")};
+    assert.commandWorked(
+        mongos.adminCommand({split: `${dbName}.system.buckets.${collName}`, middle: splitPoint}));
+
+    // Ensure that currently both chunks reside on the primary shard.
+    let counts = st.chunkCounts(`system.buckets.${collName}`, dbName);
+    const primaryShard = st.getPrimaryShard(dbName);
+    assert.eq(2, counts[primaryShard.shardName], counts);
+
+    // Move one of the chunks into the second shard.
+    const otherShard = st.getOther(primaryShard);
+    assert.commandWorked(mongos.adminCommand({
+        movechunk: `${dbName}.system.buckets.${collName}`,
+        find: splitPoint,
+        to: otherShard.name,
+        _waitForDelete: true
+    }));
+
+    // Ensure that each shard owns one chunk.
+    counts = st.chunkCounts(`system.buckets.${collName}`, dbName);
+    assert.eq(1, counts[primaryShard.shardName], counts);
+    assert.eq(1, counts[otherShard.shardName], counts);
+
+    // Our sharded cluster looks like this:
+    //
+    // |             | Primary shard   | Other shard     |
+    // |-------------|-----------------|-----------------|
+    // | Chunk range | [MinKey, 10:30) | [10:30, MaxKey] |
+    // | Documents   | 00:00, 05:00    | 15:00, 20:00    |
+    //
+    // Now, we will try to insert a document with time value 10:31. Since we have specified
+    // granularity: "hours", the time value is rounded down to the nearest day before routing
+    // (see getBucketRoundingSecondsFromGranularity in timeseries_options.cpp), so the bucket's
+    // control.min time becomes 00:00. This means that it should be routed to the primary shard.
+    // If mongos does not round the time value before routing, our document would have been
+    // directed to the other shard, which is incorrect.
+    const edgeDocument = makeDocument(0, ISODate("2000-01-01T10:31"));
+    assert.commandWorked(coll.insert([edgeDocument]));
+    documents.push(edgeDocument);
+
+    // The edge document must physically live on the primary shard only.
+    assert.docEq([edgeDocument], getDocumentsFromShard(primaryShard, edgeDocument._id));
+    assert.eq([], getDocumentsFromShard(otherShard, edgeDocument._id));
+
+    // Reads through mongos must still see every document, unfiltered...
+    const noFilterResult = coll.find({}).sort({_id: 1}).toArray();
+    assert.docEq(documents, noFilterResult);
+
+    // ...and via an exact-time filter on each individual document.
+    for (let document of documents) {
+        const result = coll.find({[timeField]: document[timeField]}).toArray();
+        assert.docEq([document], result);
+    }
+}
+
+try {
+ runTest();
+} finally {
+ st.stop();
+}
+})();
diff --git a/src/mongo/db/timeseries/SConscript b/src/mongo/db/timeseries/SConscript
index 866be00d415..8a75cd7fbb9 100644
--- a/src/mongo/db/timeseries/SConscript
+++ b/src/mongo/db/timeseries/SConscript
@@ -76,11 +76,13 @@ env.CppUnitTest(
'minmax_test.cpp',
'timeseries_index_schema_conversion_functions_test.cpp',
'timeseries_update_delete_util_test.cpp',
+ 'timeseries_options_test.cpp'
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/catalog/catalog_test_fixture',
'bucket_catalog',
'timeseries_index_schema_conversion_functions',
+ 'timeseries_options',
'timeseries_update_delete_util',
],
)
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index e65da2ce66f..33087511caa 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -150,13 +150,6 @@ OperationId getOpId(OperationContext* opCtx,
MONGO_UNREACHABLE;
}
-long long roundTimestampDown(const Date_t& time, const TimeseriesOptions& options) {
- int roundingSeconds =
- timeseries::getBucketRoundingSecondsFromGranularity(options.getGranularity());
- long long seconds = durationCount<Seconds>(time.toDurationSinceEpoch());
- return (seconds - (seconds % roundingSeconds));
-}
-
BSONObj buildControlMinTimestampDoc(StringData timeField, long long roundedSeconds) {
BSONObjBuilder builder;
builder.append(timeField, Date_t::fromMillisSinceEpoch(1000 * roundedSeconds));
@@ -654,7 +647,8 @@ const std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::_getExecutio
void BucketCatalog::_setIdTimestamp(Bucket* bucket,
const Date_t& time,
const TimeseriesOptions& options) {
- auto const roundedSeconds = roundTimestampDown(time, options);
+ auto roundedTime = timeseries::roundTimestampToGranularity(time, options.getGranularity());
+ auto const roundedSeconds = durationCount<Seconds>(roundedTime.toDurationSinceEpoch());
bucket->_id.setTimestamp(roundedSeconds);
// Make sure we set the control.min time field to match the rounded _id timestamp.
diff --git a/src/mongo/db/timeseries/timeseries_options.cpp b/src/mongo/db/timeseries/timeseries_options.cpp
index 81a3fc15cb4..e8b1df78f91 100644
--- a/src/mongo/db/timeseries/timeseries_options.cpp
+++ b/src/mongo/db/timeseries/timeseries_options.cpp
@@ -103,33 +103,6 @@ int getMaxSpanSecondsFromGranularity(BucketGranularityEnum granularity) {
MONGO_UNREACHABLE;
}
-int getBucketRoundingSecondsFromGranularity(BucketGranularityEnum granularity) {
- switch (granularity) {
- case BucketGranularityEnum::Seconds:
- // Round down to nearest minute.
- return 60;
- case BucketGranularityEnum::Minutes:
- // Round down to nearest hour.
- return 60 * 60;
- case BucketGranularityEnum::Hours:
- // Round down to hearest day.
- return 60 * 60 * 24;
- }
- MONGO_UNREACHABLE;
-}
-bool optionsAreEqual(const TimeseriesOptions& option1, const TimeseriesOptions& option2) {
- const auto option1BucketSpan = option1.getBucketMaxSpanSeconds()
- ? *option1.getBucketMaxSpanSeconds()
- : getMaxSpanSecondsFromGranularity(option1.getGranularity());
- const auto option2BucketSpan = option2.getBucketMaxSpanSeconds()
- ? *option2.getBucketMaxSpanSeconds()
- : getMaxSpanSecondsFromGranularity(option2.getGranularity());
- return option1.getTimeField() == option1.getTimeField() &&
- option1.getMetaField() == option2.getMetaField() &&
- option1.getGranularity() == option2.getGranularity() &&
- option1BucketSpan == option2BucketSpan;
-}
-
StatusWith<std::pair<TimeseriesOptions, bool>> applyTimeseriesOptionsModifications(
const TimeseriesOptions& currentOptions, const BSONObj& mod) {
TimeseriesOptions newOptions = currentOptions;
@@ -171,5 +144,44 @@ BSONObj generateViewPipeline(const TimeseriesOptions& options, bool asArray) {
<< *options.getBucketMaxSpanSeconds() << "exclude" << BSONArray())));
}
+bool optionsAreEqual(const TimeseriesOptions& option1, const TimeseriesOptions& option2) {
+    // Compares two time-series options for semantic equality. The bucket span is
+    // normalized first: when 'bucketMaxSpanSeconds' is not explicitly set, fall
+    // back to the default derived from the granularity, so an explicit value
+    // equal to the default compares equal to an unset one.
+    const auto option1BucketSpan = option1.getBucketMaxSpanSeconds()
+        ? *option1.getBucketMaxSpanSeconds()
+        : getMaxSpanSecondsFromGranularity(option1.getGranularity());
+    const auto option2BucketSpan = option2.getBucketMaxSpanSeconds()
+        ? *option2.getBucketMaxSpanSeconds()
+        : getMaxSpanSecondsFromGranularity(option2.getGranularity());
+    // Bug fix: this previously compared option1's time field against itself
+    // (always true), silently treating options with different time fields as
+    // equal. Compare option1 against option2, matching the other fields below.
+    return option1.getTimeField() == option2.getTimeField() &&
+        option1.getMetaField() == option2.getMetaField() &&
+        option1.getGranularity() == option2.getGranularity() &&
+        option1BucketSpan == option2BucketSpan;
+}
+
+namespace {
+/**
+ * Maps a bucket granularity to the rounding interval, in seconds, applied to
+ * the bucket ID and control.min timestamp: one bucket boundary per minute,
+ * hour, or day respectively.
+ */
+int getBucketRoundingSecondsFromGranularity(BucketGranularityEnum granularity) {
+    constexpr int kSecondsPerMinute = 60;
+    constexpr int kSecondsPerHour = 60 * kSecondsPerMinute;
+    constexpr int kSecondsPerDay = 24 * kSecondsPerHour;
+    switch (granularity) {
+        case BucketGranularityEnum::Seconds:
+            // Round down to the nearest minute.
+            return kSecondsPerMinute;
+        case BucketGranularityEnum::Minutes:
+            // Round down to the nearest hour.
+            return kSecondsPerHour;
+        case BucketGranularityEnum::Hours:
+            // Round down to the nearest day.
+            return kSecondsPerDay;
+    }
+    MONGO_UNREACHABLE;
+}
+}  // namespace
+
+Date_t roundTimestampToGranularity(const Date_t& time, BucketGranularityEnum granularity) {
+    // Truncate the epoch-seconds value of 'time' down to a multiple of the
+    // rounding interval implied by 'granularity'.
+    const long long interval = getBucketRoundingSecondsFromGranularity(granularity);
+    const long long secondsSinceEpoch = durationCount<Seconds>(time.toDurationSinceEpoch());
+    const long long rounded = secondsSinceEpoch - secondsSinceEpoch % interval;
+    return Date_t::fromDurationSinceEpoch(Seconds{rounded});
+}
} // namespace timeseries
} // namespace mongo
diff --git a/src/mongo/db/timeseries/timeseries_options.h b/src/mongo/db/timeseries/timeseries_options.h
index feaf7e3cd33..87bf4387473 100644
--- a/src/mongo/db/timeseries/timeseries_options.h
+++ b/src/mongo/db/timeseries/timeseries_options.h
@@ -53,16 +53,16 @@ boost::optional<TimeseriesOptions> getTimeseriesOptions(OperationContext* opCtx,
*/
int getMaxSpanSecondsFromGranularity(BucketGranularityEnum granularity);
-/**
- * Returns the number of seconds used to round down the bucket ID and control.min timestamp.
- */
-int getBucketRoundingSecondsFromGranularity(BucketGranularityEnum granularity);
-
StatusWith<std::pair<TimeseriesOptions, bool>> applyTimeseriesOptionsModifications(
const TimeseriesOptions& current, const BSONObj& mod);
BSONObj generateViewPipeline(const TimeseriesOptions& options, bool asArray);
bool optionsAreEqual(const TimeseriesOptions& option1, const TimeseriesOptions& option2);
+
+/**
+ * Rounds down timestamp to the specified granularity.
+ */
+Date_t roundTimestampToGranularity(const Date_t& time, BucketGranularityEnum granularity);
} // namespace timeseries
} // namespace mongo
diff --git a/src/mongo/db/timeseries/timeseries_options_test.cpp b/src/mongo/db/timeseries/timeseries_options_test.cpp
new file mode 100644
index 00000000000..e3106407cea
--- /dev/null
+++ b/src/mongo/db/timeseries/timeseries_options_test.cpp
@@ -0,0 +1,63 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/timeseries/timeseries_options.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/time_support.h"
+
+
+namespace mongo {
+
+TEST(TimeseriesOptionsTest, RoundTimestampToGranularity) {
+    // Parses 'input' as an ISO-8601 timestamp, rounds it at 'granularity', and
+    // checks the result renders as 'expected'.
+    const auto assertRoundsTo = [](BucketGranularityEnum granularity,
+                                   const std::string& input,
+                                   const std::string& expected) {
+        const auto parsed = dateFromISOString(input);
+        ASSERT_OK(parsed);
+        const auto rounded = timeseries::roundTimestampToGranularity(parsed.getValue(), granularity);
+        ASSERT_EQ(dateToISOStringUTC(rounded), expected);
+    };
+
+    // "seconds" granularity rounds down to the start of the minute.
+    assertRoundsTo(BucketGranularityEnum::Seconds, "2021-01-01T00:00:15.555Z", "2021-01-01T00:00:00.000Z");
+    assertRoundsTo(BucketGranularityEnum::Seconds, "2021-01-01T00:00:30.555Z", "2021-01-01T00:00:00.000Z");
+    assertRoundsTo(BucketGranularityEnum::Seconds, "2021-01-01T00:00:45.555Z", "2021-01-01T00:00:00.000Z");
+
+    // "minutes" granularity rounds down to the start of the hour.
+    assertRoundsTo(BucketGranularityEnum::Minutes, "2021-01-01T00:15:00.000Z", "2021-01-01T00:00:00.000Z");
+    assertRoundsTo(BucketGranularityEnum::Minutes, "2021-01-01T00:30:00.000Z", "2021-01-01T00:00:00.000Z");
+    assertRoundsTo(BucketGranularityEnum::Minutes, "2021-01-01T00:45:00.000Z", "2021-01-01T00:00:00.000Z");
+
+    // "hours" granularity rounds down to the start of the day.
+    assertRoundsTo(BucketGranularityEnum::Hours, "2021-01-01T06:00:00.000Z", "2021-01-01T00:00:00.000Z");
+    assertRoundsTo(BucketGranularityEnum::Hours, "2021-01-01T12:00:00.000Z", "2021-01-01T00:00:00.000Z");
+    assertRoundsTo(BucketGranularityEnum::Hours, "2021-01-01T18:00:00.000Z", "2021-01-01T00:00:00.000Z");
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 0cf494962dd..1727e0fed45 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -35,6 +35,7 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/lasterror',
+ '$BUILD_DIR/mongo/db/timeseries/timeseries_options',
'query/cluster_query',
'write_ops/cluster_write_ops',
],
diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp
index fbbed6768d1..8abd414d714 100644
--- a/src/mongo/s/chunk_manager_targeter.cpp
+++ b/src/mongo/s/chunk_manager_targeter.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/query/collation/collation_index_key.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/timeseries/timeseries_constants.h"
+#include "mongo/db/timeseries/timeseries_options.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/logv2/log.h"
#include "mongo/s/client/shard_registry.h"
@@ -290,13 +291,15 @@ BSONObj ChunkManagerTargeter::extractBucketsShardKeyFromTimeseriesDoc(
uassert(5743702,
str::stream() << "'" << timeField
<< "' must be present and contain a valid BSON UTC datetime value",
- !timeElement.eoo());
+ !timeElement.eoo() && timeElement.type() == BSONType::Date);
+ auto roundedTimeValue = timeseries::roundTimestampToGranularity(
+ timeElement.date(), timeseriesOptions.getGranularity());
{
BSONObjBuilder controlBuilder{builder.subobjStart(timeseries::kBucketControlFieldName)};
{
BSONObjBuilder minBuilder{
controlBuilder.subobjStart(timeseries::kBucketControlMinFieldName)};
- minBuilder.append(timeElement);
+ minBuilder.append(timeField, roundedTimeValue);
minBuilder.done();
}
controlBuilder.done();
diff --git a/src/mongo/s/chunk_manager_targeter_test.cpp b/src/mongo/s/chunk_manager_targeter_test.cpp
index e7986118ff8..14070303a0f 100644
--- a/src/mongo/s/chunk_manager_targeter_test.cpp
+++ b/src/mongo/s/chunk_manager_targeter_test.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/service_context_test_fixture.h"
#include "mongo/db/timeseries/timeseries_constants.h"
#include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h"
+#include "mongo/db/timeseries/timeseries_options.h"
#include "mongo/s/catalog_cache_test_fixture.h"
#include "mongo/s/chunk_manager_targeter.h"
#include "mongo/s/session_catalog_router.h"
@@ -372,7 +373,10 @@ TEST(ChunkManagerTargeterTest, ExtractBucketsShardKeyFromTimeseriesDocument) {
TimeseriesOptions options{std::string(timeField)};
options.setMetaField(metaField);
- auto date = Date_t::now();
+ auto dateStatus = dateFromISOString("2021-01-01T00:00:15.555Z");
+ ASSERT_OK(dateStatus);
+ auto date = dateStatus.getValue();
+ auto roundedDate = timeseries::roundTimestampToGranularity(date, options.getGranularity());
auto checkShardKey = [&](const BSONObj& tsShardKeyPattern,
const BSONObj& metaValue = BSONObj()) {
@@ -384,10 +388,12 @@ TEST(ChunkManagerTargeterTest, ExtractBucketsShardKeyFromTimeseriesDocument) {
}
return builder.obj();
}();
+
auto inputBucket = [&]() {
BSONObjBuilder builder;
builder << timeseries::kBucketControlFieldName
- << BSON(timeseries::kBucketControlMinFieldName << BSON(timeField << date));
+ << BSON(timeseries::kBucketControlMinFieldName
+ << BSON(timeField << roundedDate));
if (!metaValue.isEmpty()) {
builder << timeseries::kBucketMetaFieldName << metaValue;
}