diff options
-rw-r--r-- | jstests/noPassthroughWithMongod/validate_timeseries_version.js | 110
-rw-r--r-- | src/mongo/db/catalog/validate_adaptor.cpp | 96
-rw-r--r-- | src/mongo/db/catalog/validate_state.h | 8
3 files changed, 202 insertions, 12 deletions
diff --git a/jstests/noPassthroughWithMongod/validate_timeseries_version.js b/jstests/noPassthroughWithMongod/validate_timeseries_version.js new file mode 100644 index 00000000000..a9c05e810bf --- /dev/null +++ b/jstests/noPassthroughWithMongod/validate_timeseries_version.js @@ -0,0 +1,110 @@ +/** + * Tests that the validate command checks data consistencies of the version field in time-series + * bucket collections and return warnings properly. + * + * @tags: [ + * featureFlagExtendValidateCommand + * ] + */ + +(function() { +"use strict"; +let testCount = 0; +const collNamePrefix = "validate_timeseries_version"; +const bucketNamePrefix = "system.buckets.validate_timeseries_version"; +let collName = collNamePrefix + testCount; +let bucketName = bucketNamePrefix + testCount; +let coll = null; +let bucket = null; + +jsTestLog("Running the validate command to check time-series bucket versions"); +testCount += 1; +collName = collNamePrefix + testCount; +bucketName = bucketNamePrefix + testCount; +db.getCollection(collName).drop(); +assert.commandWorked(db.createCollection( + collName, {timeseries: {timeField: "timestamp", metaField: "metadata", granularity: "hours"}})); +coll = db.getCollection(collName); +bucket = db.getCollection(bucketName); + +// Inserts documents into a bucket. Checks no issues are found. +jsTestLog("Inserting documents into a bucket and checking that no issues are found."); +coll.insertMany([...Array(10).keys()].map( + i => + ({"metadata": {"sensorId": 1, "type": "temperature"}, "timestamp": ISODate(), "temp": i}))); +let res = bucket.validate(); +assert(res.valid, tojson(res)); +assert.eq(res.nNonCompliantDocuments, 0); +assert.eq(res.warnings.length, 0); + +// Inserts documents into another bucket but manually changes the version to 2. Expects warnings +// from validation. 
+jsTestLog( + "Manually changing 'control.version' from 1 to 2 and checking for warnings from validation."); +testCount += 1; +collName = collNamePrefix + testCount; +bucketName = bucketNamePrefix + testCount; +db.getCollection(collName).drop(); +assert.commandWorked(db.createCollection( + collName, {timeseries: {timeField: "timestamp", metaField: "metadata", granularity: "hours"}})); +coll = db.getCollection(collName); +bucket = db.getCollection(bucketName); +coll.insertMany([...Array(10).keys()].map( + i => + ({"metadata": {"sensorId": 2, "type": "temperature"}, "timestamp": ISODate(), "temp": i}))); +bucket.updateOne({"meta.sensorId": 2}, {"$set": {"control.version": 2}}); +res = bucket.validate(); +assert(res.valid, tojson(res)); +assert.eq(res.nNonCompliantDocuments, 1); +assert.eq(res.warnings.length, 1); + +// Inserts enough documents to close a bucket and then manually changes the version to 1. +// Expects warnings from validation. +jsTestLog( + "Changing the 'control.version' of a closed bucket from 2 to 1, and checking for warnings from validation."); +testCount += 1; +collName = collNamePrefix + testCount; +bucketName = bucketNamePrefix + testCount; +db.getCollection(collName).drop(); +assert.commandWorked(db.createCollection( + collName, {timeseries: {timeField: "timestamp", metaField: "metadata", granularity: "hours"}})); +coll = db.getCollection(collName); +bucket = db.getCollection(bucketName); +coll.insertMany([...Array(1200).keys()].map( + i => + ({"metadata": {"sensorId": 3, "type": "temperature"}, "timestamp": ISODate(), "temp": i}))); +bucket.updateOne({"meta.sensorId": 3, "control.version": 2}, {"$set": {"control.version": 1}}); +res = bucket.validate(); +assert(res.valid, tojson(res)); +assert.eq(res.nNonCompliantDocuments, 1); +assert.eq(res.warnings.length, 1); + +// Returns warnings on a bucket with an unsupported version. 
+jsTestLog("Changing 'control.version' to an unsupported version and checking for warnings."); +testCount += 1; +collName = collNamePrefix + testCount; +bucketName = bucketNamePrefix + testCount; +db.getCollection(collName).drop(); +assert.commandWorked(db.createCollection( + collName, {timeseries: {timeField: "timestamp", metaField: "metadata", granularity: "hours"}})); +coll = db.getCollection(collName); +bucket = db.getCollection(bucketName); +coll.insertMany([...Array(1100).keys()].map( + i => + ({"metadata": {"sensorId": 4, "type": "temperature"}, "timestamp": ISODate(), "temp": i}))); +bucket.updateOne({"meta.sensorId": 4, "control.version": 2}, {"$set": {"control.version": 500}}); +res = bucket.validate(); +assert(res.valid, tojson(res)); +assert.eq(res.nNonCompliantDocuments, 1); +assert.eq(res.warnings.length, 1); + +// Creates a type-version mismatch in the previous bucket and checks that multiple warnings are +// reported from a single collection with multiple inconsistent documents. +jsTestLog( + "Making a type-version mismatch in the same bucket as the previous test and checking for warnings."); +bucket.updateOne({"meta.sensorId": 4, "control.version": 1}, {"$set": {"control.version": 2}}); +res = bucket.validate(); +assert(res.valid, tojson(res)); +assert.eq(res.nNonCompliantDocuments, 2); +assert.eq(res.warnings.length, 1); +})();
\ No newline at end of file diff --git a/src/mongo/db/catalog/validate_adaptor.cpp b/src/mongo/db/catalog/validate_adaptor.cpp index 095d028f8f4..b6e36df4e59 100644 --- a/src/mongo/db/catalog/validate_adaptor.cpp +++ b/src/mongo/db/catalog/validate_adaptor.cpp @@ -49,9 +49,12 @@ #include "mongo/db/multi_key_path_tracker.h" #include "mongo/db/operation_context.h" #include "mongo/db/record_id_helpers.h" +#include "mongo/db/server_feature_flags_gen.h" #include "mongo/db/storage/execution_context.h" #include "mongo/db/storage/key_string.h" #include "mongo/db/storage/record_store.h" +#include "mongo/db/storage/storage_parameters_gen.h" +#include "mongo/db/timeseries/timeseries_constants.h" #include "mongo/logv2/log.h" #include "mongo/rpc/object_check.h" #include "mongo/util/fail_point.h" @@ -74,6 +77,9 @@ const long long kInterruptIntervalNumBytes = 50 * 1024 * 1024; // 50MB. static constexpr const char* kSchemaValidationFailedReason = "Detected one or more documents not compliant with the collection's schema. Check logs for log " "id 5363500."; +static constexpr const char* kTimeseriesValidationInconsistencyReason = + "Detected one or more documents in this collection incompatible with time-series " + "specifications. For more info, see logs with log id 6698300."; /** * Validate that for each record in a clustered RecordStore the record key (RecordId) matches the @@ -130,7 +136,56 @@ void schemaValidationFailed(CollectionValidation::ValidateState* state, results->valid = false; } } +/** + * Checks the value of the bucket's version and if it matches the types of the data fields. 
+ */ +Status _validateTimeSeriesBucketRecord(const RecordId& recordId, + const BSONObj& recordBson, + ValidateResults* results) { + int controlVersion = recordBson.getField(timeseries::kBucketControlFieldName) + .Obj() + .getField(timeseries::kBucketControlVersionFieldName) + .Number(); + if (controlVersion != 1 && controlVersion != 2) { + return Status( + ErrorCodes::BadValue, + fmt::format("Invalid value for 'control.version'. Expected 1 or 2, but got {}.", + controlVersion)); + } + auto dataType = controlVersion == 1 ? BSONType::Object : BSONType::BinData; + // In addition to checking dataType, make sure that closed buckets have BinData Column subtype + auto isCorrectType = [&](BSONElement el) { + if (controlVersion == 1) { + return el.type() == BSONType::Object; + } else { + return el.type() == BSONType::BinData && el.binDataType() == BinDataType::Column; + } + }; + BSONObj data = recordBson.getField(timeseries::kBucketDataFieldName).Obj(); + for (BSONObjIterator bi(data); bi.more();) { + BSONElement e = bi.next(); + if (!isCorrectType(e)) { + return Status(ErrorCodes::TypeMismatch, + fmt::format("Mismatch between TimeSeries schema version and data field " + "type. Expected type {}, but got {}.", + mongo::typeName(dataType), + mongo::typeName(e.type()))); + } + } + return Status::OK(); +} + + +void _timeseriesValidationFailed(CollectionValidation::ValidateState* state, + ValidateResults* results) { + if (state->isTimeseriesDataInconsistent()) { + // Only report the warning message once. 
+ return; + } + state->setTimeseriesDataInconsistent(); + results->warnings.push_back(kTimeseriesValidationInconsistencyReason); +} } // namespace Status ValidateAdaptor::validateRecord(OperationContext* opCtx, @@ -544,9 +599,10 @@ void ValidateAdaptor::traverseRecordStore(OperationContext* opCtx, // Because the progress meter is intended as an approximation, it's sufficient to get the number // of records when we begin traversing, even if this number may deviate from the final number. + const auto& coll = _validateState->getCollection(); const char* curopMessage = "Validate: scanning documents"; - const auto totalRecords = _validateState->getCollection()->getRecordStore()->numRecords(opCtx); - const auto rs = _validateState->getCollection()->getRecordStore(); + const auto totalRecords = coll->getRecordStore()->numRecords(opCtx); + const auto rs = coll->getRecordStore(); { stdx::unique_lock<Client> lk(*opCtx->getClient()); _progress.set(CurOp::get(opCtx)->setProgress_inlock(curopMessage, totalRecords)); @@ -625,18 +681,36 @@ void ValidateAdaptor::traverseRecordStore(OperationContext* opCtx, // If the document is not corrupted, validate the document against this collection's // schema validator. Don't treat invalid documents as errors since documents can bypass // document validation when being inserted or updated. 
- auto result = - _validateState->getCollection()->checkValidation(opCtx, record->data.toBson()); + auto result = coll->checkValidation(opCtx, record->data.toBson()); if (result.first != Collection::SchemaValidationResult::kPass) { LOGV2_WARNING(5363500, "Document is not compliant with the collection's schema", - logAttrs(_validateState->getCollection()->ns()), + logAttrs(coll->ns()), "recordId"_attr = record->id, "reason"_attr = result.second); nNonCompliantDocuments++; schemaValidationFailed(_validateState, result.first, results); + } else if (serverGlobalParams.featureCompatibility.isVersionInitialized() && + feature_flags::gExtendValidateCommand.isEnabled( + serverGlobalParams.featureCompatibility) && + coll->getTimeseriesOptions()) { + // Checks for time-series collection consistency. + Status bucketStatus = + _validateTimeSeriesBucketRecord(record->id, record->data.toBson(), results); + + // This log id should be kept in sync with the associated warning messages that are + // returned to the client. 
+ if (!bucketStatus.isOK()) { + LOGV2_WARNING(6698300, + "Document is not compliant with time-series specifications", + logAttrs(coll->ns()), + "recordId"_attr = record->id, + "reason"_attr = bucketStatus); + nNonCompliantDocuments++; + _timeseriesValidationFailed(_validateState, results); + } } } @@ -659,20 +733,18 @@ void ValidateAdaptor::traverseRecordStore(OperationContext* opCtx, << " invalid documents."); } - const auto fastCount = _validateState->getCollection()->numRecords(opCtx); + const auto fastCount = coll->numRecords(opCtx); if (_validateState->shouldEnforceFastCount() && fastCount != _numRecords) { - results->errors.push_back(str::stream() << "fast count (" << fastCount - << ") does not match number of records (" - << _numRecords << ") for collection '" - << _validateState->getCollection()->ns() << "'"); + results->errors.push_back( + str::stream() << "fast count (" << fastCount << ") does not match number of records (" + << _numRecords << ") for collection '" << coll->ns() << "'"); results->valid = false; } // Do not update the record store stats if we're in the background as we've validated a // checkpoint and it may not have the most up-to-date changes. 
if (results->valid && !_validateState->isBackground()) { - _validateState->getCollection()->getRecordStore()->updateStatsAfterRepair( - opCtx, _numRecords, dataSizeTotal); + coll->getRecordStore()->updateStatsAfterRepair(opCtx, _numRecords, dataSizeTotal); } } diff --git a/src/mongo/db/catalog/validate_state.h b/src/mongo/db/catalog/validate_state.h index df796c686ce..03c9d53f560 100644 --- a/src/mongo/db/catalog/validate_state.h +++ b/src/mongo/db/catalog/validate_state.h @@ -96,6 +96,13 @@ public: _collectionSchemaViolated = true; } + bool isTimeseriesDataInconsistent() { + return _timeseriesDataInconsistency; + } + void setTimeseriesDataInconsistent() { + _timeseriesDataInconsistency = true; + } + bool fixErrors() const { return _repairMode == RepairMode::kFixErrors; } @@ -211,6 +218,7 @@ private: ValidateMode _mode; RepairMode _repairMode; bool _collectionSchemaViolated = false; + bool _timeseriesDataInconsistency = false; boost::optional<ShouldNotConflictWithSecondaryBatchApplicationBlock> _noPBWM; boost::optional<Lock::GlobalLock> _globalLock; |