author    Dan Larkin-York <dan.larkin-york@mongodb.com>  2021-01-29 21:38:43 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-02-08 18:50:57 +0000
commit    8585aa1509d6b3cf8f1b531f2370729c557d2422
tree      837c74b53ca41973277614da5f121a3e362d8187
parent    0963c63a150d99713efbc5ff5221dc34c56718c8
SERVER-54269 Time-series bucket catalog should reject inserts with missing/invalid time field
 jstests/noPassthrough/timeseries_insert_invalid_timefield.js | 60
 src/mongo/db/commands/write_commands/write_commands.cpp      | 32
 src/mongo/db/timeseries/bucket_catalog.cpp                   | 17
 src/mongo/db/timeseries/bucket_catalog.h                     |  4
 src/mongo/db/timeseries/bucket_catalog_test.cpp              | 65
 5 files changed, 134 insertions(+), 44 deletions(-)
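
Overview: BucketCatalog::insert now returns StatusWith<InsertResult> instead of a bare InsertResult, so a document whose time field is missing or is not a BSON Date is rejected up front with ErrorCodes::BadValue rather than failing later inside BSONElement::Date(). A minimal sketch of the new calling convention, assuming only the signatures shown in the hunks below (the surrounding handler is hypothetical):

    auto swResult = bucketCatalog.insert(opCtx, ns, doc);
    if (!swResult.isOK()) {
        // e.g. ErrorCodes::BadValue when the time field is missing or not a Date
        return swResult.getStatus();
    }
    auto& [bucketId, commitInfo] = swResult.getValue();
    if (commitInfo) {
        // Another insert owns the commit for this bucket; wait on the future.
    } else {
        // This caller is the committer for the bucket.
    }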
diff --git a/jstests/noPassthrough/timeseries_insert_invalid_timefield.js b/jstests/noPassthrough/timeseries_insert_invalid_timefield.js
new file mode 100644
index 00000000000..1a87a1c1ad1
--- /dev/null
+++ b/jstests/noPassthrough/timeseries_insert_invalid_timefield.js
@@ -0,0 +1,60 @@
+/**
+ * Tests that a time-series collection rejects documents with invalid timeField values
+ */
+(function() {
+"use strict";
+
+load("jstests/core/timeseries/libs/timeseries.js");
+
+const conn = MongoRunner.runMongod();
+
+if (!TimeseriesTest.timeseriesCollectionsEnabled(conn)) {
+ jsTestLog("Skipping test because the time-series collection feature flag is disabled");
+ MongoRunner.stopMongod(conn);
+ return;
+}
+
+const dbName = jsTestName();
+const testDB = conn.getDB(dbName);
+assert.commandWorked(testDB.dropDatabase());
+
+const coll = testDB.getCollection('t');
+const bucketsColl = testDB.getCollection('system.buckets.' + coll.getName());
+coll.drop();
+
+const timeFieldName = 'time';
+const metaFieldName = 'meta';
+
+assert.commandWorked(testDB.createCollection(
+ coll.getName(), {timeseries: {timeField: timeFieldName, metaField: metaFieldName}}));
+assert.contains(bucketsColl.getName(), testDB.getCollectionNames());
+
+// First, insert a known-good document to establish a baseline.
+const goodDocs = [
+ {
+ _id: 0,
+ time: ISODate("2020-11-26T00:00:00.000Z"),
+ meta: "A",
+ data: true,
+ },
+ {
+ _id: 1,
+ time: ISODate("2020-11-27T00:00:00.000Z"),
+ meta: "A",
+ data: true,
+ }
+];
+assert.commandWorked(coll.insert(goodDocs[0]));
+assert.eq(1, coll.count());
+assert.docEq(coll.find().toArray(), [goodDocs[0]]);
+
+// Now make sure inserts are rejected when timeField is missing or is not a valid BSON datetime.
+let mixedDocs = [{meta: "B", data: true}, goodDocs[1], {time: "invalid", meta: "B", data: false}];
+assert.commandFailedWithCode(coll.insert(mixedDocs, {ordered: false}), ErrorCodes.BadValue);
+assert.eq(coll.count(), 2);
+assert.docEq(coll.find().toArray(), goodDocs);
+assert.eq(null, coll.findOne({meta: mixedDocs[0].meta}));
+assert.eq(null, coll.findOne({meta: mixedDocs[2].meta}));
+
+MongoRunner.stopMongod(conn);
+})();
diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp
index c4df53257c8..6a704cad9e5 100644
--- a/src/mongo/db/commands/write_commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands/write_commands.cpp
@@ -261,8 +261,9 @@ bool checkFailTimeseriesInsertFailPoint(const BSONObj& metadata) {
return shouldFailInsert;
}
+template <typename T>
boost::optional<BSONObj> generateError(OperationContext* opCtx,
- const StatusWith<SingleWriteResult>& result,
+ const StatusWith<T>& result,
int index,
size_t numErrors) {
auto status = result.getStatus();
@@ -285,7 +286,7 @@ boost::optional<BSONObj> generateError(OperationContext* opCtx,
BSONSizeTracker errorsSizeTracker;
BSONObjBuilder error(errorsSizeTracker);
error.append("index", index);
- if (auto staleInfo = status.extraInfo<StaleConfigInfo>()) {
+ if (auto staleInfo = status.template extraInfo<StaleConfigInfo>()) {
error.append("code", int(ErrorCodes::StaleShardVersion)); // Different from exception!
{
BSONObjBuilder errInfo(error.subobjStart("errInfo"));
@@ -293,12 +294,12 @@ boost::optional<BSONObj> generateError(OperationContext* opCtx,
}
} else if (ErrorCodes::DocumentValidationFailure == status.code() && status.extraInfo()) {
auto docValidationError =
- status.extraInfo<doc_validation_error::DocumentValidationFailureInfo>();
+ status.template extraInfo<doc_validation_error::DocumentValidationFailureInfo>();
error.append("code", static_cast<int>(ErrorCodes::DocumentValidationFailure));
error.append("errInfo", docValidationError->getDetails());
} else if (ErrorCodes::isTenantMigrationError(status.code())) {
if (ErrorCodes::TenantMigrationConflict == status.code()) {
- auto migrationConflictInfo = status.extraInfo<TenantMigrationConflictInfo>();
+ auto migrationConflictInfo = status.template extraInfo<TenantMigrationConflictInfo>();
hangWriteBeforeWaitingForMigrationDecision.pauseWhileSet(opCtx);
@@ -314,13 +315,15 @@ boost::optional<BSONObj> generateError(OperationContext* opCtx,
error.append("errmsg", errorMessage(migrationStatus.reason()));
}
if (migrationStatus.extraInfo()) {
- error.append("errInfo",
- migrationStatus.extraInfo<TenantMigrationCommittedInfo>()->toBSON());
+ error.append(
+ "errInfo",
+ migrationStatus.template extraInfo<TenantMigrationCommittedInfo>()->toBSON());
}
} else {
error.append("code", int(status.code()));
if (status.extraInfo()) {
- error.append("errInfo", status.extraInfo<TenantMigrationCommittedInfo>()->toBSON());
+ error.append("errInfo",
+ status.template extraInfo<TenantMigrationCommittedInfo>()->toBSON());
}
}
} else {
@@ -735,12 +738,17 @@ public:
std::vector<std::pair<BucketCatalog::BucketId, size_t>> bucketsToCommit;
std::vector<std::pair<Future<BucketCatalog::CommitInfo>, size_t>> bucketsToWaitOn;
auto insert = [&](size_t index) {
- auto [bucketId, commitInfo] =
- bucketCatalog.insert(opCtx, ns(), _batch.getDocuments()[index]);
- if (commitInfo) {
- bucketsToWaitOn.push_back({std::move(*commitInfo), index});
+ auto result =
+ bucketCatalog.insert(opCtx, ns(), _batch.getDocuments()[start + index]);
+ if (auto error = generateError(opCtx, result, index, errors->size())) {
+ errors->push_back(*error);
} else {
- bucketsToCommit.push_back({std::move(bucketId), index});
+ auto& [bucketId, commitInfo] = result.getValue();
+ if (commitInfo) {
+ bucketsToWaitOn.push_back({std::move(*commitInfo), index});
+ } else {
+ bucketsToCommit.push_back({std::move(bucketId), index});
+ }
}
};
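
Because generateError is now templated over the result type, the insert path above can record a per-document error and keep going, which is what lets the unordered insert in the test above persist the remaining good document while rejecting the invalid ones. A hedged sketch of the loop shape (containers and indexing simplified from the hunk above):

    std::vector<BSONObj> errors;
    for (size_t index = 0; index < batch.size(); ++index) {
        auto result = bucketCatalog.insert(opCtx, ns, batch[index]);
        if (auto error = generateError(opCtx, result, index, errors.size())) {
            errors.push_back(*error);  // record the failure, continue the batch
            continue;
        }
        // ... stage the bucket to commit or wait on, as in the hunk above ...
    }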
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index 56cf21280ff..b8a70923bb8 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -76,9 +76,9 @@ BSONObj BucketCatalog::getMetadata(const BucketId& bucketId) const {
return bucket.metadata.metadata;
}
-BucketCatalog::InsertResult BucketCatalog::insert(OperationContext* opCtx,
- const NamespaceString& ns,
- const BSONObj& doc) {
+StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const BSONObj& doc) {
stdx::lock_guard lk(_mutex);
auto viewCatalog = DatabaseHolder::get(opCtx)->getViewCatalog(opCtx, ns.db());
@@ -99,7 +99,14 @@ BucketCatalog::InsertResult BucketCatalog::insert(OperationContext* opCtx,
auto& stats = _executionStats[ns];
- auto time = doc[options.getTimeField()].Date();
+ auto timeElem = doc[options.getTimeField()];
+ if (!timeElem || BSONType::Date != timeElem.type()) {
+ return {ErrorCodes::BadValue,
+ str::stream() << "'" << options.getTimeField() << "' must be present and contain a "
+ << "valid BSON UTC datetime value"};
+ }
+
+ auto time = timeElem.Date();
auto createNewBucketId = [&] {
_expireIdleBuckets(&stats);
return BucketIdInternal{time, ++_bucketNum};
@@ -211,7 +218,7 @@ BucketCatalog::InsertResult BucketCatalog::insert(OperationContext* opCtx,
newFieldNamesSize + bucket->min.getMemoryUsage() + bucket->max.getMemoryUsage();
_memoryUsage += bucket->memoryUsage;
- return {it->second, std::move(commitInfoFuture)};
+ return {InsertResult{it->second, std::move(commitInfoFuture)}};
}
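
The guard above is what makes the subsequent BSONElement::Date() call safe: a missing field yields an EOO element (which tests false), and Date() may only be called on an element whose type is actually Date. The same pattern in isolation, with the field name hardcoded purely for illustration:

    BSONElement timeElem = doc["time"];
    if (!timeElem || timeElem.type() != BSONType::Date) {
        return Status(ErrorCodes::BadValue,
                      "'time' must be present and contain a valid BSON UTC datetime value");
    }
    Date_t time = timeElem.Date();  // guaranteed to be a Date element here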
BucketCatalog::CommitData BucketCatalog::commit(const BucketId& bucketId,
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index 46cf204d0a8..8766711a37b 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -111,7 +111,9 @@ public:
* caller is a waiter for the bucket. If no Future is provided, the caller is the committer for
* this bucket.
*/
- InsertResult insert(OperationContext* opCtx, const NamespaceString& ns, const BSONObj& doc);
+ StatusWith<InsertResult> insert(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const BSONObj& doc);
/**
* Returns the uncommitted measurements and the number of measurements that have already been
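
The waiter/committer contract documented above is unchanged; only the StatusWith wrapper is new. Call sites that previously destructured the result now unpack getValue(), or assert on getStatus() when the value itself is unused, as the test changes below illustrate:

    auto result = catalog->insert(opCtx, ns, doc);
    ASSERT_OK(result.getStatus());                     // when only success matters
    auto& [bucketId, commitInfo] = result.getValue();  // when the value is needed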
diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp
index d1c459c6141..ad60654f675 100644
--- a/src/mongo/db/timeseries/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp
@@ -99,8 +99,8 @@ void BucketCatalogTest::_commit(const BucketCatalog::BucketId& bucketId,
void BucketCatalogTest::_insertOneAndCommit(const NamespaceString& ns,
uint16_t numCommittedMeasurements) {
- auto [bucketId, commitInfo] =
- _bucketCatalog->insert(_opCtx, ns, BSON(_timeField << Date_t::now()));
+ auto result = _bucketCatalog->insert(_opCtx, ns, BSON(_timeField << Date_t::now()));
+ auto& [bucketId, commitInfo] = result.getValue();
ASSERT(!commitInfo);
_commit(bucketId, numCommittedMeasurements);
@@ -109,29 +109,29 @@ void BucketCatalogTest::_insertOneAndCommit(const NamespaceString& ns,
TEST_F(BucketCatalogTest, InsertIntoSameBucket) {
// The first insert should be the committer.
auto result1 = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now()));
- ASSERT(!result1.commitInfo);
+ ASSERT(!result1.getValue().commitInfo);
// A subsequent insert into the same bucket should be a waiter.
auto result2 = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now()));
- ASSERT(result2.commitInfo);
- ASSERT(!result2.commitInfo->isReady());
+ ASSERT(result2.getValue().commitInfo);
+ ASSERT(!result2.getValue().commitInfo->isReady());
// Committing should return both documents since they belong in the same bucket.
- auto data = _bucketCatalog->commit(result1.bucketId);
+ auto data = _bucketCatalog->commit(result1.getValue().bucketId);
ASSERT_EQ(data.docs.size(), 2);
ASSERT_EQ(data.numCommittedMeasurements, 0);
- ASSERT(!result2.commitInfo->isReady());
+ ASSERT(!result2.getValue().commitInfo->isReady());
// Once the commit has occurred, the waiter should be notified.
- data = _bucketCatalog->commit(result1.bucketId, _commitInfo);
+ data = _bucketCatalog->commit(result1.getValue().bucketId, _commitInfo);
ASSERT_EQ(data.docs.size(), 0);
ASSERT_EQ(data.numCommittedMeasurements, 2);
- ASSERT(result2.commitInfo->isReady());
+ ASSERT(result2.getValue().commitInfo->isReady());
}
TEST_F(BucketCatalogTest, GetMetadataReturnsEmptyDocOnMissingBucket) {
auto bucketId =
- _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now())).bucketId;
+ _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now())).getValue().bucketId;
_bucketCatalog->clear(bucketId);
ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(bucketId));
}
@@ -140,24 +140,28 @@ TEST_F(BucketCatalogTest, InsertIntoDifferentBuckets) {
// The first insert should be the committer.
auto result1 = _bucketCatalog->insert(
_opCtx, _ns1, BSON(_timeField << Date_t::now() << _metaField << "123"));
- ASSERT(!result1.commitInfo);
+ ASSERT(!result1.getValue().commitInfo);
// Subsequent inserts into different buckets should also be committers.
auto result2 = _bucketCatalog->insert(
_opCtx, _ns1, BSON(_timeField << Date_t::now() << _metaField << BSONObj()));
- ASSERT(!result2.commitInfo);
+ ASSERT(!result2.getValue().commitInfo);
auto result3 = _bucketCatalog->insert(_opCtx, _ns2, BSON(_timeField << Date_t::now()));
- ASSERT(!result3.commitInfo);
+ ASSERT(!result3.getValue().commitInfo);
// Check metadata in buckets.
- ASSERT_BSONOBJ_EQ(BSON(_metaField << "123"), _bucketCatalog->getMetadata(result1.bucketId));
- ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj()), _bucketCatalog->getMetadata(result2.bucketId));
- ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONNULL), _bucketCatalog->getMetadata(result3.bucketId));
+ ASSERT_BSONOBJ_EQ(BSON(_metaField << "123"),
+ _bucketCatalog->getMetadata(result1.getValue().bucketId));
+ ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj()),
+ _bucketCatalog->getMetadata(result2.getValue().bucketId));
+ ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONNULL),
+ _bucketCatalog->getMetadata(result3.getValue().bucketId));
// Committing one bucket should only return the one document in that bucket and should not
// affect the other bucket.
- for (const auto& bucketId : {result1.bucketId, result2.bucketId, result3.bucketId}) {
+ for (const auto& bucketId :
+ {result1.getValue().bucketId, result2.getValue().bucketId, result3.getValue().bucketId}) {
_commit(bucketId, 0);
}
}
@@ -192,50 +196,59 @@ TEST_F(BucketCatalogTest, ClearDatabaseBuckets) {
}
DEATH_TEST_F(BucketCatalogTest, CannotProvideCommitInfoOnFirstCommit, "invariant") {
- auto [bucketId, _] = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now()));
+ auto result = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now()));
+ auto& [bucketId, _] = result.getValue();
_bucketCatalog->commit(bucketId, _commitInfo);
}
TEST_F(BucketCatalogWithoutMetadataTest, GetMetadataReturnsEmptyDoc) {
auto result = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now()));
- ASSERT(!result.commitInfo);
+ ASSERT(!result.getValue().commitInfo);
- ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(result.bucketId));
+ ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(result.getValue().bucketId));
- _commit(result.bucketId, 0);
+ _commit(result.getValue().bucketId, 0);
}
TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
// Creating a new bucket should return all fields from the initial measurement.
- auto [bucketId, _] =
+ auto result =
_bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now() << "a" << 0));
+ auto& [bucketId, _] = result.getValue();
auto data = _bucketCatalog->commit(bucketId);
ASSERT_EQ(2U, data.newFieldNamesToBeInserted.size()) << data.toBSON();
ASSERT(data.newFieldNamesToBeInserted.count(_timeField)) << data.toBSON();
ASSERT(data.newFieldNamesToBeInserted.count("a")) << data.toBSON();
// Inserting a new measurement with the same fields should return an empty set of new fields.
- _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now() << "a" << 1));
+
+ ASSERT_OK(_bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now() << "a" << 1))
+ .getStatus());
data = _bucketCatalog->commit(bucketId, _commitInfo);
ASSERT_EQ(0U, data.newFieldNamesToBeInserted.size()) << data.toBSON();
// Insert a new measurement with a new field.
- _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now() << "a" << 2 << "b" << 2));
+ ASSERT_OK(_bucketCatalog
+ ->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now() << "a" << 2 << "b" << 2))
+ .getStatus());
data = _bucketCatalog->commit(bucketId, _commitInfo);
ASSERT_EQ(1U, data.newFieldNamesToBeInserted.size()) << data.toBSON();
ASSERT(data.newFieldNamesToBeInserted.count("b")) << data.toBSON();
// Fill up the bucket.
for (auto i = 3; i < gTimeseriesBucketMaxCount; ++i) {
- _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now() << "a" << i));
+ ASSERT_OK(
+ _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now() << "a" << i))
+ .getStatus());
data = _bucketCatalog->commit(bucketId, _commitInfo);
ASSERT_EQ(0U, data.newFieldNamesToBeInserted.size()) << i << ":" << data.toBSON();
}
// When a bucket overflows, committing to the new overflow bucket should return the fields of
// the first measurement as new fields.
- auto [overflowBucketId, unusedCommitInfo] = _bucketCatalog->insert(
+ auto result2 = _bucketCatalog->insert(
_opCtx, _ns1, BSON(_timeField << Date_t::now() << "a" << gTimeseriesBucketMaxCount));
+ auto& [overflowBucketId, unusedCommitInfo] = result2.getValue();
ASSERT_NE(*bucketId, *overflowBucketId);
data = _bucketCatalog->commit(overflowBucketId);
ASSERT_EQ(2U, data.newFieldNamesToBeInserted.size()) << data.toBSON();