diff options
-rw-r--r-- | jstests/noPassthrough/timeseries_insert_invalid_timefield.js | 60 | ||||
-rw-r--r-- | src/mongo/db/commands/write_commands/write_commands.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.h | 4 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog_test.cpp | 65 |
5 files changed, 134 insertions, 44 deletions
diff --git a/jstests/noPassthrough/timeseries_insert_invalid_timefield.js b/jstests/noPassthrough/timeseries_insert_invalid_timefield.js new file mode 100644 index 00000000000..1a87a1c1ad1 --- /dev/null +++ b/jstests/noPassthrough/timeseries_insert_invalid_timefield.js @@ -0,0 +1,60 @@ +/** + * Tests that a time-series collection rejects documents with invalid timeField values + */ +(function() { +"use strict"; + +load("jstests/core/timeseries/libs/timeseries.js"); + +const conn = MongoRunner.runMongod(); + +if (!TimeseriesTest.timeseriesCollectionsEnabled(conn)) { + jsTestLog("Skipping test because the time-series collection feature flag is disabled"); + MongoRunner.stopMongod(conn); + return; +} + +const dbName = jsTestName(); +const testDB = conn.getDB(dbName); +assert.commandWorked(testDB.dropDatabase()); + +const coll = testDB.getCollection('t'); +const bucketsColl = testDB.getCollection('system.buckets.' + coll.getName()); +coll.drop(); + +const timeFieldName = 'time'; +const metaFieldName = 'meta'; + +assert.commandWorked(testDB.createCollection( + coll.getName(), {timeseries: {timeField: timeFieldName, metaField: metaFieldName}})); +assert.contains(bucketsColl.getName(), testDB.getCollectionNames()); + +// first test a good doc just in case +const goodDocs = [ + { + _id: 0, + time: ISODate("2020-11-26T00:00:00.000Z"), + meta: "A", + data: true, + }, + { + _id: 1, + time: ISODate("2020-11-27T00:00:00.000Z"), + meta: "A", + data: true, + } +]; +assert.commandWorked(coll.insert(goodDocs[0])); +assert.eq(1, coll.count()); +assert.docEq(coll.find().toArray(), [goodDocs[0]]); + +// now make sure we reject if timeField is missing or isn't a valid BSON datetime +let mixedDocs = [{meta: "B", data: true}, goodDocs[1], {time: "invalid", meta: "B", data: false}]; +assert.commandFailedWithCode(coll.insert(mixedDocs, {ordered: false}), ErrorCodes.BadValue); +assert.eq(coll.count(), 2); +assert.docEq(coll.find().toArray(), goodDocs); +assert.eq(null, coll.findOne({meta: 
mixedDocs[0].meta})); +assert.eq(null, coll.findOne({meta: mixedDocs[2].meta})); + +MongoRunner.stopMongod(conn); +})(); diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp index c4df53257c8..6a704cad9e5 100644 --- a/src/mongo/db/commands/write_commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands/write_commands.cpp @@ -261,8 +261,9 @@ bool checkFailTimeseriesInsertFailPoint(const BSONObj& metadata) { return shouldFailInsert; } +template <typename T> boost::optional<BSONObj> generateError(OperationContext* opCtx, - const StatusWith<SingleWriteResult>& result, + const StatusWith<T>& result, int index, size_t numErrors) { auto status = result.getStatus(); @@ -285,7 +286,7 @@ boost::optional<BSONObj> generateError(OperationContext* opCtx, BSONSizeTracker errorsSizeTracker; BSONObjBuilder error(errorsSizeTracker); error.append("index", index); - if (auto staleInfo = status.extraInfo<StaleConfigInfo>()) { + if (auto staleInfo = status.template extraInfo<StaleConfigInfo>()) { error.append("code", int(ErrorCodes::StaleShardVersion)); // Different from exception! 
{ BSONObjBuilder errInfo(error.subobjStart("errInfo")); @@ -293,12 +294,12 @@ boost::optional<BSONObj> generateError(OperationContext* opCtx, } } else if (ErrorCodes::DocumentValidationFailure == status.code() && status.extraInfo()) { auto docValidationError = - status.extraInfo<doc_validation_error::DocumentValidationFailureInfo>(); + status.template extraInfo<doc_validation_error::DocumentValidationFailureInfo>(); error.append("code", static_cast<int>(ErrorCodes::DocumentValidationFailure)); error.append("errInfo", docValidationError->getDetails()); } else if (ErrorCodes::isTenantMigrationError(status.code())) { if (ErrorCodes::TenantMigrationConflict == status.code()) { - auto migrationConflictInfo = status.extraInfo<TenantMigrationConflictInfo>(); + auto migrationConflictInfo = status.template extraInfo<TenantMigrationConflictInfo>(); hangWriteBeforeWaitingForMigrationDecision.pauseWhileSet(opCtx); @@ -314,13 +315,15 @@ boost::optional<BSONObj> generateError(OperationContext* opCtx, error.append("errmsg", errorMessage(migrationStatus.reason())); } if (migrationStatus.extraInfo()) { - error.append("errInfo", - migrationStatus.extraInfo<TenantMigrationCommittedInfo>()->toBSON()); + error.append( + "errInfo", + migrationStatus.template extraInfo<TenantMigrationCommittedInfo>()->toBSON()); } } else { error.append("code", int(status.code())); if (status.extraInfo()) { - error.append("errInfo", status.extraInfo<TenantMigrationCommittedInfo>()->toBSON()); + error.append("errInfo", + status.template extraInfo<TenantMigrationCommittedInfo>()->toBSON()); } } } else { @@ -735,12 +738,17 @@ public: std::vector<std::pair<BucketCatalog::BucketId, size_t>> bucketsToCommit; std::vector<std::pair<Future<BucketCatalog::CommitInfo>, size_t>> bucketsToWaitOn; auto insert = [&](size_t index) { - auto [bucketId, commitInfo] = - bucketCatalog.insert(opCtx, ns(), _batch.getDocuments()[index]); - if (commitInfo) { - bucketsToWaitOn.push_back({std::move(*commitInfo), index}); + auto 
result =
+                bucketCatalog.insert(opCtx, ns(), _batch.getDocuments()[start + index]);
+            if (auto error = generateError(opCtx, result, index, errors->size())) {
+                errors->push_back(*error);
             } else {
-                bucketsToCommit.push_back({std::move(bucketId), index});
+                auto& [bucketId, commitInfo] = result.getValue();
+                if (commitInfo) {
+                    bucketsToWaitOn.push_back({std::move(*commitInfo), index});
+                } else {
+                    bucketsToCommit.push_back({std::move(bucketId), index});
+                }
             }
         };
 diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp index 56cf21280ff..b8a70923bb8 100644 --- a/src/mongo/db/timeseries/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog.cpp @@ -76,9 +76,9 @@ BSONObj BucketCatalog::getMetadata(const BucketId& bucketId) const { return bucket.metadata.metadata; } -BucketCatalog::InsertResult BucketCatalog::insert(OperationContext* opCtx, - const NamespaceString& ns, - const BSONObj& doc) { +StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(OperationContext* opCtx, + const NamespaceString& ns, + const BSONObj& doc) { stdx::lock_guard lk(_mutex); auto viewCatalog = DatabaseHolder::get(opCtx)->getViewCatalog(opCtx, ns.db()); @@ -99,7 +99,14 @@ BucketCatalog::InsertResult BucketCatalog::insert(OperationContext* opCtx, auto& stats = _executionStats[ns]; - auto time = doc[options.getTimeField()].Date(); + auto timeElem = doc[options.getTimeField()]; + if (!timeElem || BSONType::Date != timeElem.type()) { + return {ErrorCodes::BadValue, + str::stream() << "'" << options.getTimeField() << "' must be present and contain a " + << "valid BSON UTC datetime value"}; + } + + auto time = timeElem.Date(); auto createNewBucketId = [&] { _expireIdleBuckets(&stats); return BucketIdInternal{time, ++_bucketNum}; @@ -211,7 +218,7 @@ BucketCatalog::InsertResult BucketCatalog::insert(OperationContext* opCtx, newFieldNamesSize + bucket->min.getMemoryUsage() + bucket->max.getMemoryUsage(); _memoryUsage += bucket->memoryUsage; - 
return {it->second, std::move(commitInfoFuture)}; + return {InsertResult{it->second, std::move(commitInfoFuture)}}; } BucketCatalog::CommitData BucketCatalog::commit(const BucketId& bucketId, diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h index 46cf204d0a8..8766711a37b 100644 --- a/src/mongo/db/timeseries/bucket_catalog.h +++ b/src/mongo/db/timeseries/bucket_catalog.h @@ -111,7 +111,9 @@ public: * caller is a waiter for the bucket. If no Future is provided, the caller is the committer for * this bucket. */ - InsertResult insert(OperationContext* opCtx, const NamespaceString& ns, const BSONObj& doc); + StatusWith<InsertResult> insert(OperationContext* opCtx, + const NamespaceString& ns, + const BSONObj& doc); /** * Returns the uncommitted measurements and the number of measurements that have already been diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp index d1c459c6141..ad60654f675 100644 --- a/src/mongo/db/timeseries/bucket_catalog_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp @@ -99,8 +99,8 @@ void BucketCatalogTest::_commit(const BucketCatalog::BucketId& bucketId, void BucketCatalogTest::_insertOneAndCommit(const NamespaceString& ns, uint16_t numCommittedMeasurements) { - auto [bucketId, commitInfo] = - _bucketCatalog->insert(_opCtx, ns, BSON(_timeField << Date_t::now())); + auto result = _bucketCatalog->insert(_opCtx, ns, BSON(_timeField << Date_t::now())); + auto& [bucketId, commitInfo] = result.getValue(); ASSERT(!commitInfo); _commit(bucketId, numCommittedMeasurements); @@ -109,29 +109,29 @@ void BucketCatalogTest::_insertOneAndCommit(const NamespaceString& ns, TEST_F(BucketCatalogTest, InsertIntoSameBucket) { // The first insert should be the committer. 
auto result1 = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now())); - ASSERT(!result1.commitInfo); + ASSERT(!result1.getValue().commitInfo); // A subsequent insert into the same bucket should be a waiter. auto result2 = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now())); - ASSERT(result2.commitInfo); - ASSERT(!result2.commitInfo->isReady()); + ASSERT(result2.getValue().commitInfo); + ASSERT(!result2.getValue().commitInfo->isReady()); // Committing should return both documents since they belong in the same bucket. - auto data = _bucketCatalog->commit(result1.bucketId); + auto data = _bucketCatalog->commit(result1.getValue().bucketId); ASSERT_EQ(data.docs.size(), 2); ASSERT_EQ(data.numCommittedMeasurements, 0); - ASSERT(!result2.commitInfo->isReady()); + ASSERT(!result2.getValue().commitInfo->isReady()); // Once the commit has occurred, the waiter should be notified. - data = _bucketCatalog->commit(result1.bucketId, _commitInfo); + data = _bucketCatalog->commit(result1.getValue().bucketId, _commitInfo); ASSERT_EQ(data.docs.size(), 0); ASSERT_EQ(data.numCommittedMeasurements, 2); - ASSERT(result2.commitInfo->isReady()); + ASSERT(result2.getValue().commitInfo->isReady()); } TEST_F(BucketCatalogTest, GetMetadataReturnsEmptyDocOnMissingBucket) { auto bucketId = - _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now())).bucketId; + _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now())).getValue().bucketId; _bucketCatalog->clear(bucketId); ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(bucketId)); } @@ -140,24 +140,28 @@ TEST_F(BucketCatalogTest, InsertIntoDifferentBuckets) { // The first insert should be the committer. auto result1 = _bucketCatalog->insert( _opCtx, _ns1, BSON(_timeField << Date_t::now() << _metaField << "123")); - ASSERT(!result1.commitInfo); + ASSERT(!result1.getValue().commitInfo); // Subsequent inserts into different buckets should also be committers. 
auto result2 = _bucketCatalog->insert( _opCtx, _ns1, BSON(_timeField << Date_t::now() << _metaField << BSONObj())); - ASSERT(!result2.commitInfo); + ASSERT(!result2.getValue().commitInfo); auto result3 = _bucketCatalog->insert(_opCtx, _ns2, BSON(_timeField << Date_t::now())); - ASSERT(!result3.commitInfo); + ASSERT(!result3.getValue().commitInfo); // Check metadata in buckets. - ASSERT_BSONOBJ_EQ(BSON(_metaField << "123"), _bucketCatalog->getMetadata(result1.bucketId)); - ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj()), _bucketCatalog->getMetadata(result2.bucketId)); - ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONNULL), _bucketCatalog->getMetadata(result3.bucketId)); + ASSERT_BSONOBJ_EQ(BSON(_metaField << "123"), + _bucketCatalog->getMetadata(result1.getValue().bucketId)); + ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj()), + _bucketCatalog->getMetadata(result2.getValue().bucketId)); + ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONNULL), + _bucketCatalog->getMetadata(result3.getValue().bucketId)); // Committing one bucket should only return the one document in that bucket and shoukd not // affect the other bucket. 
- for (const auto& bucketId : {result1.bucketId, result2.bucketId, result3.bucketId}) { + for (const auto& bucketId : + {result1.getValue().bucketId, result2.getValue().bucketId, result3.getValue().bucketId}) { _commit(bucketId, 0); } } @@ -192,50 +196,59 @@ TEST_F(BucketCatalogTest, ClearDatabaseBuckets) { } DEATH_TEST_F(BucketCatalogTest, CannotProvideCommitInfoOnFirstCommit, "invariant") { - auto [bucketId, _] = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now())); + auto result = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now())); + auto& [bucketId, _] = result.getValue(); _bucketCatalog->commit(bucketId, _commitInfo); } TEST_F(BucketCatalogWithoutMetadataTest, GetMetadataReturnsEmptyDoc) { auto result = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now())); - ASSERT(!result.commitInfo); + ASSERT(!result.getValue().commitInfo); - ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(result.bucketId)); + ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(result.getValue().bucketId)); - _commit(result.bucketId, 0); + _commit(result.getValue().bucketId, 0); } TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { // Creating a new bucket should return all fields from the initial measurement. - auto [bucketId, _] = + auto result = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now() << "a" << 0)); + auto& [bucketId, _] = result.getValue(); auto data = _bucketCatalog->commit(bucketId); ASSERT_EQ(2U, data.newFieldNamesToBeInserted.size()) << data.toBSON(); ASSERT(data.newFieldNamesToBeInserted.count(_timeField)) << data.toBSON(); ASSERT(data.newFieldNamesToBeInserted.count("a")) << data.toBSON(); // Inserting a new measurement with the same fields should return an empty set of new fields. 
- _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now() << "a" << 1)); + + ASSERT_OK(_bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now() << "a" << 1)) + .getStatus()); data = _bucketCatalog->commit(bucketId, _commitInfo); ASSERT_EQ(0U, data.newFieldNamesToBeInserted.size()) << data.toBSON(); // Insert a new measurement with the a new field. - _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now() << "a" << 2 << "b" << 2)); + ASSERT_OK(_bucketCatalog + ->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now() << "a" << 2 << "b" << 2)) + .getStatus()); data = _bucketCatalog->commit(bucketId, _commitInfo); ASSERT_EQ(1U, data.newFieldNamesToBeInserted.size()) << data.toBSON(); ASSERT(data.newFieldNamesToBeInserted.count("b")) << data.toBSON(); // Fill up the bucket. for (auto i = 3; i < gTimeseriesBucketMaxCount; ++i) { - _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now() << "a" << i)); + ASSERT_OK( + _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now() << "a" << i)) + .getStatus()); data = _bucketCatalog->commit(bucketId, _commitInfo); ASSERT_EQ(0U, data.newFieldNamesToBeInserted.size()) << i << ":" << data.toBSON(); } // When a bucket overflows, committing to the new overflow bucket should return the fields of // the first measurement as new fields. - auto [overflowBucketId, unusedCommitInfo] = _bucketCatalog->insert( + auto result2 = _bucketCatalog->insert( _opCtx, _ns1, BSON(_timeField << Date_t::now() << "a" << gTimeseriesBucketMaxCount)); + auto& [overflowBucketId, unusedCommitInfo] = result2.getValue(); ASSERT_NE(*bucketId, *overflowBucketId); data = _bucketCatalog->commit(overflowBucketId); ASSERT_EQ(2U, data.newFieldNamesToBeInserted.size()) << data.toBSON(); |