author     Richard Hausman <richard.hausman@mongodb.com>    2022-07-06 15:32:32 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-07-06 17:10:55 +0000
commit     2445615c2fa23a571d1c85fdd04cc28c6d1b2efa (patch)
tree       1b70f0ba1ac9b05c98d9f8f094b6b30420a3fe0c
parent     efde5e5d4aa90df5ea9543a25fd954ece536c608 (diff)
download   mongo-2445615c2fa23a571d1c85fdd04cc28c6d1b2efa.tar.gz
SERVER-66983 Check version field on time-series bucket collections in validation
-rw-r--r--  jstests/noPassthroughWithMongod/validate_timeseries_version.js  110
-rw-r--r--  src/mongo/db/catalog/validate_adaptor.cpp                         96
-rw-r--r--  src/mongo/db/catalog/validate_state.h                              8
3 files changed, 202 insertions(+), 12 deletions(-)
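
In practice, the new check surfaces like this (a minimal mongo shell sketch; the collection name "weather" is hypothetical, and the feature flag featureFlagExtendValidateCommand must be enabled; it mirrors the regression test added below): tampering with 'control.version' on a bucket document makes validate flag the document and emit a warning while still reporting the collection as valid.

assert.commandWorked(db.createCollection(
    "weather", {timeseries: {timeField: "timestamp", metaField: "metadata"}}));
db.weather.insertOne({"metadata": {"sensorId": 1}, "timestamp": ISODate(), "temp": 20});

// Corrupt the version field directly on the underlying bucket collection.
db.getCollection("system.buckets.weather")
    .updateOne({}, {"$set": {"control.version": 500}});

// The collection is still reported valid, but the document is flagged.
const res = db.getCollection("system.buckets.weather").validate();
assert(res.valid, tojson(res));
assert.eq(res.nNonCompliantDocuments, 1);
assert.eq(res.warnings.length, 1);
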
diff --git a/jstests/noPassthroughWithMongod/validate_timeseries_version.js b/jstests/noPassthroughWithMongod/validate_timeseries_version.js
new file mode 100644
index 00000000000..a9c05e810bf
--- /dev/null
+++ b/jstests/noPassthroughWithMongod/validate_timeseries_version.js
@@ -0,0 +1,110 @@
+/**
+ * Tests that the validate command checks the consistency of the version field in time-series
+ * bucket collections and returns warnings properly.
+ *
+ * @tags: [
+ * featureFlagExtendValidateCommand
+ * ]
+ */
+
+(function() {
+"use strict";
+let testCount = 0;
+const collNamePrefix = "validate_timeseries_version";
+const bucketNamePrefix = "system.buckets.validate_timeseries_version";
+let collName = collNamePrefix + testCount;
+let bucketName = bucketNamePrefix + testCount;
+let coll = null;
+let bucket = null;
+
+jsTestLog("Running the validate command to check time-series bucket versions");
+testCount += 1;
+collName = collNamePrefix + testCount;
+bucketName = bucketNamePrefix + testCount;
+db.getCollection(collName).drop();
+assert.commandWorked(db.createCollection(
+ collName, {timeseries: {timeField: "timestamp", metaField: "metadata", granularity: "hours"}}));
+coll = db.getCollection(collName);
+bucket = db.getCollection(bucketName);
+
+// Inserts documents into a bucket. Checks no issues are found.
+jsTestLog("Inserting documents into a bucket and checking that no issues are found.");
+coll.insertMany([...Array(10).keys()].map(
+ i =>
+ ({"metadata": {"sensorId": 1, "type": "temperature"}, "timestamp": ISODate(), "temp": i})));
+let res = bucket.validate();
+assert(res.valid, tojson(res));
+assert.eq(res.nNonCompliantDocuments, 0);
+assert.eq(res.warnings.length, 0);
+
+// Inserts documents into another bucket but manually changes the version to 2. Expects warnings
+// from validation.
+jsTestLog(
+ "Manually changing 'control.version' from 1 to 2 and checking for warnings from validation.");
+testCount += 1;
+collName = collNamePrefix + testCount;
+bucketName = bucketNamePrefix + testCount;
+db.getCollection(collName).drop();
+assert.commandWorked(db.createCollection(
+ collName, {timeseries: {timeField: "timestamp", metaField: "metadata", granularity: "hours"}}));
+coll = db.getCollection(collName);
+bucket = db.getCollection(bucketName);
+coll.insertMany([...Array(10).keys()].map(
+ i =>
+ ({"metadata": {"sensorId": 2, "type": "temperature"}, "timestamp": ISODate(), "temp": i})));
+bucket.updateOne({"meta.sensorId": 2}, {"$set": {"control.version": 2}});
+res = bucket.validate();
+assert(res.valid, tojson(res));
+assert.eq(res.nNonCompliantDocuments, 1);
+assert.eq(res.warnings.length, 1);
+
+// Inserts enough documents to close a bucket and then manually changes the version to 1.
+// Expects warnings from validation.
+jsTestLog(
+ "Changing the 'control.version' of a closed bucket from 2 to 1, and checking for warnings from validation.");
+testCount += 1;
+collName = collNamePrefix + testCount;
+bucketName = bucketNamePrefix + testCount;
+db.getCollection(collName).drop();
+assert.commandWorked(db.createCollection(
+ collName, {timeseries: {timeField: "timestamp", metaField: "metadata", granularity: "hours"}}));
+coll = db.getCollection(collName);
+bucket = db.getCollection(bucketName);
+coll.insertMany([...Array(1200).keys()].map(
+ i =>
+ ({"metadata": {"sensorId": 3, "type": "temperature"}, "timestamp": ISODate(), "temp": i})));
+bucket.updateOne({"meta.sensorId": 3, "control.version": 2}, {"$set": {"control.version": 1}});
+res = bucket.validate();
+assert(res.valid, tojson(res));
+assert.eq(res.nNonCompliantDocuments, 1);
+assert.eq(res.warnings.length, 1);
+
+// Returns warnings on a bucket with an unsupported version.
+jsTestLog("Changing 'control.version' to an unsupported version and checking for warnings.");
+testCount += 1;
+collName = collNamePrefix + testCount;
+bucketName = bucketNamePrefix + testCount;
+db.getCollection(collName).drop();
+assert.commandWorked(db.createCollection(
+ collName, {timeseries: {timeField: "timestamp", metaField: "metadata", granularity: "hours"}}));
+coll = db.getCollection(collName);
+bucket = db.getCollection(bucketName);
+coll.insertMany([...Array(1100).keys()].map(
+ i =>
+ ({"metadata": {"sensorId": 4, "type": "temperature"}, "timestamp": ISODate(), "temp": i})));
+bucket.updateOne({"meta.sensorId": 4, "control.version": 2}, {"$set": {"control.version": 500}});
+res = bucket.validate();
+assert(res.valid, tojson(res));
+assert.eq(res.nNonCompliantDocuments, 1);
+assert.eq(res.warnings.length, 1);
+
+// Creates a type-version mismatch in the previous bucket and checks that multiple warnings are
+// reported from a single collection with multiple inconsistent documents.
+jsTestLog(
+ "Making a type-version mismatch in the same bucket as the previous test and checking for warnings.");
+bucket.updateOne({"meta.sensorId": 4, "control.version": 1}, {"$set": {"control.version": 2}});
+res = bucket.validate();
+assert(res.valid, tojson(res));
+assert.eq(res.nNonCompliantDocuments, 2);
+assert.eq(res.warnings.length, 1);
+})();
\ No newline at end of file
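
For orientation, here is a rough sketch of the bucket document layout these tests manipulate (field values are illustrative; the real field names come from timeseries_constants.h): in a version-1 (open) bucket every column under 'data' is a BSON object keyed by row index, while in a version-2 (closed) bucket every column is BinData with the Column subtype.

// Hypothetical version-1 bucket document, abridged:
// {
//     "_id": ObjectId("..."),
//     "control": {"version": 1, "min": {...}, "max": {...}},
//     "meta": {"sensorId": 1, "type": "temperature"},
//     "data": {
//         "timestamp": {"0": ISODate("..."), "1": ISODate("...")},
//         "temp": {"0": 0, "1": 1}
//     }
// }
// In a version-2 bucket the same fields under "data" are instead BinData values
// with the Column subtype, which is the correspondence the new validation code checks.
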
diff --git a/src/mongo/db/catalog/validate_adaptor.cpp b/src/mongo/db/catalog/validate_adaptor.cpp
index 095d028f8f4..b6e36df4e59 100644
--- a/src/mongo/db/catalog/validate_adaptor.cpp
+++ b/src/mongo/db/catalog/validate_adaptor.cpp
@@ -49,9 +49,12 @@
#include "mongo/db/multi_key_path_tracker.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/record_id_helpers.h"
+#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/db/storage/execution_context.h"
#include "mongo/db/storage/key_string.h"
#include "mongo/db/storage/record_store.h"
+#include "mongo/db/storage/storage_parameters_gen.h"
+#include "mongo/db/timeseries/timeseries_constants.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/object_check.h"
#include "mongo/util/fail_point.h"
@@ -74,6 +77,9 @@ const long long kInterruptIntervalNumBytes = 50 * 1024 * 1024; // 50MB.
static constexpr const char* kSchemaValidationFailedReason =
"Detected one or more documents not compliant with the collection's schema. Check logs for log "
"id 5363500.";
+static constexpr const char* kTimeseriesValidationInconsistencyReason =
+ "Detected one or more documents in this collection incompatible with time-series "
+ "specifications. For more info, see logs with log id 6698300.";
/**
* Validate that for each record in a clustered RecordStore the record key (RecordId) matches the
@@ -130,7 +136,56 @@ void schemaValidationFailed(CollectionValidation::ValidateState* state,
results->valid = false;
}
}
+/**
+ * Checks that the bucket's version is valid and that it matches the types of the data fields.
+ */
+Status _validateTimeSeriesBucketRecord(const RecordId& recordId,
+ const BSONObj& recordBson,
+ ValidateResults* results) {
+ int controlVersion = recordBson.getField(timeseries::kBucketControlFieldName)
+ .Obj()
+ .getField(timeseries::kBucketControlVersionFieldName)
+ .Number();
+ if (controlVersion != 1 && controlVersion != 2) {
+ return Status(
+ ErrorCodes::BadValue,
+ fmt::format("Invalid value for 'control.version'. Expected 1 or 2, but got {}.",
+ controlVersion));
+ }
+ auto dataType = controlVersion == 1 ? BSONType::Object : BSONType::BinData;
+ // In addition to checking dataType, make sure that closed buckets have the BinData Column subtype.
+ auto isCorrectType = [&](BSONElement el) {
+ if (controlVersion == 1) {
+ return el.type() == BSONType::Object;
+ } else {
+ return el.type() == BSONType::BinData && el.binDataType() == BinDataType::Column;
+ }
+ };
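+ // Each top-level field under 'data' is one measurement column; every column
+ // must use the representation implied by the bucket's control.version.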
+ BSONObj data = recordBson.getField(timeseries::kBucketDataFieldName).Obj();
+ for (BSONObjIterator bi(data); bi.more();) {
+ BSONElement e = bi.next();
+ if (!isCorrectType(e)) {
+ return Status(ErrorCodes::TypeMismatch,
+ fmt::format("Mismatch between TimeSeries schema version and data field "
+ "type. Expected type {}, but got {}.",
+ mongo::typeName(dataType),
+ mongo::typeName(e.type())));
+ }
+ }
+ return Status::OK();
+}
+
+
+void _timeseriesValidationFailed(CollectionValidation::ValidateState* state,
+ ValidateResults* results) {
+ if (state->isTimeseriesDataInconsistent()) {
+ // Only report the warning message once.
+ return;
+ }
+ state->setTimeseriesDataInconsistent();
+ results->warnings.push_back(kTimeseriesValidationInconsistencyReason);
+}
} // namespace
Status ValidateAdaptor::validateRecord(OperationContext* opCtx,
@@ -544,9 +599,10 @@ void ValidateAdaptor::traverseRecordStore(OperationContext* opCtx,
// Because the progress meter is intended as an approximation, it's sufficient to get the number
// of records when we begin traversing, even if this number may deviate from the final number.
+ const auto& coll = _validateState->getCollection();
const char* curopMessage = "Validate: scanning documents";
- const auto totalRecords = _validateState->getCollection()->getRecordStore()->numRecords(opCtx);
- const auto rs = _validateState->getCollection()->getRecordStore();
+ const auto totalRecords = coll->getRecordStore()->numRecords(opCtx);
+ const auto rs = coll->getRecordStore();
{
stdx::unique_lock<Client> lk(*opCtx->getClient());
_progress.set(CurOp::get(opCtx)->setProgress_inlock(curopMessage, totalRecords));
@@ -625,18 +681,36 @@ void ValidateAdaptor::traverseRecordStore(OperationContext* opCtx,
// If the document is not corrupted, validate the document against this collection's
// schema validator. Don't treat invalid documents as errors since documents can bypass
// document validation when being inserted or updated.
- auto result =
- _validateState->getCollection()->checkValidation(opCtx, record->data.toBson());
+ auto result = coll->checkValidation(opCtx, record->data.toBson());
if (result.first != Collection::SchemaValidationResult::kPass) {
LOGV2_WARNING(5363500,
"Document is not compliant with the collection's schema",
- logAttrs(_validateState->getCollection()->ns()),
+ logAttrs(coll->ns()),
"recordId"_attr = record->id,
"reason"_attr = result.second);
nNonCompliantDocuments++;
schemaValidationFailed(_validateState, result.first, results);
+ } else if (serverGlobalParams.featureCompatibility.isVersionInitialized() &&
+ feature_flags::gExtendValidateCommand.isEnabled(
+ serverGlobalParams.featureCompatibility) &&
+ coll->getTimeseriesOptions()) {
+ // Checks for time-series collection consistency.
+ Status bucketStatus =
+ _validateTimeSeriesBucketRecord(record->id, record->data.toBson(), results);
+
+ // This log id should be kept in sync with the associated warning messages that are
+ // returned to the client.
+ if (!bucketStatus.isOK()) {
+ LOGV2_WARNING(6698300,
+ "Document is not compliant with time-series specifications",
+ logAttrs(coll->ns()),
+ "recordId"_attr = record->id,
+ "reason"_attr = bucketStatus);
+ nNonCompliantDocuments++;
+ _timeseriesValidationFailed(_validateState, results);
+ }
}
}
@@ -659,20 +733,18 @@ void ValidateAdaptor::traverseRecordStore(OperationContext* opCtx,
<< " invalid documents.");
}
- const auto fastCount = _validateState->getCollection()->numRecords(opCtx);
+ const auto fastCount = coll->numRecords(opCtx);
if (_validateState->shouldEnforceFastCount() && fastCount != _numRecords) {
- results->errors.push_back(str::stream() << "fast count (" << fastCount
- << ") does not match number of records ("
- << _numRecords << ") for collection '"
- << _validateState->getCollection()->ns() << "'");
+ results->errors.push_back(
+ str::stream() << "fast count (" << fastCount << ") does not match number of records ("
+ << _numRecords << ") for collection '" << coll->ns() << "'");
results->valid = false;
}
// Do not update the record store stats if we're in the background as we've validated a
// checkpoint and it may not have the most up-to-date changes.
if (results->valid && !_validateState->isBackground()) {
- _validateState->getCollection()->getRecordStore()->updateStatsAfterRepair(
- opCtx, _numRecords, dataSizeTotal);
+ coll->getRecordStore()->updateStatsAfterRepair(opCtx, _numRecords, dataSizeTotal);
}
}
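
For reference, a hedged sketch of the validate output a client would see for a flagged bucket (exact result fields may vary; the warning text is kTimeseriesValidationInconsistencyReason above, and the counts match the assertions in the new jstest):

// Illustrative validate() result for a collection with one tampered bucket:
// {
//     "valid": true,
//     "nNonCompliantDocuments": 1,
//     "warnings": [
//         "Detected one or more documents in this collection incompatible with time-series
//          specifications. For more info, see logs with log id 6698300."
//     ],
//     ...
// }
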
diff --git a/src/mongo/db/catalog/validate_state.h b/src/mongo/db/catalog/validate_state.h
index df796c686ce..03c9d53f560 100644
--- a/src/mongo/db/catalog/validate_state.h
+++ b/src/mongo/db/catalog/validate_state.h
@@ -96,6 +96,13 @@ public:
_collectionSchemaViolated = true;
}
+ bool isTimeseriesDataInconsistent() {
+ return _timeseriesDataInconsistency;
+ }
+ void setTimeseriesDataInconsistent() {
+ _timeseriesDataInconsistency = true;
+ }
+
bool fixErrors() const {
return _repairMode == RepairMode::kFixErrors;
}
@@ -211,6 +218,7 @@ private:
ValidateMode _mode;
RepairMode _repairMode;
bool _collectionSchemaViolated = false;
+ bool _timeseriesDataInconsistency = false;
boost::optional<ShouldNotConflictWithSecondaryBatchApplicationBlock> _noPBWM;
boost::optional<Lock::GlobalLock> _globalLock;