-rw-r--r--  jstests/core/time_series/time_series_bucket_limit_count.js  120
-rw-r--r--  jstests/core/time_series/time_series_bucket_limit_size.js  120
-rw-r--r--  jstests/core/time_series/time_series_bucket_limit_time_range.js  155
-rw-r--r--  jstests/core/time_series/time_series_simple.js  2
-rw-r--r--  src/mongo/db/SConscript  1
-rw-r--r--  src/mongo/db/catalog/SConscript  1
-rw-r--r--  src/mongo/db/catalog/create_collection.cpp  2
-rw-r--r--  src/mongo/db/catalog/database_impl.cpp  2
-rw-r--r--  src/mongo/db/catalog/drop_collection.cpp  19
-rw-r--r--  src/mongo/db/catalog/drop_database.cpp  3
-rw-r--r--  src/mongo/db/commands/SConscript  4
-rw-r--r--  src/mongo/db/commands/create.idl  22
-rw-r--r--  src/mongo/db/commands/set_feature_compatibility_version_command.cpp  3
-rw-r--r--  src/mongo/db/commands/write_commands/write_commands.cpp  404
-rw-r--r--  src/mongo/db/timeseries/SConscript  38
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.cpp  223
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.h  160
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog_test.cpp  171
-rw-r--r--  src/mongo/db/timeseries/timeseries.idl  54
-rw-r--r--  src/mongo/db/views/durable_view_catalog.cpp  6
-rw-r--r--  src/mongo/db/views/view.cpp  17
-rw-r--r--  src/mongo/db/views/view.h  11
-rw-r--r--  src/mongo/db/views/view_catalog.cpp  32
-rw-r--r--  src/mongo/db/views/view_catalog.h  6
-rw-r--r--  src/mongo/db/views/view_catalog_test.cpp  5
-rw-r--r--  src/mongo/db/views/view_definition_test.cpp  37
-rw-r--r--  src/mongo/db/views/view_graph_test.cpp  2
27 files changed, 1189 insertions, 431 deletions
diff --git a/jstests/core/time_series/time_series_bucket_limit_count.js b/jstests/core/time_series/time_series_bucket_limit_count.js
index feddbf4e200..b8a79e39f04 100644
--- a/jstests/core/time_series/time_series_bucket_limit_count.js
+++ b/jstests/core/time_series/time_series_bucket_limit_count.js
@@ -19,74 +19,74 @@ if (!TimeseriesTest.timeseriesCollectionsEnabled(db.getMongo())) {
const testDB = db.getSiblingDB(jsTestName());
assert.commandWorked(testDB.dropDatabase());
-const coll = testDB.getCollection('t');
-const bucketsColl = testDB.getCollection('system.buckets.' + coll.getName());
-
-coll.drop();
+// Assumes each bucket has a limit of 1000 measurements.
+const bucketMaxCount = 1000;
+const numDocs = bucketMaxCount + 100;
const timeFieldName = 'time';
-assert.commandWorked(
- testDB.createCollection(coll.getName(), {timeseries: {timeField: timeFieldName}}));
-assert.contains(bucketsColl.getName(), testDB.getCollectionNames());
-const controlVersion = 1;
+const runTest = function(numDocsPerInsert) {
+ const coll = testDB.getCollection('t_' + numDocsPerInsert);
+ const bucketsColl = testDB.getCollection('system.buckets.' + coll.getName());
+ coll.drop();
-// Assumes each bucket has a limit of 1000 measurements.
-const bucketMaxCount = 1000;
-const numDocs = bucketMaxCount + 100;
+ assert.commandWorked(
+ testDB.createCollection(coll.getName(), {timeseries: {timeField: timeFieldName}}));
+ assert.contains(bucketsColl.getName(), testDB.getCollectionNames());
-for (let i = 0; i < numDocs; i++) {
- const t = ISODate();
- const doc = {_id: i, [timeFieldName]: t, x: i};
+ let docs = [];
+ for (let i = 0; i < numDocs; i++) {
+ docs.push({_id: i, [timeFieldName]: ISODate(), x: i});
+ if ((i + 1) % numDocsPerInsert === 0) {
+ assert.commandWorked(coll.insert(docs), 'failed to insert docs: ' + tojson(docs));
+ docs = [];
+ }
+ }
- assert.commandWorked(coll.insert(doc), 'failed to insert doc: ' + i + ': ' + tojson(doc));
-}
+ // Check view.
+ const viewDocs = coll.find({}, {x: 1}).sort({_id: 1}).toArray();
+ assert.eq(numDocs, viewDocs.length, viewDocs);
+ for (let i = 0; i < numDocs; i++) {
+ const viewDoc = viewDocs[i];
+ assert.eq(i, viewDoc._id, 'unexpected _id in doc: ' + i + ': ' + tojson(viewDoc));
+ assert.eq(i, viewDoc.x, 'unexpected field x in doc: ' + i + ': ' + tojson(viewDoc));
+ }
-// Check view.
-const viewDocs = coll.find({}, {x: 1}).sort({_id: 1}).toArray();
-assert.eq(numDocs, viewDocs.length, viewDocs);
-for (let i = 0; i < numDocs; i++) {
- const viewDoc = viewDocs[i];
- assert.eq(i, viewDoc._id, 'unexpected _id in doc: ' + i + ': ' + tojson(viewDoc));
- assert.eq(i, viewDoc.x, 'unexpected field x in doc: ' + i + ': ' + tojson(viewDoc));
-}
+ // Check bucket collection.
+ const bucketDocs = bucketsColl.find().sort({_id: 1}).toArray();
+ assert.eq(2, bucketDocs.length, bucketDocs);
-// Check bucket collection.
-const bucketDocs = bucketsColl.find().sort({_id: 1}).toArray();
-assert.eq(2, bucketDocs.length, bucketDocs);
+ // Check both buckets.
+ // First bucket should be full with 'bucketMaxCount' documents.
+ assert.eq(0,
+ bucketDocs[0].control.min._id,
+ 'invalid control.min for _id in first bucket: ' + tojson(bucketDocs[0].control));
+ assert.eq(0,
+ bucketDocs[0].control.min.x,
+ 'invalid control.min for x in first bucket: ' + tojson(bucketDocs[0].control));
+ assert.eq(bucketMaxCount - 1,
+ bucketDocs[0].control.max._id,
+ 'invalid control.max for _id in first bucket: ' + tojson(bucketDocs[0].control));
+ assert.eq(bucketMaxCount - 1,
+ bucketDocs[0].control.max.x,
+ 'invalid control.max for x in first bucket: ' + tojson(bucketDocs[0].control));
-// Check both buckets.
-// First bucket should be full with 'bucketMaxCount' documents.
-assert.eq(bucketMaxCount,
- bucketDocs[0].control.count,
- 'invalid count in first bucket: ' + tojson(bucketDocs[0]));
-assert.eq(0,
- bucketDocs[0].control.min._id,
- 'invalid control.min for _id in first bucket: ' + tojson(bucketDocs[0].control));
-assert.eq(0,
- bucketDocs[0].control.min.x,
- 'invalid control.min for x in first bucket: ' + tojson(bucketDocs[0].control));
-assert.eq(bucketMaxCount - 1,
- bucketDocs[0].control.max._id,
- 'invalid control.max for _id in first bucket: ' + tojson(bucketDocs[0].control));
-assert.eq(bucketMaxCount - 1,
- bucketDocs[0].control.max.x,
- 'invalid control.max for x in first bucket: ' + tojson(bucketDocs[0].control));
+ // Second bucket should contain the remaining documents.
+ assert.eq(bucketMaxCount,
+ bucketDocs[1].control.min._id,
+ 'invalid control.min for _id in second bucket: ' + tojson(bucketDocs[1].control));
+ assert.eq(bucketMaxCount,
+ bucketDocs[1].control.min.x,
+ 'invalid control.min for x in second bucket: ' + tojson(bucketDocs[1].control));
+ assert.eq(numDocs - 1,
+ bucketDocs[1].control.max._id,
+ 'invalid control.max for _id in second bucket: ' + tojson(bucketDocs[1].control));
+ assert.eq(numDocs - 1,
+ bucketDocs[1].control.max.x,
+ 'invalid control.max for x in second bucket: ' + tojson(bucketDocs[1].control));
+};
-// Second bucket should contain the remaining documents.
-assert.eq(numDocs - bucketMaxCount,
- bucketDocs[1].control.count,
- 'invalid count in second bucket: ' + tojson(bucketDocs[1]));
-assert.eq(bucketMaxCount,
- bucketDocs[1].control.min._id,
- 'invalid control.min for _id in second bucket: ' + tojson(bucketDocs[1].control));
-assert.eq(bucketMaxCount,
- bucketDocs[1].control.min.x,
- 'invalid control.min for x in second bucket: ' + tojson(bucketDocs[1].control));
-assert.eq(numDocs - 1,
- bucketDocs[1].control.max._id,
- 'invalid control.max for _id in second bucket: ' + tojson(bucketDocs[1].control));
-assert.eq(numDocs - 1,
- bucketDocs[1].control.max.x,
- 'invalid control.max for x in second bucket: ' + tojson(bucketDocs[1].control));
+runTest(1);
+runTest(numDocs / 2);
+runTest(numDocs);
})();
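
For orientation, the bucket documents these assertions inspect have roughly the following shape under the new insert path; the removed assertions on control.count reflect that only control.version, control.min, and control.max are maintained now. The values below are illustrative, not taken from the test:

{
    _id: ObjectId("..."),
    control: {
        version: 1,
        min: {_id: 0, time: ISODate("2020-11-13T01:00:00Z"), x: 0},
        max: {_id: 999, time: ISODate("2020-11-13T01:00:05Z"), x: 999}
    },
    data: {
        _id: {"0": 0, "1": 1, /* ... */ "999": 999},
        time: {"0": ISODate("2020-11-13T01:00:00Z") /* ... */},
        x: {"0": 0, "1": 1, /* ... */ "999": 999}
    }
}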
diff --git a/jstests/core/time_series/time_series_bucket_limit_size.js b/jstests/core/time_series/time_series_bucket_limit_size.js
index 86cde463db2..d487f78c15a 100644
--- a/jstests/core/time_series/time_series_bucket_limit_size.js
+++ b/jstests/core/time_series/time_series_bucket_limit_size.js
@@ -1,9 +1,9 @@
/**
* Tests maximum size of measurements held in each bucket in a time-series buckets collection.
* @tags: [
+ * does_not_support_stepdowns,
* requires_fcv_49,
* requires_find_command,
- * requires_getmore,
* ]
*/
(function() {
@@ -19,17 +19,7 @@ if (!TimeseriesTest.timeseriesCollectionsEnabled(db.getMongo())) {
const testDB = db.getSiblingDB(jsTestName());
assert.commandWorked(testDB.dropDatabase());
-const coll = testDB.getCollection('t');
-const bucketsColl = testDB.getCollection('system.buckets.' + coll.getName());
-
-coll.drop();
-
const timeFieldName = 'time';
-assert.commandWorked(
- testDB.createCollection(coll.getName(), {timeseries: {timeField: timeFieldName}}));
-assert.contains(bucketsColl.getName(), testDB.getCollectionNames());
-
-const controlVersion = 1;
// Assumes each bucket has a limit of 125kB on the measurements stored in the 'data' field.
const bucketMaxSizeKB = 125;
@@ -39,57 +29,69 @@ const numDocs = 2;
// to leave a little room for the _id and the time fields.
const largeValue = 'x'.repeat((bucketMaxSizeKB - 1) * 1024);
-for (let i = 0; i < numDocs; i++) {
- const t = ISODate();
- const doc = {_id: i, [timeFieldName]: t, x: largeValue};
+const runTest = function(numDocsPerInsert) {
+ const coll = testDB.getCollection('t_' + numDocsPerInsert);
+ const bucketsColl = testDB.getCollection('system.buckets.' + coll.getName());
+ coll.drop();
- assert.commandWorked(coll.insert(doc), 'failed to insert doc: ' + i + ': ' + tojson(doc));
-}
+ assert.commandWorked(
+ testDB.createCollection(coll.getName(), {timeseries: {timeField: timeFieldName}}));
+ assert.contains(bucketsColl.getName(), testDB.getCollectionNames());
-// Check view.
-const viewDocs = coll.find({}, {x: 1}).sort({_id: 1}).toArray();
-assert.eq(numDocs, viewDocs.length, viewDocs);
-for (let i = 0; i < numDocs; i++) {
- const viewDoc = viewDocs[i];
- assert.eq(i, viewDoc._id, 'unexpected _id in doc: ' + i + ': ' + tojson(viewDoc));
- assert.eq(largeValue, viewDoc.x, 'unexpected field x in doc: ' + i + ': ' + tojson(viewDoc));
-}
+ let docs = [];
+ for (let i = 0; i < numDocs; i++) {
+ docs.push({_id: i, [timeFieldName]: ISODate(), x: largeValue});
+ if ((i + 1) % numDocsPerInsert === 0) {
+ assert.commandWorked(coll.insert(docs), 'failed to insert docs: ' + tojson(docs));
+ docs = [];
+ }
+ }
+
+ // Check view.
+ const viewDocs = coll.find({}, {x: 1}).sort({_id: 1}).toArray();
+ assert.eq(numDocs, viewDocs.length, viewDocs);
+ for (let i = 0; i < numDocs; i++) {
+ const viewDoc = viewDocs[i];
+ assert.eq(i, viewDoc._id, 'unexpected _id in doc: ' + i + ': ' + tojson(viewDoc));
+ assert.eq(
+ largeValue, viewDoc.x, 'unexpected field x in doc: ' + i + ': ' + tojson(viewDoc));
+ }
+
+ // Check bucket collection.
+ const bucketDocs = bucketsColl.find().sort({_id: 1}).toArray();
+ assert.eq(2, bucketDocs.length, bucketDocs);
-// Check bucket collection.
-const bucketDocs = bucketsColl.find().sort({_id: 1}).toArray();
-assert.eq(2, bucketDocs.length, bucketDocs);
+ // Check both buckets.
+ // First bucket should be full with one document since we spill the second document over into
+ // the second bucket due to size constraints on 'data'.
+ assert.eq(0,
+ bucketDocs[0].control.min._id,
+ 'invalid control.min for _id in first bucket: ' + tojson(bucketDocs[0].control));
+ assert.eq(largeValue,
+ bucketDocs[0].control.min.x,
+ 'invalid control.min for x in first bucket: ' + tojson(bucketDocs[0].control));
+ assert.eq(0,
+ bucketDocs[0].control.max._id,
+ 'invalid control.max for _id in first bucket: ' + tojson(bucketDocs[0].control));
+ assert.eq(largeValue,
+ bucketDocs[0].control.max.x,
+ 'invalid control.max for x in first bucket: ' + tojson(bucketDocs[0].control));
-// Check both buckets.
-// First bucket should be full with one document since we spill the second document over into the
-// second bucket due to size constraints on 'data'.
-assert.eq(
- 1, bucketDocs[0].control.count, 'invalid count in first bucket: ' + tojson(bucketDocs[0]));
-assert.eq(0,
- bucketDocs[0].control.min._id,
- 'invalid control.min for _id in first bucket: ' + tojson(bucketDocs[0].control));
-assert.eq(largeValue,
- bucketDocs[0].control.min.x,
- 'invalid control.min for x in first bucket: ' + tojson(bucketDocs[0].control));
-assert.eq(0,
- bucketDocs[0].control.max._id,
- 'invalid control.max for _id in first bucket: ' + tojson(bucketDocs[0].control));
-assert.eq(largeValue,
- bucketDocs[0].control.max.x,
- 'invalid control.max for x in first bucket: ' + tojson(bucketDocs[0].control));
+ // Second bucket should contain the remaining document.
+ assert.eq(numDocs - 1,
+ bucketDocs[1].control.min._id,
+ 'invalid control.min for _id in second bucket: ' + tojson(bucketDocs[1].control));
+ assert.eq(largeValue,
+ bucketDocs[1].control.min.x,
+ 'invalid control.min for x in second bucket: ' + tojson(bucketDocs[1].control));
+ assert.eq(numDocs - 1,
+ bucketDocs[1].control.max._id,
+ 'invalid control.max for _id in second bucket: ' + tojson(bucketDocs[1].control));
+ assert.eq(largeValue,
+ bucketDocs[1].control.max.x,
+ 'invalid control.max for x in second bucket: ' + tojson(bucketDocs[1].control));
+};
-// Second bucket should contain the remaining document.
-assert.eq(
- 1, bucketDocs[1].control.count, 'invalid count in second bucket: ' + tojson(bucketDocs[1]));
-assert.eq(numDocs - 1,
- bucketDocs[1].control.min._id,
- 'invalid control.min for _id in second bucket: ' + tojson(bucketDocs[1].control));
-assert.eq(largeValue,
- bucketDocs[1].control.min.x,
- 'invalid control.min for x in second bucket: ' + tojson(bucketDocs[1].control));
-assert.eq(numDocs - 1,
- bucketDocs[1].control.max._id,
- 'invalid control.max for _id in second bucket: ' + tojson(bucketDocs[1].control));
-assert.eq(largeValue,
- bucketDocs[1].control.max.x,
- 'invalid control.max for x in second bucket: ' + tojson(bucketDocs[1].control));
+runTest(1);
+runTest(numDocs);
})();
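
A quick back-of-the-envelope check of why the second document spills over, using the constants from this test (an illustrative shell snippet, not part of the test):

// Size of largeValue alone: (125 - 1) * 1024 = 126976 bytes.
const measurementBytes = (125 - 1) * 1024;
// Bucket limit on 'data': 125 * 1024 = 128000 bytes.
const bucketLimitBytes = 125 * 1024;
// One measurement (plus _id, time, and per-field overhead) fits under the limit;
// a second one of the same size cannot, so it is routed to a new bucket.
assert.gt(2 * measurementBytes, bucketLimitBytes);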
diff --git a/jstests/core/time_series/time_series_bucket_limit_time_range.js b/jstests/core/time_series/time_series_bucket_limit_time_range.js
index 387a31ac258..1824ed290c9 100644
--- a/jstests/core/time_series/time_series_bucket_limit_time_range.js
+++ b/jstests/core/time_series/time_series_bucket_limit_time_range.js
@@ -1,9 +1,9 @@
/**
* Tests maximum time-range of measurements held in each bucket in a time-series buckets collection.
* @tags: [
+ * does_not_support_stepdowns,
* requires_fcv_49,
* requires_find_command,
- * requires_getmore,
* ]
*/
(function() {
@@ -19,90 +19,91 @@ if (!TimeseriesTest.timeseriesCollectionsEnabled(db.getMongo())) {
const testDB = db.getSiblingDB(jsTestName());
assert.commandWorked(testDB.dropDatabase());
-const coll = testDB.getCollection('t');
-const bucketsColl = testDB.getCollection('system.buckets.' + coll.getName());
-
-coll.drop();
-
const timeFieldName = 'time';
-assert.commandWorked(
- testDB.createCollection(coll.getName(), {timeseries: {timeField: timeFieldName}}));
-assert.contains(bucketsColl.getName(), testDB.getCollectionNames());
-
-const controlVersion = 1;
// Assumes the measurements in each bucket span at most one hour (based on the time field).
-const bucketMaxTimeRangeHours = 1;
-const numDocs = 2;
-const now = ISODate();
-// Measurements should be more than 'bucketMaxTimeRangeHours' apart.
const docTimes = [ISODate("2020-11-13T01:00:00Z"), ISODate("2020-11-13T03:00:00Z")];
+const numDocs = 2;
-for (let i = 0; i < numDocs; i++) {
- const doc = {_id: i, [timeFieldName]: docTimes[i], x: i};
+const runTest = function(numDocsPerInsert) {
+ const coll = testDB.getCollection('t_' + numDocsPerInsert);
+ const bucketsColl = testDB.getCollection('system.buckets.' + coll.getName());
+ coll.drop();
- assert.commandWorked(coll.insert(doc), 'failed to insert doc: ' + i + ': ' + tojson(doc));
-}
+ assert.commandWorked(
+ testDB.createCollection(coll.getName(), {timeseries: {timeField: timeFieldName}}));
+ assert.contains(bucketsColl.getName(), testDB.getCollectionNames());
-// Check view.
-const viewDocs = coll.find().sort({_id: 1}).toArray();
-assert.eq(numDocs, viewDocs.length, viewDocs);
-for (let i = 0; i < numDocs; i++) {
- const viewDoc = viewDocs[i];
- assert.eq(i, viewDoc._id, 'unexpected _id in doc: ' + i + ': ' + tojson(viewDoc));
- assert.eq(i, viewDoc.x, 'unexpected field x in doc: ' + i + ': ' + tojson(viewDoc));
- assert.eq(docTimes[i],
- viewDoc[timeFieldName],
- 'unexpected time in doc: ' + i + ': ' + tojson(viewDoc));
-}
+ let docs = [];
+ for (let i = 0; i < numDocs; i++) {
+ docs.push({_id: i, [timeFieldName]: docTimes[i], x: i});
+ if ((i + 1) % numDocsPerInsert === 0) {
+ assert.commandWorked(coll.insert(docs), 'failed to insert docs: ' + tojson(docs));
+ docs = [];
+ }
+ }
+
+ // Check view.
+ const viewDocs = coll.find().sort({_id: 1}).toArray();
+ assert.eq(numDocs, viewDocs.length, viewDocs);
+ for (let i = 0; i < numDocs; i++) {
+ const viewDoc = viewDocs[i];
+ assert.eq(i, viewDoc._id, 'unexpected _id in doc: ' + i + ': ' + tojson(viewDoc));
+ assert.eq(i, viewDoc.x, 'unexpected field x in doc: ' + i + ': ' + tojson(viewDoc));
+ assert.eq(docTimes[i],
+ viewDoc[timeFieldName],
+ 'unexpected time in doc: ' + i + ': ' + tojson(viewDoc));
+ }
+
+ // Check bucket collection.
+ const bucketDocs = bucketsColl.find().sort({_id: 1}).toArray();
+ assert.eq(2, bucketDocs.length, bucketDocs);
+
+ // Check both buckets.
+    // First bucket should not contain both documents because the time of the second measurement
+ // is ahead of the first document by more than 'bucketMaxTimeRangeHours'.
+ assert.eq(0,
+ bucketDocs[0].control.min._id,
+ 'invalid control.min for _id in first bucket: ' + tojson(bucketDocs[0].control));
+ assert.eq(0,
+ bucketDocs[0].control.min.x,
+ 'invalid control.min for x in first bucket: ' + tojson(bucketDocs[0].control));
+ assert.eq(docTimes[0],
+ bucketDocs[0].control.min[timeFieldName],
+ 'invalid control.min for time in first bucket: ' + tojson(bucketDocs[0].control));
+ assert.eq(0,
+ bucketDocs[0].control.max._id,
+ 'invalid control.max for _id in first bucket: ' + tojson(bucketDocs[0].control));
+ assert.eq(0,
+ bucketDocs[0].control.max.x,
+ 'invalid control.max for x in first bucket: ' + tojson(bucketDocs[0].control));
+ assert.eq(docTimes[0],
+ bucketDocs[0].control.max[timeFieldName],
+ 'invalid control.max for time in first bucket: ' + tojson(bucketDocs[0].control));
-// Check bucket collection.
-const bucketDocs = bucketsColl.find().sort({_id: 1}).toArray();
-assert.eq(2, bucketDocs.length, bucketDocs);
+ // Second bucket should contain the remaining document.
+ assert.eq(numDocs - 1,
+ bucketDocs[1].control.min._id,
+ 'invalid control.min for _id in second bucket: ' + tojson(bucketDocs[1].control));
+ assert.eq(numDocs - 1,
+ bucketDocs[1].control.min.x,
+ 'invalid control.min for x in second bucket: ' + tojson(bucketDocs[1].control));
+ assert.eq(docTimes[numDocs - 1],
+ bucketDocs[1].control.min[timeFieldName],
+ 'invalid control.min for time in second bucket: ' + tojson(bucketDocs[1].control));
+ assert.eq(numDocs - 1,
+ bucketDocs[1].control.max._id,
+ 'invalid control.max for _id in second bucket: ' + tojson(bucketDocs[1].control));
+ assert.eq(numDocs - 1,
+ bucketDocs[1].control.max.x,
+ 'invalid control.max for x in second bucket: ' + tojson(bucketDocs[1].control));
+ assert.eq(docTimes[numDocs - 1],
+ bucketDocs[1].control.max[timeFieldName],
+ 'invalid control.max for time in second bucket: ' + tojson(bucketDocs[1].control));
-// Check both buckets.
-// First bucket should be not contain both documents because the time of the second measurement is
-// ahead of the first document by more than 'bucketMaxTimeRangeHours'.
-assert.eq(
- 1, bucketDocs[0].control.count, 'invalid count in first bucket: ' + tojson(bucketDocs[0]));
-assert.eq(0,
- bucketDocs[0].control.min._id,
- 'invalid control.min for _id in first bucket: ' + tojson(bucketDocs[0].control));
-assert.eq(0,
- bucketDocs[0].control.min.x,
- 'invalid control.min for x in first bucket: ' + tojson(bucketDocs[0].control));
-assert.eq(docTimes[0],
- bucketDocs[0].control.min[timeFieldName],
- 'invalid control.min for time in first bucket: ' + tojson(bucketDocs[0].control));
-assert.eq(0,
- bucketDocs[0].control.max._id,
- 'invalid control.max for _id in first bucket: ' + tojson(bucketDocs[0].control));
-assert.eq(0,
- bucketDocs[0].control.max.x,
- 'invalid control.max for x in first bucket: ' + tojson(bucketDocs[0].control));
-assert.eq(docTimes[0],
- bucketDocs[0].control.max[timeFieldName],
- 'invalid control.max for time in first bucket: ' + tojson(bucketDocs[0].control));
+ assert(coll.drop());
+};
-// Second bucket should contain the remaining document.
-assert.eq(
- 1, bucketDocs[1].control.count, 'invalid count in second bucket: ' + tojson(bucketDocs[1]));
-assert.eq(numDocs - 1,
- bucketDocs[1].control.min._id,
- 'invalid control.min for _id in second bucket: ' + tojson(bucketDocs[1].control));
-assert.eq(numDocs - 1,
- bucketDocs[1].control.min.x,
- 'invalid control.min for x in second bucket: ' + tojson(bucketDocs[1].control));
-assert.eq(docTimes[numDocs - 1],
- bucketDocs[1].control.min[timeFieldName],
- 'invalid control.min for time in second bucket: ' + tojson(bucketDocs[1].control));
-assert.eq(numDocs - 1,
- bucketDocs[1].control.max._id,
- 'invalid control.max for _id in second bucket: ' + tojson(bucketDocs[1].control));
-assert.eq(numDocs - 1,
- bucketDocs[1].control.max.x,
- 'invalid control.max for x in second bucket: ' + tojson(bucketDocs[1].control));
-assert.eq(docTimes[numDocs - 1],
- bucketDocs[1].control.max[timeFieldName],
- 'invalid control.max for time in second bucket: ' + tojson(bucketDocs[1].control));
+runTest(1);
+runTest(numDocs);
})();
diff --git a/jstests/core/time_series/time_series_simple.js b/jstests/core/time_series/time_series_simple.js
index 707812d018b..ee8466c3a6e 100644
--- a/jstests/core/time_series/time_series_simple.js
+++ b/jstests/core/time_series/time_series_simple.js
@@ -2,6 +2,7 @@
* Tests inserting sample data into the time-series buckets collection.
* This test is for the simple case of only one measurement per bucket.
* @tags: [
+ * does_not_support_stepdowns,
* requires_fcv_49,
* ]
*/
@@ -102,7 +103,6 @@ const bucketDocs = bucketsColl.find().toArray();
assert.eq(1, bucketDocs.length, bucketDocs);
const bucketDoc = bucketDocs[0];
jsTestLog('Bucket collection document: ' + tojson(bucketDoc));
-assert.eq(numDocs, bucketDoc.control.count, 'invalid count in bucket: ' + tojson(bucketDoc));
assert.docEq(expectedBucketDoc.control.min,
bucketDoc.control.min,
'invalid min in bucket: ' + tojson(bucketDoc));
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 3a45337f310..31d9b2a99a4 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -40,6 +40,7 @@ env.SConscript(
'sorter',
'stats',
'storage',
+ 'timeseries',
'update',
'views',
],
diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript
index 2e7152c8a7d..19f23c6310d 100644
--- a/src/mongo/db/catalog/SConscript
+++ b/src/mongo/db/catalog/SConscript
@@ -438,6 +438,7 @@ env.Library(
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
'$BUILD_DIR/mongo/db/query_exec',
'$BUILD_DIR/mongo/db/server_options_core',
+ '$BUILD_DIR/mongo/db/timeseries/bucket_catalog',
'$BUILD_DIR/mongo/db/views/views',
'$BUILD_DIR/mongo/db/write_ops',
'collection',
diff --git a/src/mongo/db/catalog/create_collection.cpp b/src/mongo/db/catalog/create_collection.cpp
index a69f55633b2..ef867c99730 100644
--- a/src/mongo/db/catalog/create_collection.cpp
+++ b/src/mongo/db/catalog/create_collection.cpp
@@ -206,7 +206,7 @@ Status _createTimeseries(OperationContext* opCtx,
// If the buckets collection and time-series view creation roll back, ensure that their Top
// entries are deleted.
opCtx->recoveryUnit()->onRollback(
- [serviceContext = opCtx->getServiceContext(), &ns, &bucketsNs]() {
+ [serviceContext = opCtx->getServiceContext(), ns, bucketsNs]() {
Top::get(serviceContext).collectionDropped(ns);
Top::get(serviceContext).collectionDropped(bucketsNs);
});
diff --git a/src/mongo/db/catalog/database_impl.cpp b/src/mongo/db/catalog/database_impl.cpp
index 8167dd2c1a9..6ebf6dc207b 100644
--- a/src/mongo/db/catalog/database_impl.cpp
+++ b/src/mongo/db/catalog/database_impl.cpp
@@ -619,7 +619,7 @@ Status DatabaseImpl::createView(OperationContext* opCtx,
str::stream() << "invalid namespace name for a view: " + viewName.toString()};
} else {
status = ViewCatalog::get(this)->createView(
- opCtx, viewName, viewOnNss, pipeline, options.collation);
+ opCtx, viewName, viewOnNss, pipeline, options.collation, options.timeseries);
}
audit::logCreateView(&cc(), viewName.toString(), viewOnNss.toString(), pipeline, status.code());
diff --git a/src/mongo/db/catalog/drop_collection.cpp b/src/mongo/db/catalog/drop_collection.cpp
index 2642b81bc50..21264f369cf 100644
--- a/src/mongo/db/catalog/drop_collection.cpp
+++ b/src/mongo/db/catalog/drop_collection.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/timeseries/bucket_catalog.h"
#include "mongo/db/views/view_catalog.h"
#include "mongo/logv2/log.h"
#include "mongo/util/fail_point.h"
@@ -70,7 +71,8 @@ Status _checkNssAndReplState(OperationContext* opCtx, const CollectionPtr& coll)
Status _dropView(OperationContext* opCtx,
Database* db,
const NamespaceString& collectionName,
- BSONObjBuilder* result) {
+ BSONObjBuilder* result,
+ bool clearBucketCatalog = false) {
if (!db) {
return Status(ErrorCodes::NamespaceNotFound, "ns not found");
}
@@ -114,6 +116,10 @@ Status _dropView(OperationContext* opCtx,
}
wunit.commit();
+ if (clearBucketCatalog) {
+ BucketCatalog::get(opCtx).clear(collectionName);
+ }
+
result->append("ns", collectionName.ns());
return Status::OK();
}
@@ -322,7 +328,7 @@ Status dropCollection(OperationContext* opCtx,
return Status(ErrorCodes::NamespaceNotFound, "ns not found");
}
- if (!view->isTimeseries()) {
+ if (!view->timeseries()) {
return _dropView(opCtx, db, collectionName, &result);
}
@@ -331,15 +337,14 @@ Status dropCollection(OperationContext* opCtx,
std::move(autoDb),
view->viewOn(),
[opCtx, &collectionName, &result](Database* db, const NamespaceString& bucketsNs) {
- WriteUnitOfWork wuow(opCtx);
- auto status = _dropView(opCtx, db, collectionName, &result);
+ auto status = _dropView(
+ opCtx, db, collectionName, &result, true /* clearBucketCatalog */);
if (!status.isOK()) {
return status;
}
- wuow.commit();
- // Drop the buckets collection in its own writeConflictRetry so that
- // if it throws a WCE, only the buckets collection drop is retried.
+ // Drop the buckets collection in its own writeConflictRetry so that if it
+ // throws a WCE, only the buckets collection drop is retried.
writeConflictRetry(opCtx, "drop", bucketsNs.ns(), [opCtx, db, &bucketsNs] {
WriteUnitOfWork wuow(opCtx);
db->dropCollectionEvenIfSystem(opCtx, bucketsNs).ignore();
diff --git a/src/mongo/db/catalog/drop_database.cpp b/src/mongo/db/catalog/drop_database.cpp
index 3a177f3f907..cbb84773fd6 100644
--- a/src/mongo/db/catalog/drop_database.cpp
+++ b/src/mongo/db/catalog/drop_database.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/timeseries/bucket_catalog.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/logv2/log.h"
#include "mongo/util/duration.h"
@@ -114,6 +115,8 @@ void _finishDropDatabase(OperationContext* opCtx,
databaseHolder->dropDb(opCtx, db);
dropPendingGuard.dismiss();
+ BucketCatalog::get(opCtx).clear(dbName);
+
LOGV2(20336,
"dropDatabase {dbName} - finished, dropped {numCollections} collection(s)",
"dropDatabase",
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 5f61a1dc421..c00b230299b 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -270,6 +270,9 @@ env.Library(
'create.idl',
'create_command_validation.cpp',
],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/timeseries/timeseries_idl',
+ ],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/idl/idl_parser',
@@ -337,6 +340,7 @@ env.Library(
'$BUILD_DIR/mongo/db/stats/server_read_concern_write_concern_metrics',
'$BUILD_DIR/mongo/db/storage/storage_engine_common',
"$BUILD_DIR/mongo/db/storage/two_phase_index_build_knobs_idl",
+ '$BUILD_DIR/mongo/db/timeseries/bucket_catalog',
'$BUILD_DIR/mongo/db/transaction',
'$BUILD_DIR/mongo/db/views/views_mongod',
'$BUILD_DIR/mongo/idl/feature_flag',
diff --git a/src/mongo/db/commands/create.idl b/src/mongo/db/commands/create.idl
index 5c4fdfef520..8515c2cf023 100644
--- a/src/mongo/db/commands/create.idl
+++ b/src/mongo/db/commands/create.idl
@@ -33,6 +33,7 @@ global:
imports:
- "mongo/idl/basic_types.idl"
+ - "mongo/db/timeseries/timeseries.idl"
structs:
IndexOptionDefaults:
@@ -46,27 +47,6 @@ structs:
validator:
callback: create_command_validation::validateStorageEngineOptions
- TimeseriesOptions:
- description: "The options that define a time-series collection."
- strict: true
- fields:
- timeField:
- description: "The name of the field to be used for time. Inserted documents must
- have this field, and the field must be of the BSON UTC datetime type
- (0x9)"
- type: string
- metaField:
- description: "The name of the field describing the series. This field is used to
- group related data and may be of any BSON type. This may not be
- \"_id\" or the same as 'timeField'."
- type: string
- optional: true
- expireAfterSeconds:
- description: "The number of seconds after which old time-series data should be
- deleted."
- type: long
- optional: true
-
commands:
create:
description: "Parser for the 'create' Command"
diff --git a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
index bca6a26e20b..92eb7526073 100644
--- a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
+++ b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
@@ -264,7 +264,6 @@ public:
} else {
// Time-series collections are only supported in 5.0. If the user tries to downgrade the
// cluster to an earlier version, they must first remove all time-series collections.
- // TODO(SERVER-52523): Use the bucket catalog to detect time-series collections.
for (const auto& dbName : DatabaseHolder::get(opCtx)->getNames()) {
auto viewCatalog = DatabaseHolder::get(opCtx)->getSharedViewCatalog(opCtx, dbName);
if (!viewCatalog) {
@@ -277,7 +276,7 @@ public:
"collections present; drop all time-series collections before "
"downgrading. First detected time-series collection: "
<< view.name(),
- !view.isTimeseries());
+ !view.timeseries());
});
}
diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp
index 8b53fe99b84..a17dd90b683 100644
--- a/src/mongo/db/commands/write_commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands/write_commands.cpp
@@ -59,6 +59,7 @@
#include "mongo/db/repl/tenant_migration_conflict_info.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/storage/duplicate_key_error_info.h"
+#include "mongo/db/timeseries/bucket_catalog.h"
#include "mongo/db/views/view_catalog.h"
#include "mongo/db/write_concern.h"
#include "mongo/s/stale_exception.h"
@@ -106,81 +107,80 @@ bool isTimeseries(OperationContext* opCtx, const NamespaceString& ns) {
return false;
}
- return view->isTimeseries();
+ return view->timeseries().has_value();
}
// Default for control.version in time-series bucket collection.
const int kTimeseriesControlVersion = 1;
-const int kTimeseriesBucketMaxCount = 1000;
-const int kTimeseriesBucketMaxSizeKB = 125;
-const Hours kTimeseriesBucketMaxTimeRange(1);
/**
* Returns min/max $set expressions for the bucket's control field.
*/
-BSONObj makeTimeseriesControlMinMaxStages(const BSONObj& doc) {
+BSONObj makeTimeseriesControlMinMaxStages(const std::vector<BSONObj>& docs) {
+ struct MinMaxBuilders {
+ BSONArrayBuilder min;
+ BSONArrayBuilder max;
+ };
+ StringDataMap<MinMaxBuilders> minMaxBuilders;
+
+ for (const auto& doc : docs) {
+ for (const auto& elem : doc) {
+ auto key = elem.fieldNameStringData();
+ auto [it, created] = minMaxBuilders.insert({key, MinMaxBuilders{}});
+ if (created) {
+ it->second.min.append("$control.min." + key);
+ it->second.max.append("$control.max." + key);
+ }
+ it->second.min.append(elem);
+ it->second.max.append(elem);
+ }
+ }
+
BSONObjBuilder builder;
- for (const auto& elem : doc) {
- auto key = elem.fieldNameStringData();
- builder.append("control.min." + key,
- BSON("$min" << BSON_ARRAY(("$control.min." + key) << elem)));
- builder.append("control.max." + key,
- BSON("$max" << BSON_ARRAY(("$control.max." + key) << elem)));
+ for (auto& builders : minMaxBuilders) {
+ builder.append("control.min." + builders.first, BSON("$min" << builders.second.min.arr()));
+ builder.append("control.max." + builders.first, BSON("$max" << builders.second.max.arr()));
}
+
return builder.obj();
}
/**
* Returns $set expressions for the bucket's data field.
*/
-BSONObj makeTimeseriesDataStages(const BSONObj& doc) {
+BSONObj makeTimeseriesDataStages(const std::vector<BSONObj>& docs, uint16_t count) {
+ StringDataMap<BSONArrayBuilder> measurements;
+ for (const auto& doc : docs) {
+ for (const auto& elem : doc) {
+ auto key = elem.fieldNameStringData();
+ measurements[key].append(
+ BSON("k" << std::to_string(count) << elem.wrap("v").firstElement()));
+ }
+ count++;
+ }
+
BSONObjBuilder builder;
- for (const auto& elem : doc) {
- auto key = elem.fieldNameStringData();
+ for (auto& field : measurements) {
builder.append(
- "data." + key,
+ "data." + field.first,
BSON("$arrayToObject" << BSON(
"$setUnion" << BSON_ARRAY(
- BSON("$objectToArray"
- << BSON("$ifNull" << BSON_ARRAY(("$data." + key) << BSONObj())))
- << BSON_ARRAY(BSON(
- "k" << BSON("$toString"
- << BSON("$ifNull" << BSON_ARRAY("$control.count" << 0)))
- << elem.wrap("v").firstElement()))))));
+ BSON("$objectToArray" << BSON(
+ "$ifNull" << BSON_ARRAY(("$data." + field.first) << BSONObj())))
+ << field.second.arr()))));
}
+
return builder.obj();
}
/**
* Transforms a single time-series insert to an upsert request.
*/
-BSONObj makeTimeseriesUpsertRequest(const BSONObj& doc) {
+BSONObj makeTimeseriesUpsertRequest(const OID& oid,
+ const std::vector<BSONObj>& docs,
+ uint16_t count) {
BSONObjBuilder builder;
- // TODO(SERVER-52523): Obtain _id of bucket to update and name of time field from in-memory
- // catalog.
- const auto timeField = "time"_sd;
- {
- BSONObjBuilder queryBuilder(builder.subobjStart(write_ops::UpdateOpEntry::kQFieldName));
- BSONArrayBuilder andBuilder(queryBuilder.subarrayStart("$and"));
- // Each bucket can hold up to 'kTimeseriesBucketMaxCount' measurements.
- andBuilder.append(BSON(std::string(str::stream() << "data." << timeField << "."
- << (kTimeseriesBucketMaxCount - 1))
- << BSON("$exists" << false)));
- // The total size of measurements in a bucket cannot exceed 'kTimeseriesBucketMaxSizeKB'.
- // Ideally, we would use the following expression to avoid relying on 'control.size':
- // {$expr: {$lte: [{$bsonSize: '$data'}, (bucketMaxSizeKB * 1024 - doc.objsize())]}}
- // but $expr is not allowed in an upsert. See SERVER-30731.
- andBuilder.append(BSON(
- "control.size" << BSON("$lte" << (kTimeseriesBucketMaxSizeKB * 1024 - doc.objsize()))));
- // The maximum time-range of a bucket is limited, so index scans looking for buckets
- // containing a time T only need to consider buckets that are newer than
- // T - 'kTimeseriesBucketMaxTimeRange'.
- auto docTime = doc[timeField].Date();
- const std::string minTimeFieldName = str::stream() << "control.min." << timeField;
- andBuilder.append(BSON(minTimeFieldName << BSON("$lte" << docTime)));
- andBuilder.append(
- BSON(minTimeFieldName << BSON("$gte" << (docTime - kTimeseriesBucketMaxTimeRange))));
- }
+ builder.append(write_ops::UpdateOpEntry::kQFieldName, BSON("_id" << oid));
builder.append(write_ops::UpdateOpEntry::kMultiFieldName, false);
builder.append(write_ops::UpdateOpEntry::kUpsertFieldName, true);
{
@@ -190,23 +190,96 @@ BSONObj makeTimeseriesUpsertRequest(const BSONObj& doc) {
BSON("$set" << BSON("control.version"
<< BSON("$ifNull" << BSON_ARRAY("$control.version"
<< kTimeseriesControlVersion)))));
- stagesBuilder.append(BSON(
- "$set" << BSON("control.size"
- << BSON("$sum"
- << BSON_ARRAY(BSON("$ifNull" << BSON_ARRAY("$control.size" << 0))
- << doc.objsize())))));
- stagesBuilder.append(BSON("$set" << makeTimeseriesControlMinMaxStages(doc)));
- stagesBuilder.append(BSON("$set" << makeTimeseriesDataStages(doc)));
- // Update 'control.count' last because it is referenced in preceding $set stages in this
- // aggregation pipeline.
- stagesBuilder.append(BSON(
- "$set" << BSON("control.count" << BSON(
- "$sum" << BSON_ARRAY(
- BSON("$ifNull" << BSON_ARRAY("$control.count" << 0)) << 1)))));
+ stagesBuilder.append(BSON("$set" << makeTimeseriesControlMinMaxStages(docs)));
+ stagesBuilder.append(BSON("$set" << makeTimeseriesDataStages(docs, count)));
}
return builder.obj();
}
+void appendOpTime(const repl::OpTime& opTime, BSONObjBuilder* out) {
+ if (opTime.getTerm() == repl::OpTime::kUninitializedTerm) {
+ out->append("opTime", opTime.getTimestamp());
+ } else {
+ opTime.append(out, "opTime");
+ }
+}
+
+boost::optional<BSONObj> generateError(OperationContext* opCtx,
+ const StatusWith<SingleWriteResult>& result,
+ int index,
+ size_t numErrors) {
+ auto status = result.getStatus();
+ if (status.isOK()) {
+ return boost::none;
+ }
+
+ auto errorMessage = [numErrors, errorSize = size_t(0)](StringData rawMessage) mutable {
+ // Start truncating error messages once both of these limits are exceeded.
+ constexpr size_t kErrorSizeTruncationMin = 1024 * 1024;
+ constexpr size_t kErrorCountTruncationMin = 2;
+ if (errorSize >= kErrorSizeTruncationMin && numErrors >= kErrorCountTruncationMin) {
+ return ""_sd;
+ }
+
+ errorSize += rawMessage.size();
+ return rawMessage;
+ };
+
+ BSONSizeTracker errorsSizeTracker;
+ BSONObjBuilder error(errorsSizeTracker);
+ error.append("index", index);
+ if (auto staleInfo = status.extraInfo<StaleConfigInfo>()) {
+ error.append("code", int(ErrorCodes::StaleShardVersion)); // Different from exception!
+ {
+ BSONObjBuilder errInfo(error.subobjStart("errInfo"));
+ staleInfo->serialize(&errInfo);
+ }
+ } else if (ErrorCodes::DocumentValidationFailure == status.code() && status.extraInfo()) {
+ auto docValidationError =
+ status.extraInfo<doc_validation_error::DocumentValidationFailureInfo>();
+ error.append("code", static_cast<int>(ErrorCodes::DocumentValidationFailure));
+ error.append("errInfo", docValidationError->getDetails());
+ } else if (ErrorCodes::isTenantMigrationError(status.code())) {
+ if (ErrorCodes::TenantMigrationConflict == status.code()) {
+ auto migrationConflictInfo = status.extraInfo<TenantMigrationConflictInfo>();
+
+ hangWriteBeforeWaitingForMigrationDecision.pauseWhileSet(opCtx);
+
+ auto mtab = migrationConflictInfo->getTenantMigrationAccessBlocker();
+
+ auto migrationStatus = mtab->waitUntilCommittedOrAborted(opCtx);
+ error.append("code", static_cast<int>(migrationStatus.code()));
+
+ // We want to append an empty errmsg for the errors after the first one, so let the
+ // code below that appends errmsg do that.
+ if (status.reason() != "") {
+ error.append("errmsg", errorMessage(migrationStatus.reason()));
+ }
+ if (migrationStatus.extraInfo()) {
+ error.append("errInfo",
+ migrationStatus.extraInfo<TenantMigrationCommittedInfo>()->toBSON());
+ }
+ } else {
+ error.append("code", int(status.code()));
+ if (status.extraInfo()) {
+ error.append("errInfo", status.extraInfo<TenantMigrationCommittedInfo>()->toBSON());
+ }
+ }
+ } else {
+ error.append("code", int(status.code()));
+ if (auto const extraInfo = status.extraInfo()) {
+ extraInfo->serialize(&error);
+ }
+ }
+
+ // Skip appending errmsg if it has already been appended like in the case of
+ // TenantMigrationConflict.
+ if (!error.hasField("errmsg")) {
+ error.append("errmsg", errorMessage(status.reason()));
+ }
+ return error.obj();
+}
+
enum class ReplyStyle { kUpdate, kNotUpdate }; // update has extra fields.
void serializeReply(OperationContext* opCtx,
ReplyStyle replyStyle,
@@ -238,91 +311,24 @@ void serializeReply(OperationContext* opCtx,
std::vector<BSONObj> upsertInfo;
std::vector<BSONObj> errors;
BSONSizeTracker upsertInfoSizeTracker;
- BSONSizeTracker errorsSizeTracker;
-
- auto errorMessage = [&, errorSize = size_t(0)](StringData rawMessage) mutable {
- // Start truncating error messages once both of these limits are exceeded.
- constexpr size_t kErrorSizeTruncationMin = 1024 * 1024;
- constexpr size_t kErrorCountTruncationMin = 2;
- if (errorSize >= kErrorSizeTruncationMin && errors.size() >= kErrorCountTruncationMin) {
- return ""_sd;
- }
-
- errorSize += rawMessage.size();
- return rawMessage;
- };
for (size_t i = 0; i < result.results.size(); i++) {
- if (result.results[i].isOK()) {
- const auto& opResult = result.results[i].getValue();
- nVal += opResult.getN(); // Always there.
- if (replyStyle == ReplyStyle::kUpdate) {
- nModified += opResult.getNModified();
- if (auto idElement = opResult.getUpsertedId().firstElement()) {
- BSONObjBuilder upsertedId(upsertInfoSizeTracker);
- upsertedId.append("index", int(i));
- upsertedId.appendAs(idElement, "_id");
- upsertInfo.push_back(upsertedId.obj());
- }
- }
+ if (auto error = generateError(opCtx, result.results[i], i, errors.size())) {
+ errors.push_back(*error);
continue;
}
- const auto& status = result.results[i].getStatus();
- BSONObjBuilder error(errorsSizeTracker);
- error.append("index", int(i));
- if (auto staleInfo = status.extraInfo<StaleConfigInfo>()) {
- error.append("code", int(ErrorCodes::StaleShardVersion)); // Different from exception!
- {
- BSONObjBuilder errInfo(error.subobjStart("errInfo"));
- staleInfo->serialize(&errInfo);
- }
- } else if (ErrorCodes::DocumentValidationFailure == status.code() && status.extraInfo()) {
- auto docValidationError =
- status.extraInfo<doc_validation_error::DocumentValidationFailureInfo>();
- error.append("code", static_cast<int>(ErrorCodes::DocumentValidationFailure));
- error.append("errInfo", docValidationError->getDetails());
- } else if (ErrorCodes::isTenantMigrationError(status.code())) {
- if (ErrorCodes::TenantMigrationConflict == status.code()) {
- auto migrationConflictInfo = status.extraInfo<TenantMigrationConflictInfo>();
-
- hangWriteBeforeWaitingForMigrationDecision.pauseWhileSet(opCtx);
-
- auto mtab = migrationConflictInfo->getTenantMigrationAccessBlocker();
-
- auto migrationStatus = mtab->waitUntilCommittedOrAborted(opCtx);
- error.append("code", static_cast<int>(migrationStatus.code()));
-
- // We want to append an empty errmsg for the errors after the first one, so let the
- // code below that appends errmsg do that.
- if (status.reason() != "") {
- error.append("errmsg", errorMessage(migrationStatus.reason()));
- }
- if (migrationStatus.extraInfo()) {
- error.append(
- "errInfo",
- migrationStatus.extraInfo<TenantMigrationCommittedInfo>()->toBSON());
- }
- } else {
- error.append("code", int(status.code()));
- if (status.extraInfo()) {
- error.append("errInfo",
- status.extraInfo<TenantMigrationCommittedInfo>()->toBSON());
- }
+ const auto& opResult = result.results[i].getValue();
+ nVal += opResult.getN(); // Always there.
+ if (replyStyle == ReplyStyle::kUpdate) {
+ nModified += opResult.getNModified();
+ if (auto idElement = opResult.getUpsertedId().firstElement()) {
+ BSONObjBuilder upsertedId(upsertInfoSizeTracker);
+ upsertedId.append("index", int(i));
+ upsertedId.appendAs(idElement, "_id");
+ upsertInfo.push_back(upsertedId.obj());
}
- } else {
- error.append("code", int(status.code()));
- if (auto const extraInfo = status.extraInfo()) {
- extraInfo->serialize(&error);
- }
- }
-
- // Skip appending errmsg if it has already been appended like in the case of
- // TenantMigrationConflict.
- if (!error.hasField("errmsg")) {
- error.append("errmsg", errorMessage(status.reason()));
}
- errors.push_back(error.obj());
}
out->appendNumber("n", nVal);
@@ -345,12 +351,7 @@ void serializeReply(OperationContext* opCtx,
auto* replCoord = repl::ReplicationCoordinator::get(opCtx->getServiceContext());
const auto replMode = replCoord->getReplicationMode();
if (replMode != repl::ReplicationCoordinator::modeNone) {
- const auto lastOp = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
- if (lastOp.getTerm() == repl::OpTime::kUninitializedTerm) {
- out->append("opTime", lastOp.getTimestamp());
- } else {
- lastOp.append(out, "opTime");
- }
+ appendOpTime(repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), out);
if (replMode == repl::ReplicationCoordinator::modeReplSet) {
out->append("electionId", replCoord->getElectionId());
@@ -468,39 +469,102 @@ private:
/**
* Writes to the underlying system.buckets collection.
*/
- void _performTimeseriesWrites(OperationContext* opCtx, BSONObjBuilder& result) const {
+ void _performTimeseriesWrites(OperationContext* opCtx, BSONObjBuilder* result) const {
auto ns = _batch.getNamespace();
auto bucketsNs = ns.makeTimeseriesBucketsNamespace();
- BSONObjBuilder builder;
- builder.append(write_ops::Update::kCommandName, bucketsNs.coll());
- // The schema validation configured in the bucket collection is intended for direct
- // operations by end users and is not applicable here.
- builder.append(write_ops::Update::kBypassDocumentValidationFieldName, true);
- builder.append(write_ops::Update::kOrderedFieldName, _batch.getOrdered());
- if (auto stmtId = _batch.getStmtId()) {
- builder.append(write_ops::Update::kStmtIdFieldName, *stmtId);
- } else if (auto stmtIds = _batch.getStmtIds()) {
- builder.append(write_ops::Update::kStmtIdsFieldName, *stmtIds);
+ auto& bucketCatalog = BucketCatalog::get(opCtx);
+ std::vector<std::pair<OID, size_t>> bucketsToCommit;
+ std::vector<std::pair<Future<BucketCatalog::CommitInfo>, size_t>> bucketsToWaitOn;
+ for (size_t i = 0; i < _batch.getDocuments().size(); i++) {
+ auto [bucketId, commitInfo] =
+ bucketCatalog.insert(opCtx, ns, _batch.getDocuments()[i]);
+ if (commitInfo) {
+ bucketsToWaitOn.push_back({std::move(*commitInfo), i});
+ } else {
+ bucketsToCommit.push_back({std::move(bucketId), i});
+ }
}
- {
- BSONArrayBuilder updatesBuilder(
- builder.subarrayStart(write_ops::Update::kUpdatesFieldName));
- for (const auto& doc : _batch.getDocuments()) {
- updatesBuilder.append(makeTimeseriesUpsertRequest(doc));
+
+ std::vector<BSONObj> errors;
+ boost::optional<repl::OpTime> opTime;
+ boost::optional<OID> electionId;
+
+ for (const auto& [bucketId, index] : bucketsToCommit) {
+ auto data = bucketCatalog.commit(bucketId);
+ while (!data.docs.empty()) {
+ BSONObjBuilder builder;
+ builder.append(write_ops::Update::kCommandName, bucketsNs.coll());
+ // The schema validation configured in the bucket collection is intended for
+ // direct operations by end users and is not applicable here.
+ builder.append(write_ops::Update::kBypassDocumentValidationFieldName, true);
+ builder.append(write_ops::Update::kOrderedFieldName, _batch.getOrdered());
+ if (auto stmtId = _batch.getStmtId()) {
+ builder.append(write_ops::Update::kStmtIdFieldName, *stmtId);
+ } else if (auto stmtIds = _batch.getStmtIds()) {
+ builder.append(write_ops::Update::kStmtIdsFieldName, *stmtIds);
+ }
+
+ {
+ BSONArrayBuilder updatesBuilder(
+ builder.subarrayStart(write_ops::Update::kUpdatesFieldName));
+ updatesBuilder.append(makeTimeseriesUpsertRequest(
+ bucketId, data.docs, data.numCommittedMeasurements));
+ }
+
+ auto request = OpMsgRequest::fromDBAndBody(bucketsNs.db(), builder.obj());
+ auto timeseriesUpsertBatch = UpdateOp::parse(request);
+ auto reply = write_ops_exec::performUpdates(opCtx, timeseriesUpsertBatch);
+
+ invariant(reply.results.size() == 1,
+ str::stream()
+ << "Unexpected number of results (" << reply.results.size()
+ << ") for insert on time-series collection " << ns);
+
+ if (auto error = generateError(opCtx, reply.results[0], index, errors.size())) {
+ errors.push_back(*error);
+ }
+
+ auto* replCoord = repl::ReplicationCoordinator::get(opCtx->getServiceContext());
+ const auto replMode = replCoord->getReplicationMode();
+
+ opTime = replMode != repl::ReplicationCoordinator::modeNone
+ ? boost::make_optional(
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp())
+ : boost::none;
+ electionId = replMode == repl::ReplicationCoordinator::modeReplSet
+ ? boost::make_optional(replCoord->getElectionId())
+ : boost::none;
+
+ data = bucketCatalog.commit(
+ bucketId,
+ BucketCatalog::CommitInfo{std::move(reply.results[0]), opTime, electionId});
}
}
- auto request = OpMsgRequest::fromDBAndBody(bucketsNs.db(), builder.obj());
- auto timeseriesUpsertBatch = UpdateOp::parse(request);
+ for (const auto& [future, index] : bucketsToWaitOn) {
+ auto commitInfo = future.get(opCtx);
+ if (auto error = generateError(opCtx, commitInfo.result, index, errors.size())) {
+ errors.push_back(*error);
+ }
+ if (commitInfo.opTime) {
+ opTime = std::max(opTime.value_or(repl::OpTime()), *commitInfo.opTime);
+ }
+ if (commitInfo.electionId) {
+ electionId = std::max(electionId.value_or(OID()), *commitInfo.electionId);
+ }
+ }
- auto reply = write_ops_exec::performUpdates(opCtx, timeseriesUpsertBatch);
- serializeReply(opCtx,
- ReplyStyle::kUpdate,
- !timeseriesUpsertBatch.getWriteCommandBase().getOrdered(),
- timeseriesUpsertBatch.getUpdates().size(),
- std::move(reply),
- &result);
+ result->appendNumber("n", _batch.getDocuments().size() - errors.size());
+ if (!errors.empty()) {
+ result->append("writeErrors", errors);
+ }
+ if (opTime) {
+ appendOpTime(*opTime, result);
+ }
+ if (electionId) {
+ result->append("electionId", *electionId);
+ }
}
void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const override {
@@ -508,7 +572,7 @@ private:
// Re-throw parsing exceptions to be consistent with CmdInsert::Invocation's
// constructor.
try {
- _performTimeseriesWrites(opCtx, result);
+ _performTimeseriesWrites(opCtx, &result);
} catch (DBException& ex) {
ex.addContext(str::stream() << "time-series insert failed: " << ns().ns());
throw;
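
For reference, each per-bucket upsert that _performTimeseriesWrites builds boils down to an update entry roughly like the one below, shown in shell syntax for an empty bucket receiving two new measurements with _id 0 and 1 (the values and the bucketOid name are illustrative; the real request is assembled with BSONObjBuilder as above):

{
    q: {_id: bucketOid},   // bucket _id returned by BucketCatalog::insert()
    multi: false,
    upsert: true,
    u: [
        {$set: {"control.version": {$ifNull: ["$control.version", 1]}}},
        {$set: {
            "control.min._id": {$min: ["$control.min._id", 0, 1]},
            "control.max._id": {$max: ["$control.max._id", 0, 1]}
            // ... one $min/$max pair per top-level field in the new measurements
        }},
        {$set: {
            "data._id": {$arrayToObject: {$setUnion: [
                {$objectToArray: {$ifNull: ["$data._id", {}]}},
                [{k: "0", v: 0}, {k: "1", v: 1}]
            ]}}
            // ... one entry per field; 'k' is the measurement's position within the bucket
        }}
    ]
}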
diff --git a/src/mongo/db/timeseries/SConscript b/src/mongo/db/timeseries/SConscript
new file mode 100644
index 00000000000..dc89606b46b
--- /dev/null
+++ b/src/mongo/db/timeseries/SConscript
@@ -0,0 +1,38 @@
+# -*- mode: python -*-
+
+Import("env")
+
+env = env.Clone()
+
+env.Library(
+ target='timeseries_idl',
+ source=[
+ 'timeseries.idl',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/idl/idl_parser',
+ ],
+)
+
+env.Library(
+ target='bucket_catalog',
+ source=[
+ 'bucket_catalog.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/catalog/database_holder',
+ '$BUILD_DIR/mongo/db/views/views',
+ 'timeseries_idl',
+ ],
+)
+
+env.CppUnitTest(
+ target='bucket_catalog_test',
+ source=[
+ 'bucket_catalog_test.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/catalog/catalog_test_fixture',
+ 'bucket_catalog',
+ ],
+)
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
new file mode 100644
index 00000000000..5fde11b5772
--- /dev/null
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -0,0 +1,223 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/timeseries/bucket_catalog.h"
+
+#include "mongo/db/catalog/database_holder.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/views/view_catalog.h"
+
+namespace mongo {
+namespace {
+const auto getBucketCatalog = ServiceContext::declareDecoration<BucketCatalog>();
+
+const int kTimeseriesBucketMaxCount = 1000;
+const int kTimeseriesBucketMaxSizeBytes = 125 * 1024; // 125 KB
+const Hours kTimeseriesBucketMaxTimeRange(1);
+} // namespace
+
+BucketCatalog& BucketCatalog::get(ServiceContext* svcCtx) {
+ return getBucketCatalog(svcCtx);
+}
+
+BucketCatalog& BucketCatalog::get(OperationContext* opCtx) {
+ return get(opCtx->getServiceContext());
+}
+
+BucketCatalog::InsertResult BucketCatalog::insert(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const BSONObj& doc) {
+ stdx::lock_guard lk(_mutex);
+
+ auto viewCatalog = DatabaseHolder::get(opCtx)->getSharedViewCatalog(opCtx, ns.db());
+ invariant(viewCatalog);
+ auto viewDef = viewCatalog->lookup(opCtx, ns.ns());
+ invariant(viewDef);
+ const auto& options = *viewDef->timeseries();
+
+ BSONObjBuilder metadata;
+ if (auto metaField = options.getMetaField()) {
+ if (auto elem = doc[*metaField]) {
+ metadata.appendAs(elem, *metaField);
+ } else {
+ metadata.appendNull(*metaField);
+ }
+ }
+ auto key = std::make_pair(ns, BucketMetadata{metadata.obj()});
+
+ auto time = doc[options.getTimeField()].Date();
+ auto setBucketTime = [time = durationCount<Seconds>(time.toDurationSinceEpoch())](
+ OID* bucketId) { bucketId->setTimestamp(time); };
+
+ auto it = _bucketIds.find(key);
+ if (it == _bucketIds.end()) {
+ // A bucket for this namespace and metadata pair does not yet exist.
+ it = _bucketIds.insert({std::move(key), OID::gen()}).first;
+ setBucketTime(&it->second);
+ _orderedBuckets.insert({ns, it->first.second, it->second});
+ }
+
+ _idleBuckets.erase(it->second);
+ auto bucket = &_buckets[it->second];
+
+ StringSet newFieldNamesToBeInserted;
+ uint32_t sizeToBeAdded = 0;
+ for (const auto& elem : doc) {
+ if (options.getMetaField() && elem.fieldNameStringData() == *options.getMetaField()) {
+ // Ignore the metadata field since it will not be inserted.
+ continue;
+ }
+
+ // If the field name is new, add the size of an empty object with that field name.
+ if (!bucket->fieldNames.contains(elem.fieldName())) {
+ newFieldNamesToBeInserted.insert(elem.fieldName());
+ sizeToBeAdded += BSON(elem.fieldName() << BSONObj()).objsize();
+ }
+
+ // Add the element size, taking into account that the name will be changed to its positional
+ // number. Add 1 to the calculation since the element's field name size accounts for a null
+ // terminator whereas the stringified position does not.
+ sizeToBeAdded +=
+ elem.size() - elem.fieldNameSize() + std::to_string(bucket->numMeasurements).size() + 1;
+ }
+
+ auto bucketTime = it->second.asDateT();
+ if (bucket->numMeasurements == kTimeseriesBucketMaxCount ||
+ bucket->size + sizeToBeAdded > kTimeseriesBucketMaxSizeBytes ||
+ time - bucketTime >= kTimeseriesBucketMaxTimeRange || time < bucketTime) {
+ // The bucket is full, so create a new one.
+ bucket->full = true;
+ it->second = OID::gen();
+ setBucketTime(&it->second);
+ _orderedBuckets.insert({ns, it->first.second, it->second});
+ bucket = &_buckets[it->second];
+ }
+
+ bucket->numWriters++;
+ bucket->numMeasurements++;
+ bucket->size += sizeToBeAdded;
+ bucket->measurementsToBeInserted.push_back(doc);
+ bucket->newFieldNamesToBeInserted.merge(newFieldNamesToBeInserted);
+ if (bucket->ns.isEmpty()) {
+ // The namespace and metadata only need to be set if this bucket was newly created.
+ bucket->ns = ns;
+ bucket->metadata = it->first.second;
+ }
+
+ // If there is exactly 1 uncommitted measurement, the caller is the committer. Otherwise, it is
+ // a waiter.
+ boost::optional<Future<CommitInfo>> commitInfoFuture;
+ if (bucket->numMeasurements - bucket->numCommittedMeasurements > 1) {
+ auto [promise, future] = makePromiseFuture<CommitInfo>();
+ bucket->promises[bucket->numMeasurements - 1] = std::move(promise);
+ commitInfoFuture = std::move(future);
+ }
+
+ return {it->second, std::move(commitInfoFuture)};
+}
+
+BucketCatalog::CommitData BucketCatalog::commit(const OID& bucketId,
+ boost::optional<CommitInfo> previousCommitInfo) {
+ stdx::lock_guard lk(_mutex);
+ auto it = _buckets.find(bucketId);
+ auto& bucket = it->second;
+
+ // The only case in which previousCommitInfo should not be provided is the first time a given
+ // committer calls this function.
+ invariant(!previousCommitInfo || bucket.numCommittedMeasurements != 0 ||
+ bucket.numPendingCommitMeasurements != 0);
+
+ bucket.fieldNames.merge(bucket.newFieldNamesToBeInserted);
+ bucket.newFieldNamesToBeInserted.clear();
+
+ std::vector<BSONObj> measurements;
+ bucket.measurementsToBeInserted.swap(measurements);
+
+ // Inform waiters that their measurements have been committed.
+ for (uint32_t i = 0; i < bucket.numPendingCommitMeasurements; i++) {
+ auto it = bucket.promises.find(i + bucket.numCommittedMeasurements);
+ if (it != bucket.promises.end()) {
+ it->second.emplaceValue(*previousCommitInfo);
+ bucket.promises.erase(it);
+ }
+ }
+
+ bucket.numWriters -= bucket.numPendingCommitMeasurements;
+ auto numCommittedMeasurements = bucket.numCommittedMeasurements +=
+ std::exchange(bucket.numPendingCommitMeasurements, measurements.size());
+
+ if (measurements.empty()) {
+ if (bucket.full) {
+ // Everything in the bucket has been committed, and nothing more will be added since the
+ // bucket is full. Thus, we can remove it.
+ _orderedBuckets.erase(
+ {std::move(it->second.ns), std::move(it->second.metadata), bucketId});
+ _buckets.erase(it);
+ } else if (--bucket.numWriters == 0) {
+ _idleBuckets.insert(bucketId);
+ }
+ }
+
+ return {std::move(measurements), numCommittedMeasurements};
+}
+
+void BucketCatalog::clear(const NamespaceString& ns) {
+ stdx::lock_guard lk(_mutex);
+
+ auto shouldClear = [&ns](const NamespaceString& bucketNs) {
+ return ns.coll().empty() ? ns.db() == bucketNs.db() : ns == bucketNs;
+ };
+
+ for (auto it = _orderedBuckets.lower_bound({ns, {}, {}});
+ it != _orderedBuckets.end() && shouldClear(std::get<NamespaceString>(*it));) {
+ auto& bucketId = std::get<OID>(*it);
+ _buckets.erase(bucketId);
+ _idleBuckets.erase(bucketId);
+ _bucketIds.erase({std::get<NamespaceString>(*it), std::get<BucketMetadata>(*it)});
+ it = _orderedBuckets.erase(it);
+ }
+}
+
+void BucketCatalog::clear(StringData dbName) {
+ clear(NamespaceString(dbName, ""));
+}
+
+bool BucketCatalog::BucketMetadata::operator<(const BucketMetadata& other) const {
+ auto size = metadata.objsize();
+ auto otherSize = other.metadata.objsize();
+ auto cmp = std::memcmp(metadata.objdata(), other.metadata.objdata(), std::min(size, otherSize));
+ return cmp == 0 && size != otherSize ? size < otherSize : cmp < 0;
+}
+
+bool BucketCatalog::BucketMetadata::operator==(const BucketMetadata& other) const {
+ return metadata.binaryEqual(other.metadata);
+}
+} // namespace mongo
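
A sketch for illustration, not part of the patch: the per-element size arithmetic in insert() above is the least obvious part of this file, so here it is isolated into a standalone helper. The helper name sizeContribution is hypothetical; the snippet assumes the usual mongo BSON and StringSet headers and mirrors the lines above rather than defining new behavior.

#include <string>

#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/util/string_map.h"

namespace mongo {
// Bytes a single top-level element adds to a bucket, per the accounting in
// BucketCatalog::insert().
uint32_t sizeContribution(const BSONElement& elem,
                          const StringSet& existingFieldNames,
                          uint16_t numMeasurements) {
    uint32_t sizeToBeAdded = 0;

    // A field name not yet present in the bucket also costs an empty object keyed by that
    // name, since it becomes a new column in the bucket document.
    if (!existingFieldNames.contains(elem.fieldName())) {
        sizeToBeAdded += BSON(elem.fieldName() << BSONObj()).objsize();
    }

    // Inside the bucket the element is re-keyed by its position ("0", "1", ...), so the
    // original field name is replaced. Add 1 because elem.fieldNameSize() counts the null
    // terminator while the stringified position does not.
    sizeToBeAdded +=
        elem.size() - elem.fieldNameSize() + std::to_string(numMeasurements).size() + 1;
    return sizeToBeAdded;
}
}  // namespace mongo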
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
new file mode 100644
index 00000000000..c8aedce23a2
--- /dev/null
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -0,0 +1,160 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/ops/single_write_result_gen.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/timeseries/timeseries_gen.h"
+#include "mongo/util/string_map.h"
+
+namespace mongo {
+class BucketCatalog {
+public:
+ struct CommitInfo {
+ StatusWith<SingleWriteResult> result;
+ boost::optional<repl::OpTime> opTime;
+ boost::optional<OID> electionId;
+ };
+
+ struct InsertResult {
+ OID bucketId;
+ boost::optional<Future<CommitInfo>> commitInfo;
+ };
+
+ struct CommitData {
+ std::vector<BSONObj> docs;
+ uint16_t numCommittedMeasurements;
+ };
+
+ static BucketCatalog& get(ServiceContext* svcCtx);
+ static BucketCatalog& get(OperationContext* opCtx);
+
+ BucketCatalog() = default;
+
+ BucketCatalog(const BucketCatalog&) = delete;
+    BucketCatalog& operator=(const BucketCatalog&) = delete;
+
+ /**
+ * Returns the id of the bucket that the document belongs in, and a Future to wait on if the
+ * caller is a waiter for the bucket. If no Future is provided, the caller is the committer for
+ * this bucket.
+ */
+ InsertResult insert(OperationContext* opCtx, const NamespaceString& ns, const BSONObj& doc);
+
+ /**
+ * Returns the uncommitted measurements and the number of measurements that have already been
+ * committed for the given bucket. This should be called continuously by the committer until
+ * there are no more uncommitted measurements.
+ */
+ CommitData commit(const OID& bucketId,
+ boost::optional<CommitInfo> previousCommitInfo = boost::none);
+
+ /**
+ * Clears the buckets for the given namespace.
+ */
+ void clear(const NamespaceString& ns);
+
+ /**
+ * Clears the buckets for the given database.
+ */
+ void clear(StringData dbName);
+
+private:
+ struct BucketMetadata {
+ bool operator<(const BucketMetadata& other) const;
+ bool operator==(const BucketMetadata& other) const;
+
+ template <typename H>
+ friend H AbslHashValue(H h, const BucketMetadata& metadata) {
+ // TODO (SERVER-52967): Hash the metadata in a way that does not depend on its ordering.
+ SimpleBSONObjComparator::Hasher hasher;
+ return H::combine(std::move(h), hasher(metadata.metadata));
+ }
+
+ BSONObj metadata;
+ };
+
+ struct Bucket {
+ // The namespace that this bucket is used for.
+ NamespaceString ns;
+
+ // The metadata of the data that this bucket contains.
+ BucketMetadata metadata;
+
+ // Measurements to be inserted into the bucket.
+ std::vector<BSONObj> measurementsToBeInserted;
+
+ // New top-level field names of the measurements to be inserted.
+ StringSet newFieldNamesToBeInserted;
+
+ // Top-level field names of the measurements that have been inserted into the bucket.
+ StringSet fieldNames;
+
+ // The total size in bytes of the bucket's BSON serialization, including measurements to be
+ // inserted.
+ uint32_t size = 0;
+
+ // The total number of measurements in the bucket, including uncommitted measurements and
+ // measurements to be inserted.
+ uint16_t numMeasurements = 0;
+
+ // The number of measurements that were most recently returned from a call to commit().
+ uint16_t numPendingCommitMeasurements = 0;
+
+ // The number of committed measurements in the bucket.
+ uint16_t numCommittedMeasurements = 0;
+
+ // The number of current writers for the bucket.
+ uint32_t numWriters = 0;
+
+ // Promises for committers to fulfill in order to signal to waiters that their measurements
+ // have been committed.
+ stdx::unordered_map<uint16_t, Promise<CommitInfo>> promises;
+
+ // Whether the bucket is full. This can be due to number of measurements, size, or time
+ // range.
+ bool full = false;
+ };
+
+ Mutex _mutex = MONGO_MAKE_LATCH("BucketCatalog");
+
+ // All buckets currently in the catalog, including buckets which are full but not yet committed.
+ stdx::unordered_map<OID, Bucket, OID::Hasher> _buckets;
+
+ // The _id of the current bucket for each namespace and metadata pair.
+ stdx::unordered_map<std::pair<NamespaceString, BucketMetadata>, OID> _bucketIds;
+
+ // All namespace, metadata, and _id tuples which currently have a bucket in the catalog.
+ std::set<std::tuple<NamespaceString, BucketMetadata, OID>> _orderedBuckets;
+
+ // Buckets that do not have any writers.
+ std::set<OID> _idleBuckets;
+};
+} // namespace mongo
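
The insert()/commit() contract documented above implies a committer/waiter protocol on the write path. Below is a minimal sketch of how a caller might drive it, assuming Future::get(OperationContext*) blocks until the promise is fulfilled; writeBucket() is a hypothetical helper standing in for the actual write to the system.buckets collection, and none of this is the write-command code path added elsewhere in this patch.

#include "mongo/db/timeseries/bucket_catalog.h"

namespace mongo {
// Hypothetical helper, not part of this patch: writes or updates the bucket document in
// the system.buckets collection and reports the result of that write.
BucketCatalog::CommitInfo writeBucket(OperationContext* opCtx,
                                      const OID& bucketId,
                                      const BucketCatalog::CommitData& data);

void insertMeasurement(OperationContext* opCtx,
                       const NamespaceString& ns,
                       const BSONObj& measurement) {
    auto& catalog = BucketCatalog::get(opCtx);
    auto [bucketId, commitInfoFuture] = catalog.insert(opCtx, ns, measurement);

    if (commitInfoFuture) {
        // Another thread is the committer for this bucket; block until it reports our result.
        commitInfoFuture->get(opCtx);
        return;
    }

    // This thread is the committer: keep calling commit() until the bucket has no more
    // uncommitted measurements, passing the previous batch's result so waiters are notified.
    boost::optional<BucketCatalog::CommitInfo> previousCommitInfo;
    for (auto data = catalog.commit(bucketId); !data.docs.empty();
         data = catalog.commit(bucketId, previousCommitInfo)) {
        previousCommitInfo = writeBucket(opCtx, bucketId, data);
    }
}
}  // namespace mongo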
diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp
new file mode 100644
index 00000000000..469c874f14f
--- /dev/null
+++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp
@@ -0,0 +1,171 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/catalog/catalog_test_fixture.h"
+#include "mongo/db/catalog/create_collection.h"
+#include "mongo/db/timeseries/bucket_catalog.h"
+#include "mongo/db/views/view_catalog.h"
+#include "mongo/unittest/death_test.h"
+
+namespace mongo {
+namespace {
+class BucketCatalogTest : public CatalogTestFixture {
+protected:
+ void setUp() override;
+
+ void _commit(const OID& bucketId, uint16_t numCommittedMeasurements);
+ void _insertOneAndCommit(const NamespaceString& ns, uint16_t numCommittedMeasurements);
+
+ OperationContext* _opCtx;
+ BucketCatalog* _bucketCatalog;
+
+ StringData _timeField = "time";
+ StringData _metaField = "meta";
+
+ NamespaceString _ns1{"bucket_catalog_test_1", "t_1"};
+ NamespaceString _ns2{"bucket_catalog_test_1", "t_2"};
+ NamespaceString _ns3{"bucket_catalog_test_2", "t_1"};
+
+ BucketCatalog::CommitInfo _commitInfo{StatusWith<SingleWriteResult>(SingleWriteResult{})};
+};
+
+void BucketCatalogTest::setUp() {
+ CatalogTestFixture::setUp();
+
+ _opCtx = operationContext();
+ _bucketCatalog = &BucketCatalog::get(_opCtx);
+
+ for (const auto& ns : {_ns1, _ns2, _ns3}) {
+ ASSERT_OK(createCollection(
+ _opCtx,
+ ns.db().toString(),
+ BSON("create" << ns.coll() << "timeseries"
+ << BSON("timeField" << _timeField << "metaField" << _metaField))));
+ }
+}
+
+void BucketCatalogTest::_commit(const OID& bucketId, uint16_t numCommittedMeasurements) {
+ auto data = _bucketCatalog->commit(bucketId);
+ ASSERT_EQ(data.docs.size(), 1);
+ ASSERT_EQ(data.numCommittedMeasurements, numCommittedMeasurements);
+
+ data = _bucketCatalog->commit(bucketId, _commitInfo);
+ ASSERT_EQ(data.docs.size(), 0);
+ ASSERT_EQ(data.numCommittedMeasurements, numCommittedMeasurements + 1);
+}
+
+void BucketCatalogTest::_insertOneAndCommit(const NamespaceString& ns,
+ uint16_t numCommittedMeasurements) {
+ auto [bucketId, commitInfo] =
+ _bucketCatalog->insert(_opCtx, ns, BSON(_timeField << Date_t::now()));
+ ASSERT(!commitInfo);
+
+ _commit(bucketId, numCommittedMeasurements);
+}
+
+TEST_F(BucketCatalogTest, InsertIntoSameBucket) {
+ // The first insert should be the committer.
+ auto result1 = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now()));
+ ASSERT(!result1.commitInfo);
+
+ // A subsequent insert into the same bucket should be a waiter.
+ auto result2 = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now()));
+ ASSERT(result2.commitInfo);
+ ASSERT(!result2.commitInfo->isReady());
+
+ // Committing should return both documents since they belong in the same bucket.
+ auto data = _bucketCatalog->commit(result1.bucketId);
+ ASSERT_EQ(data.docs.size(), 2);
+ ASSERT_EQ(data.numCommittedMeasurements, 0);
+ ASSERT(!result2.commitInfo->isReady());
+
+ // Once the commit has occurred, the waiter should be notified.
+ data = _bucketCatalog->commit(result1.bucketId, _commitInfo);
+ ASSERT_EQ(data.docs.size(), 0);
+ ASSERT_EQ(data.numCommittedMeasurements, 2);
+ ASSERT(result2.commitInfo->isReady());
+}
+
+TEST_F(BucketCatalogTest, InsertIntoDifferentBuckets) {
+ // The first insert should be the committer.
+ auto result1 = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now()));
+ ASSERT(!result1.commitInfo);
+
+ // Subsequent inserts into different buckets should also be committers.
+ auto result2 = _bucketCatalog->insert(
+ _opCtx, _ns1, BSON(_timeField << Date_t::now() << _metaField << BSONObj()));
+ ASSERT(!result2.commitInfo);
+
+ auto result3 = _bucketCatalog->insert(_opCtx, _ns2, BSON(_timeField << Date_t::now()));
+ ASSERT(!result3.commitInfo);
+
+    // Committing one bucket should only return the one document in that bucket and should not
+    // affect the other buckets.
+ for (const auto& bucketId : {result1.bucketId, result2.bucketId, result3.bucketId}) {
+ _commit(bucketId, 0);
+ }
+}
+
+TEST_F(BucketCatalogTest, NumCommittedMeasurementsAccumulates) {
+ // The numCommittedMeasurements returned when committing should accumulate as more entries in
+ // the bucket are committed.
+ _insertOneAndCommit(_ns1, 0);
+ _insertOneAndCommit(_ns1, 1);
+}
+
+TEST_F(BucketCatalogTest, ClearNamespaceBuckets) {
+ _insertOneAndCommit(_ns1, 0);
+ _insertOneAndCommit(_ns2, 0);
+
+ _bucketCatalog->clear(_ns1);
+
+ _insertOneAndCommit(_ns1, 0);
+ _insertOneAndCommit(_ns2, 1);
+}
+
+TEST_F(BucketCatalogTest, ClearDatabaseBuckets) {
+ _insertOneAndCommit(_ns1, 0);
+ _insertOneAndCommit(_ns2, 0);
+ _insertOneAndCommit(_ns3, 0);
+
+ _bucketCatalog->clear(_ns1.db());
+
+ _insertOneAndCommit(_ns1, 0);
+ _insertOneAndCommit(_ns2, 0);
+ _insertOneAndCommit(_ns3, 1);
+}
+
+DEATH_TEST_F(BucketCatalogTest, CannotProvideCommitInfoOnFirstCommit, "invariant") {
+ auto [bucketId, _] = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now()));
+ _bucketCatalog->commit(bucketId, _commitInfo);
+}
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/timeseries/timeseries.idl b/src/mongo/db/timeseries/timeseries.idl
new file mode 100644
index 00000000000..c2ecaf9227b
--- /dev/null
+++ b/src/mongo/db/timeseries/timeseries.idl
@@ -0,0 +1,54 @@
+# Copyright (C) 2020-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+structs:
+ TimeseriesOptions:
+ description: "The options that define a time-series collection."
+ strict: true
+ fields:
+ timeField:
+ description: "The name of the field to be used for time. Inserted documents must
+ have this field, and the field must be of the BSON UTC datetime type
+ (0x9)"
+ type: string
+ metaField:
+ description: "The name of the field describing the series. This field is used to
+ group related data and may be of any BSON type. This may not be
+ \"_id\" or the same as 'timeField'."
+ type: string
+ optional: true
+ expireAfterSeconds:
+ description: "The number of seconds after which old time-series data should be
+ deleted."
+ type: long
+ optional: true
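
The TimeseriesOptions struct generated from this IDL is what the view catalog persists under a view's "timeseries" field and what the bucket catalog reads back. A minimal sketch of constructing it the same way ViewCatalog::_reload() does; the function name timeseriesOptionsExample is illustrative only.

#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/timeseries/timeseries_gen.h"
#include "mongo/util/assert_util.h"

namespace mongo {
void timeseriesOptionsExample() {
    // Parse user-supplied options, as the view catalog does when reloading a durable view.
    auto options = TimeseriesOptions::parse(
        {"timeseriesOptionsExample"},
        BSON("timeField" << "ts" << "metaField" << "sensor"));

    // The time field is required; the meta field is optional.
    invariant(options.getTimeField() == "ts");
    invariant(options.getMetaField() && *options.getMetaField() == "sensor");

    // toBSON() is the form _createOrUpdateView() persists under the view's "timeseries" field.
    BSONObj persisted = options.toBSON();
}
}  // namespace mongo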
diff --git a/src/mongo/db/views/durable_view_catalog.cpp b/src/mongo/db/views/durable_view_catalog.cpp
index 1031f87682f..d6770ca7a99 100644
--- a/src/mongo/db/views/durable_view_catalog.cpp
+++ b/src/mongo/db/views/durable_view_catalog.cpp
@@ -142,7 +142,8 @@ BSONObj DurableViewCatalogImpl::_validateViewDefinition(OperationContext* opCtx,
for (const BSONElement& e : viewDefinition) {
std::string name(e.fieldName());
- valid &= name == "_id" || name == "viewOn" || name == "pipeline" || name == "collation";
+ valid &= name == "_id" || name == "viewOn" || name == "pipeline" || name == "collation" ||
+ name == "timeseries";
}
const auto viewName = viewDefinition["_id"].str();
@@ -168,6 +169,9 @@ BSONObj DurableViewCatalogImpl::_validateViewDefinition(OperationContext* opCtx,
valid &= (!viewDefinition.hasField("collation") ||
viewDefinition["collation"].type() == BSONType::Object);
+ valid &= !viewDefinition.hasField("timeseries") ||
+ viewDefinition["timeseries"].type() == BSONType::Object;
+
uassert(ErrorCodes::InvalidViewDefinition,
str::stream() << "found invalid view definition " << viewDefinition["_id"]
<< " while reading '" << _db->getSystemViewsName() << "'",
diff --git a/src/mongo/db/views/view.cpp b/src/mongo/db/views/view.cpp
index e5812dedf19..1f2f6d797bd 100644
--- a/src/mongo/db/views/view.cpp
+++ b/src/mongo/db/views/view.cpp
@@ -41,8 +41,12 @@ ViewDefinition::ViewDefinition(StringData dbName,
StringData viewName,
StringData viewOnName,
const BSONObj& pipeline,
- std::unique_ptr<CollatorInterface> collator)
- : _viewNss(dbName, viewName), _viewOnNss(dbName, viewOnName), _collator(std::move(collator)) {
+ std::unique_ptr<CollatorInterface> collator,
+ const boost::optional<TimeseriesOptions>& timeseries)
+ : _viewNss(dbName, viewName),
+ _viewOnNss(dbName, viewOnName),
+ _collator(std::move(collator)),
+ _timeseries(timeseries) {
for (BSONElement e : pipeline) {
_pipeline.push_back(e.Obj().getOwned());
}
@@ -52,22 +56,19 @@ ViewDefinition::ViewDefinition(const ViewDefinition& other)
: _viewNss(other._viewNss),
_viewOnNss(other._viewOnNss),
_collator(CollatorInterface::cloneCollator(other._collator.get())),
- _pipeline(other._pipeline) {}
+ _pipeline(other._pipeline),
+ _timeseries(other._timeseries) {}
ViewDefinition& ViewDefinition::operator=(const ViewDefinition& other) {
_viewNss = other._viewNss;
_viewOnNss = other._viewOnNss;
_collator = CollatorInterface::cloneCollator(other._collator.get());
_pipeline = other._pipeline;
+ _timeseries = other._timeseries;
return *this;
}
-bool ViewDefinition::isTimeseries() const {
- auto bucketsNs = _viewNss.makeTimeseriesBucketsNamespace();
- return bucketsNs == _viewOnNss;
-}
-
void ViewDefinition::setViewOn(const NamespaceString& viewOnNss) {
invariant(_viewNss.db() == viewOnNss.db());
_viewOnNss = viewOnNss;
diff --git a/src/mongo/db/views/view.h b/src/mongo/db/views/view.h
index 715399ef746..07edd41fddc 100644
--- a/src/mongo/db/views/view.h
+++ b/src/mongo/db/views/view.h
@@ -36,6 +36,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/query/collation/collator_interface.h"
+#include "mongo/db/timeseries/timeseries_gen.h"
namespace mongo {
@@ -52,7 +53,8 @@ public:
StringData viewName,
StringData viewOnName,
const BSONObj& pipeline,
- std::unique_ptr<CollatorInterface> collation);
+ std::unique_ptr<CollatorInterface> collation,
+ const boost::optional<TimeseriesOptions>& timeseries);
/**
* Copying a view 'other' clones its collator and does a simple copy of all other fields.
@@ -91,9 +93,11 @@ public:
}
/**
- * Returns true if this view represents a time-series collection.
+ * Returns the time-series options for the view, or boost::none if not a time-series view.
*/
- bool isTimeseries() const;
+ const boost::optional<TimeseriesOptions>& timeseries() const {
+ return _timeseries;
+ }
void setViewOn(const NamespaceString& viewOnNss);
@@ -107,5 +111,6 @@ private:
NamespaceString _viewOnNss;
std::unique_ptr<CollatorInterface> _collator;
std::vector<BSONObj> _pipeline;
+ boost::optional<TimeseriesOptions> _timeseries;
};
} // namespace mongo
diff --git a/src/mongo/db/views/view_catalog.cpp b/src/mongo/db/views/view_catalog.cpp
index c606eede46a..a20cec26e77 100644
--- a/src/mongo/db/views/view_catalog.cpp
+++ b/src/mongo/db/views/view_catalog.cpp
@@ -122,11 +122,22 @@ Status ViewCatalog::_reload(WithLock,
}
}
+ boost::optional<TimeseriesOptions> timeseries;
+ if (view.hasField("timeseries")) {
+ try {
+ timeseries =
+ TimeseriesOptions::parse({"ViewCatalog::_reload"}, view["timeseries"].Obj());
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+ }
+
_viewMap[viewName.ns()] = std::make_shared<ViewDefinition>(viewName.db(),
viewName.coll(),
view["viewOn"].str(),
pipeline,
- std::move(collator.getValue()));
+ std::move(collator.getValue()),
+ timeseries);
return Status::OK();
};
@@ -185,7 +196,8 @@ Status ViewCatalog::_createOrUpdateView(WithLock lk,
const NamespaceString& viewName,
const NamespaceString& viewOn,
const BSONArray& pipeline,
- std::unique_ptr<CollatorInterface> collator) {
+ std::unique_ptr<CollatorInterface> collator,
+ const boost::optional<TimeseriesOptions>& timeseries) {
invariant(opCtx->lockState()->isDbLockedForMode(viewName.db(), MODE_IX));
invariant(opCtx->lockState()->isCollectionLockedForMode(viewName, MODE_IX));
invariant(opCtx->lockState()->isCollectionLockedForMode(
@@ -206,10 +218,17 @@ Status ViewCatalog::_createOrUpdateView(WithLock lk,
if (collator) {
viewDefBuilder.append("collation", collator->getSpec().toBSON());
}
+ if (timeseries) {
+ viewDefBuilder.append("timeseries", timeseries->toBSON());
+ }
BSONObj ownedPipeline = pipeline.getOwned();
- auto view = std::make_shared<ViewDefinition>(
- viewName.db(), viewName.coll(), viewOn.coll(), ownedPipeline, std::move(collator));
+ auto view = std::make_shared<ViewDefinition>(viewName.db(),
+ viewName.coll(),
+ viewOn.coll(),
+ ownedPipeline,
+ std::move(collator),
+ timeseries);
// Check that the resulting dependency graph is acyclic and within the maximum depth.
Status graphStatus = _upsertIntoGraph(lk, opCtx, *(view.get()));
@@ -403,7 +422,8 @@ Status ViewCatalog::createView(OperationContext* opCtx,
const NamespaceString& viewName,
const NamespaceString& viewOn,
const BSONArray& pipeline,
- const BSONObj& collation) {
+ const BSONObj& collation,
+ const boost::optional<TimeseriesOptions>& timeseries) {
invariant(opCtx->lockState()->isDbLockedForMode(viewName.db(), MODE_IX));
invariant(opCtx->lockState()->isCollectionLockedForMode(viewName, MODE_IX));
invariant(opCtx->lockState()->isCollectionLockedForMode(
@@ -428,7 +448,7 @@ Status ViewCatalog::createView(OperationContext* opCtx,
return collator.getStatus();
return _createOrUpdateView(
- lk, opCtx, viewName, viewOn, pipeline, std::move(collator.getValue()));
+ lk, opCtx, viewName, viewOn, pipeline, std::move(collator.getValue()), timeseries);
}
Status ViewCatalog::modifyView(OperationContext* opCtx,
diff --git a/src/mongo/db/views/view_catalog.h b/src/mongo/db/views/view_catalog.h
index ecfff9de990..46a6735cf9c 100644
--- a/src/mongo/db/views/view_catalog.h
+++ b/src/mongo/db/views/view_catalog.h
@@ -102,7 +102,8 @@ public:
const NamespaceString& viewName,
const NamespaceString& viewOn,
const BSONArray& pipeline,
- const BSONObj& collation);
+ const BSONObj& collation,
+ const boost::optional<TimeseriesOptions>& timeseries);
/**
* Drop the view named 'viewName'.
@@ -168,7 +169,8 @@ private:
const NamespaceString& viewName,
const NamespaceString& viewOn,
const BSONArray& pipeline,
- std::unique_ptr<CollatorInterface> collator);
+ std::unique_ptr<CollatorInterface> collator,
+ const boost::optional<TimeseriesOptions>& timeseries = boost::none);
/**
* Parses the view definition pipeline, attempts to upsert into the view graph, and refreshes
* the graph if necessary. Returns an error status if the resulting graph would be invalid.
diff --git a/src/mongo/db/views/view_catalog_test.cpp b/src/mongo/db/views/view_catalog_test.cpp
index c2d7367db3e..30d1b396444 100644
--- a/src/mongo/db/views/view_catalog_test.cpp
+++ b/src/mongo/db/views/view_catalog_test.cpp
@@ -123,7 +123,8 @@ public:
MODE_X);
WriteUnitOfWork wuow(opCtx);
- Status s = _viewCatalog->createView(opCtx, viewName, viewOn, pipeline, collation);
+ Status s =
+ _viewCatalog->createView(opCtx, viewName, viewOn, pipeline, collation, boost::none);
wuow.commit();
return s;
@@ -530,7 +531,7 @@ TEST_F(ViewCatalogFixture, LookupRIDExistingViewRollback) {
WriteUnitOfWork wunit(operationContext());
ASSERT_OK(getViewCatalog()->createView(
- operationContext(), viewName, viewOn, emptyPipeline, emptyCollation));
+ operationContext(), viewName, viewOn, emptyPipeline, emptyCollation, boost::none));
}
auto resourceID = ResourceId(RESOURCE_COLLECTION, "db.view"_sd);
auto collectionCatalog = CollectionCatalog::get(operationContext());
diff --git a/src/mongo/db/views/view_definition_test.cpp b/src/mongo/db/views/view_definition_test.cpp
index 6b2aa57cba9..7ea401caee4 100644
--- a/src/mongo/db/views/view_definition_test.cpp
+++ b/src/mongo/db/views/view_definition_test.cpp
@@ -47,10 +47,11 @@ namespace {
const NamespaceString viewNss("testdb.testview");
const NamespaceString backingNss("testdb.testcoll");
const BSONObj samplePipeline = BSON_ARRAY(BSON("limit" << 9));
+const TimeseriesOptions timeseries("time");
TEST(ViewDefinitionTest, ViewDefinitionCreationCorrectlyBuildsNamespaceStrings) {
ViewDefinition viewDef(
- viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, nullptr);
+ viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, nullptr, boost::none);
ASSERT_EQ(viewDef.name(), viewNss);
ASSERT_EQ(viewDef.viewOn(), backingNss);
}
@@ -58,8 +59,12 @@ TEST(ViewDefinitionTest, ViewDefinitionCreationCorrectlyBuildsNamespaceStrings)
TEST(ViewDefinitionTest, CopyConstructorProperlyClonesAllFields) {
auto collator =
std::make_unique<CollatorInterfaceMock>(CollatorInterfaceMock::MockType::kReverseString);
- ViewDefinition originalView(
- viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, std::move(collator));
+ ViewDefinition originalView(viewNss.db(),
+ viewNss.coll(),
+ backingNss.coll(),
+ samplePipeline,
+ std::move(collator),
+ timeseries);
ViewDefinition copiedView(originalView);
ASSERT_EQ(originalView.name(), copiedView.name());
@@ -70,13 +75,18 @@ TEST(ViewDefinitionTest, CopyConstructorProperlyClonesAllFields) {
SimpleBSONObjComparator::kInstance.makeEqualTo()));
ASSERT(CollatorInterface::collatorsMatch(originalView.defaultCollator(),
copiedView.defaultCollator()));
+ ASSERT(originalView.timeseries()->toBSON().binaryEqual(copiedView.timeseries()->toBSON()));
}
TEST(ViewDefinitionTest, CopyAssignmentOperatorProperlyClonesAllFields) {
auto collator =
std::make_unique<CollatorInterfaceMock>(CollatorInterfaceMock::MockType::kReverseString);
- ViewDefinition originalView(
- viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, std::move(collator));
+ ViewDefinition originalView(viewNss.db(),
+ viewNss.coll(),
+ backingNss.coll(),
+ samplePipeline,
+ std::move(collator),
+ timeseries);
ViewDefinition copiedView = originalView;
ASSERT_EQ(originalView.name(), copiedView.name());
@@ -87,20 +97,21 @@ TEST(ViewDefinitionTest, CopyAssignmentOperatorProperlyClonesAllFields) {
SimpleBSONObjComparator::kInstance.makeEqualTo()));
ASSERT(CollatorInterface::collatorsMatch(originalView.defaultCollator(),
copiedView.defaultCollator()));
+ ASSERT(originalView.timeseries()->toBSON().binaryEqual(copiedView.timeseries()->toBSON()));
}
DEATH_TEST_REGEX(ViewDefinitionTest,
SetViewOnFailsIfNewViewOnNotInSameDatabaseAsView,
R"#(Invariant failure.*_viewNss.db\(\) == viewOnNss.db\(\))#") {
ViewDefinition viewDef(
- viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, nullptr);
+ viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, nullptr, boost::none);
NamespaceString badViewOn("someOtherDb.someOtherCollection");
viewDef.setViewOn(badViewOn);
}
TEST(ViewDefinitionTest, SetViewOnSucceedsIfNewViewOnIsInSameDatabaseAsView) {
ViewDefinition viewDef(
- viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, nullptr);
+ viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, nullptr, boost::none);
ASSERT_EQ(viewDef.viewOn(), backingNss);
NamespaceString newViewOn("testdb.othercollection");
@@ -112,7 +123,7 @@ DEATH_TEST_REGEX(ViewDefinitionTest,
SetPiplineFailsIfPipelineTypeIsNotArray,
R"#(Invariant failure.*pipeline.type\(\) == Array)#") {
ViewDefinition viewDef(
- viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, nullptr);
+ viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, nullptr, boost::none);
// We'll pass in a BSONElement that could be a valid array, but is BSONType::Object rather than
// BSONType::Array.
@@ -127,7 +138,8 @@ DEATH_TEST_REGEX(ViewDefinitionTest,
}
TEST(ViewDefinitionTest, SetPipelineSucceedsOnValidArrayBSONElement) {
- ViewDefinition viewDef(viewNss.db(), viewNss.coll(), backingNss.coll(), BSONObj(), nullptr);
+ ViewDefinition viewDef(
+ viewNss.db(), viewNss.coll(), backingNss.coll(), BSONObj(), nullptr, boost::none);
ASSERT(viewDef.pipeline().empty());
BSONObj matchStage = BSON("match" << BSON("x" << 9));
@@ -142,5 +154,12 @@ TEST(ViewDefinitionTest, SetPipelineSucceedsOnValidArrayBSONElement) {
viewDef.pipeline().begin(),
SimpleBSONObjComparator::kInstance.makeEqualTo()));
}
+
+TEST(ViewDefinitionTest, ViewDefinitionCreationCorrectlySetsTimeseries) {
+ ViewDefinition viewDef(
+ viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, nullptr, timeseries);
+ ASSERT(viewDef.timeseries());
+ ASSERT_EQ(viewDef.timeseries()->getTimeField(), "time");
+}
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/views/view_graph_test.cpp b/src/mongo/db/views/view_graph_test.cpp
index 5ace80ef15a..0dbdab653d4 100644
--- a/src/mongo/db/views/view_graph_test.cpp
+++ b/src/mongo/db/views/view_graph_test.cpp
@@ -83,7 +83,7 @@ public:
collator = std::move(factoryCollator.getValue());
}
- return {db, view, viewOn, pipeline, std::move(collator)};
+ return {db, view, viewOn, pipeline, std::move(collator), boost::none};
}
private: