diff options
23 files changed, 78 insertions, 775 deletions
diff --git a/jstests/concurrency/fsm_workloads/timeseries_agg_out.js b/jstests/concurrency/fsm_workloads/timeseries_agg_out.js deleted file mode 100644 index 292826b5b67..00000000000 --- a/jstests/concurrency/fsm_workloads/timeseries_agg_out.js +++ /dev/null @@ -1,133 +0,0 @@ -'use strict'; -/** - * This test runs many concurrent aggregations using $out, writing to the same time-series - * collection. While this is happening, other threads may be creating or dropping indexes, changing - * the collection options, or sharding the collection. We expect an aggregate with a $out stage to - * fail if another client executed one of these changes between the creation of $out's temporary - * collection and the eventual rename to the target collection. - * - * Unfortunately, there aren't very many assertions we can make here, so this is mostly to test that - * the server doesn't deadlock or crash, and that temporary namespaces are cleaned up. - * - * @tags: [ - * requires_timeseries, - * does_not_support_transactions, - * does_not_support_stepdowns, - * requires_capped, - * assumes_unsharded_collection, - * # TODO SERVER-74601 remove tag after support for secondaries. - * does_not_support_causal_consistency, - * requires_fcv_70 - * ] - */ -load('jstests/concurrency/fsm_workloads/agg_out.js'); // for $super state functions - -var $config = extendWorkload($config, function($config, $super) { - const timeFieldName = 'time'; - const metaFieldName = 'tag'; - const numDocs = 100; - $config.data.outputCollName = 'timeseries_agg_out'; - /** - * Runs an aggregate with a $out with time-series into '$config.data.outputCollName'. - */ - $config.states.query = function query(db, collName) { - const res = db[collName].runCommand({ - aggregate: collName, - pipeline: [ - {$set: {"time": new Date()}}, - { - $out: { - db: db.getName(), - coll: this.outputCollName, - timeseries: {timeField: "time"} - } - } - ], - cursor: {} - }); - - const allowedErrorCodes = [ - ErrorCodes.CommandFailed, // indexes of target collection changed during processing. - ErrorCodes.IllegalOperation, // $out is not supported to an existing *sharded* output - // collection. - 17152, // namespace is capped so it can't be used for $out. - 28769, // $out collection cannot be sharded. - ]; - assertWhenOwnDB.commandWorkedOrFailedWithCode(res, allowedErrorCodes); - if (res.ok) { - const cursor = new DBCommandCursor(db, res); - assertAlways.eq(0, cursor.itcount()); // No matter how many documents were in the - // original input stream, $out should never - // return any results. - } - }; - - /** - * Changes the 'expireAfterSeconds' value for the time-series collection. - */ - $config.states.collMod = function collMod(db, unusedCollName) { - let expireAfterSeconds = "off"; - if (Random.rand() < 0.5) { - // Change the expireAfterSeconds - expireAfterSeconds = Random.rand(); - } - - assertWhenOwnDB.commandWorkedOrFailedWithCode( - db.runCommand({collMod: this.outputCollName, expireAfterSeconds: expireAfterSeconds}), - ErrorCodes.ConflictingOperationInProgress); - }; - - /** - * 'convertToCapped' should always fail with a 'CommandNotSupportedOnView' error. - */ - $config.states.convertToCapped = function convertToCapped(db, unusedCollName) { - assertWhenOwnDB.commandFailedWithCode( - db.runCommand({convertToCapped: this.outputCollName, size: 100000}), - ErrorCodes.CommandNotSupportedOnView); - }; - - /** - * If being run against a mongos, shards '$config.data.outputCollName'. This is never undone, - * and all subsequent $out's to this collection should fail. - */ - $config.states.shardCollection = function shardCollection(db, unusedCollName) { - try { - $super.states.shardCollection.apply(this, arguments); - } catch (err) { - if (err.code !== ErrorCodes.Interrupted) { - throw err; - } - } - }; - - $config.teardown = function teardown(db, collName, cluster) { - const collNames = db.getCollectionNames(); - // Ensure that a temporary collection is not left behind. - assertAlways.eq(db.getCollectionNames() - .filter(col => col.includes('system.buckets.tmp.agg_out')) - .length, - 0); - - // Ensure that for the buckets collection there is a corresponding view. - assertAlways(!(collNames.includes('system.buckets.interrupt_temp_out') && - !collNames.includes('interrupt_temp_out'))); - }; - - /** - * Create a time-series collection and insert 100 documents. - */ - $config.setup = function setup(db, collName, cluster) { - assert.commandWorked(db.createCollection( - collName, {timeseries: {timeField: timeFieldName, metaField: metaFieldName}})); - const docs = []; - for (let i = 0; i < numDocs; ++i) { - docs.push({ - [timeFieldName]: ISODate(), - [metaFieldName]: (this.tid * numDocs) + i, - }); - } - assert.commandWorked(db.runCommand({insert: collName, documents: docs, ordered: false})); - }; - - return $config; -}); diff --git a/jstests/concurrency/fsm_workloads/timeseries_agg_out_interrupt_cleanup.js b/jstests/concurrency/fsm_workloads/timeseries_agg_out_interrupt_cleanup.js deleted file mode 100644 index ee79bf9a502..00000000000 --- a/jstests/concurrency/fsm_workloads/timeseries_agg_out_interrupt_cleanup.js +++ /dev/null @@ -1,90 +0,0 @@ -'use strict'; -/** - * Tests $out stage of aggregate command with time-series collections concurrently with killOp. - * Ensures that all the temporary collections created during the aggregate command are deleted and - * that all buckets collection have a corresponding view. This workloads extends - * 'agg_out_interrupt_cleanup'. - * - * @tags: [ - * requires_timeseries, - * does_not_support_transactions, - * does_not_support_stepdowns, - * uses_curop_agg_stage, - * assumes_unsharded_collection, - * requires_fcv_70 - * ] - */ -load('jstests/concurrency/fsm_libs/extend_workload.js'); // for extendWorkload -load('jstests/concurrency/fsm_workloads/agg_base.js'); // for $config -load( - 'jstests/concurrency/fsm_workloads/agg_out_interrupt_cleanup.js'); // for killOpsMatchingFilter - -var $config = extendWorkload($config, function($config, $super) { - const timeFieldName = 'time'; - const metaFieldName = 'tag'; - const numDocs = 100; - - $config.states.aggregate = function aggregate(db, collName) { - // drop the view to ensure that each time a buckets collection is made, the view will also - // be made or both be destroyed. - assert(db["interrupt_temp_out"].drop()); - // $out to the same collection so that concurrent aggregate commands would cause congestion. - db[collName].runCommand({ - aggregate: collName, - pipeline: [{ - $out: - {db: db.getName(), coll: "interrupt_temp_out", timeseries: {timeField: "time"}} - }], - cursor: {} - }); - }; - - $config.states.killOp = function killOp(db, collName) { - // The aggregate command could be running different commands internally (renameCollection, - // insertDocument, etc.) depending on which stage of execution it is in. So, get all the - // operations that are running against the input, output or temp collections. - $super.data.killOpsMatchingFilter(db, { - op: "command", - active: true, - $or: [ - {"ns": db.getName() + ".interrupt_temp_out"}, // For the view. - {"ns": db.getName() + "." + collName}, // For input collection. - // For the tmp collection. - {"ns": {$regex: "^" + db.getName() + "\.system.buckets\.tmp\.agg_out.*"}} - ], - "command.drop": { - $exists: false - } // Exclude 'drop' command from the filter to make sure that we don't kill the the - // drop command which is responsible for dropping the temporary collection. - }); - }; - - $config.teardown = function teardown(db, collName, cluster) { - const collNames = db.getCollectionNames(); - // Ensure that a temporary collection is not left behind. - assertAlways.eq( - collNames.filter(coll => coll.includes('system.buckets.tmp.agg_out')).length, 0); - - // Ensure that for the buckets collection there is a corresponding view. - assertAlways(!(collNames.includes('system.buckets.interrupt_temp_out') && - !collNames.includes('interrupt_temp_out'))); - }; - - /** - * Create a time-series collection and insert 100 documents. - */ - $config.setup = function setup(db, collName, cluster) { - assert.commandWorked(db.createCollection( - collName, {timeseries: {timeField: timeFieldName, metaField: metaFieldName}})); - const docs = []; - for (let i = 0; i < numDocs; ++i) { - docs.push({ - [timeFieldName]: ISODate(), - [metaFieldName]: (this.tid * numDocs) + i, - }); - } - assert.commandWorked(db.runCommand({insert: collName, documents: docs, ordered: false})); - }; - - return $config; -}); diff --git a/jstests/core/timeseries/libs/timeseries_agg_helpers.js b/jstests/core/timeseries/libs/timeseries_agg_helpers.js index becad2575ae..a2a2f74393f 100644 --- a/jstests/core/timeseries/libs/timeseries_agg_helpers.js +++ b/jstests/core/timeseries/libs/timeseries_agg_helpers.js @@ -78,13 +78,11 @@ var TimeseriesAggTests = class { /** * Gets an output collection object with the name 'outCollname'. */ - static getOutputCollection(outCollName, shouldDrop) { + static getOutputCollection(outCollName) { const testDB = TimeseriesAggTests.getTestDb(); let outColl = testDB.getCollection(outCollName); - if (shouldDrop) { - outColl.drop(); - } + outColl.drop(); return outColl; } @@ -98,29 +96,21 @@ var TimeseriesAggTests = class { * Executes 'prepareAction' before executing 'pipeline'. 'prepareAction' takes a collection * parameter and returns nothing. * - * If 'shouldDrop' is set to false, the output collection will not be dropped before executing - * 'pipeline'. - * * Returns sorted data by "time" field. The sorted result data will help simplify comparison * logic. */ - static getOutputAggregateResults(inColl, pipeline, prepareAction = null, shouldDrop = true) { + static getOutputAggregateResults(inColl, pipeline, prepareAction = null) { // Figures out the output collection name from the last pipeline stage. var outCollName = "out"; if (pipeline[pipeline.length - 1]["$out"] != undefined) { - // If the last stage is "$out", gets the output collection name from the string or - // object input. - if (typeof pipeline[pipeline.length - 1]["$out"] == 'string') { - outCollName = pipeline[pipeline.length - 1]["$out"]; - } else { - outCollName = pipeline[pipeline.length - 1]["$out"]["coll"]; - } + // If the last stage is "$out", gets the output collection name from it. + outCollName = pipeline[pipeline.length - 1]["$out"]; } else if (pipeline[pipeline.length - 1]["$merge"] != undefined) { // If the last stage is "$merge", gets the output collection name from it. outCollName = pipeline[pipeline.length - 1]["$merge"].into; } - let outColl = TimeseriesAggTests.getOutputCollection(outCollName, shouldDrop); + let outColl = TimeseriesAggTests.getOutputCollection(outCollName); if (prepareAction != null) { prepareAction(outColl); } @@ -132,14 +122,4 @@ var TimeseriesAggTests = class { .sort({"time": 1}) .toArray(); } - - static verifyResults(actualResults, expectedResults) { - // Verifies that the number of measurements is same as expected. - assert.eq(actualResults.length, expectedResults.length, actualResults); - - // Verifies that every measurement is same as expected. - for (var i = 0; i < expectedResults.length; ++i) { - assert.eq(actualResults[i], expectedResults[i], actualResults); - } - } }; diff --git a/jstests/core/timeseries/timeseries_out.js b/jstests/core/timeseries/timeseries_out.js index 3a95fd6cacc..46beccd99ed 100644 --- a/jstests/core/timeseries/timeseries_out.js +++ b/jstests/core/timeseries/timeseries_out.js @@ -7,11 +7,6 @@ * does_not_support_stepdowns, * # We need a timeseries collection. * requires_timeseries, - * # TODO SERVER-74601 remove tag after support for secondaries. - * assumes_read_preference_unchanged, - * # TODO SERVER-74601 remove tag after support for sharded clusters. - * assumes_against_mongod_not_mongos, - * requires_fcv_70 * ] */ (function() { @@ -26,97 +21,18 @@ const numIterations = 20; let [inColl, observerInColl] = TimeseriesAggTests.prepareInputCollections(numHosts, numIterations); -function generateOutPipeline(collName, options, aggStage = null) { - let outStage = {$out: {db: testDB.getName(), coll: collName, timeseries: options}}; - if (aggStage) { - return [aggStage, outStage]; - } - return [outStage]; -} +// Gets the expected results from non time-series observer input collection. +let expectedResults = + TimeseriesAggTests.getOutputAggregateResults(observerInColl, [{$out: "observer_out"}]); -function runTest(observerPipeline, actualPipeline, shouldDrop = true, valueToCheck = null) { - // Gets the expected results from a non time-series observer input collection. - const expectedResults = TimeseriesAggTests.getOutputAggregateResults( - observerInColl, observerPipeline, null, shouldDrop); +// Gets the actual results from time-series input collection. +let actualResults = TimeseriesAggTests.getOutputAggregateResults(inColl, [{$out: "out"}]); - // Gets the actual results from a time-series input collection. - const actualResults = - TimeseriesAggTests.getOutputAggregateResults(inColl, actualPipeline, null, shouldDrop); +// Verifies that the number of measurements is same as expected. +assert.eq(actualResults.length, expectedResults.length, actualResults); - // Verifies that the number of measurements is same as expected. - TimeseriesAggTests.verifyResults(actualResults, expectedResults); - if (valueToCheck) { - for (var i = 0; i < expectedResults.length; ++i) { - assert.eq(actualResults[i], {"time": valueToCheck}, actualResults); - } - } +// Verifies that every measurement is same as expected. +for (var i = 0; i < expectedResults.length; ++i) { + assert.eq(actualResults[i], expectedResults[i], actualResults); } - -// Tests that $out works with time-series collections writing to a non-timeseries collection. -runTest([{$out: "observer_out"}], [{$out: "out"}]); - -// Tests that $out creates a time-series collection when the collection does not exist. -let actualPipeline = generateOutPipeline("out_time", {timeField: "time", metaField: "tags"}); -runTest([{$out: "observer_out"}], actualPipeline); - -// Tests that $out creates a time-series collection with more time-series options. -actualPipeline = generateOutPipeline( - "out_time", - {timeField: "time", metaField: "tags", bucketMaxSpanSeconds: 100, bucketRoundingSeconds: 100}); -runTest([{$out: "observer_out"}], actualPipeline); - -// Change an option in the existing time-series collections. -assert.commandWorked(testDB.runCommand({collMod: "out_time", expireAfterSeconds: 360})); -assert.commandWorked(testDB.runCommand({collMod: "observer_out", validationLevel: "moderate"})); - -// Tests that a time-series collection is replaced when a time-series collection does exist. -let newDate = new Date('1999-09-30T03:24:00'); -let observerPipeline = [{$set: {"time": newDate}}, {$out: "observer_out"}]; -actualPipeline = generateOutPipeline("out_time", {timeField: "time"}, {$set: {"time": newDate}}); - -// Confirms that all the documents have the 'newDate' value. -runTest(observerPipeline, actualPipeline, false, newDate); - -// Confirms that the original time-series collection options were preserved by $out. -let collections = assert.commandWorked(testDB.runCommand({listCollections: 1})).cursor.firstBatch; -let coll = collections.find(entry => entry.name === "out_time"); -assert.eq(coll["options"]["expireAfterSeconds"], 360); -coll = collections.find(entry => entry.name === "observer_out"); -assert.eq(coll["options"]["validationLevel"], "moderate"); - -// Tests that an error is raised when trying to create a time-series collection from a non -// time-series collection. -let pipeline = generateOutPipeline("observer_out", {timeField: "time"}); -assert.throwsWithCode(() => inColl.aggregate(pipeline), 7268700); -assert.throwsWithCode(() => observerInColl.aggregate(pipeline), 7268700); - -// Tests that an error is raised for invalid timeseries options. -pipeline = generateOutPipeline("out_time", {timeField: "time", invalidField: "invalid"}); -assert.throwsWithCode(() => inColl.aggregate(pipeline), 40415); -assert.throwsWithCode(() => observerInColl.aggregate(pipeline), 40415); - -// Tests that an error is raised if the time-series specification changes. -pipeline = generateOutPipeline("out_time", {timeField: "usage_guest_nice"}); -assert.throwsWithCode(() => inColl.aggregate(pipeline), 7268701); -assert.throwsWithCode(() => observerInColl.aggregate(pipeline), 7268701); - -pipeline = generateOutPipeline("out_time", {timeField: "time", metaField: "usage_guest_nice"}); -assert.throwsWithCode(() => inColl.aggregate(pipeline), 7268702); -assert.throwsWithCode(() => observerInColl.aggregate(pipeline), 7268702); - -// Tests that a time-series collection can be replaced with a non-timeseries collection. -runTest([{"$out": "observer_out_time"}], [{"$out": "out_time"}]); - -// Tests that an error is raised if a conflicting view exists. -assert.commandWorked(testDB.createCollection("view_out", {viewOn: "out"})); - -pipeline = generateOutPipeline("view_out", {timeField: "time"}); -assert.throwsWithCode(() => inColl.aggregate(pipeline), 7268703); -assert.throwsWithCode(() => observerInColl.aggregate(pipeline), 7268703); - -// Test $out for time-series works with a non-existent database. -const destDB = testDB.getSiblingDB("outDifferentDB"); -assert.commandWorked(destDB.dropDatabase()); -inColl.aggregate({$out: {db: destDB.getName(), coll: "out_time", timeseries: {timeField: "time"}}}); -assert.eq(300, destDB["out_time"].find().itcount()); })(); diff --git a/src/mongo/db/catalog/create_collection.cpp b/src/mongo/db/catalog/create_collection.cpp index 8a18d866fa2..2707a8457b2 100644 --- a/src/mongo/db/catalog/create_collection.cpp +++ b/src/mongo/db/catalog/create_collection.cpp @@ -332,11 +332,9 @@ BSONObj _generateTimeseriesValidator(int bucketVersion, StringData timeField) { }; } -Status _createTimeseries( - OperationContext* opCtx, - const NamespaceString& ns, - const CollectionOptions& optionsArg, - enum TimeseriesCreateLevel createOpt = TimeseriesCreateLevel::kBothCollAndView) { +Status _createTimeseries(OperationContext* opCtx, + const NamespaceString& ns, + const CollectionOptions& optionsArg) { // This path should only be taken when a user creates a new time-series collection on the // primary. Secondaries replicate individual oplog entries. invariant(!ns.isTimeseriesBucketsCollection()); @@ -460,9 +458,8 @@ Status _createTimeseries( return Status::OK(); }); - // If compatible bucket collection already exists then proceed with creating view defintion. - if ((!ret.isOK() && !existingBucketCollectionIsCompatible) || - createOpt == TimeseriesCreateLevel::kBucketsCollOnly) + // If compatible bucket collection already exists then proceed with creating view definition. + if (!ret.isOK() && !existingBucketCollectionIsCompatible) return ret; ret = writeConflictRetry(opCtx, "create", ns.ns(), [&]() -> Status { @@ -706,19 +703,6 @@ Status createCollection(OperationContext* opCtx, } } // namespace -Status createTimeseries(OperationContext* opCtx, - const NamespaceString& ns, - const BSONObj& options, - TimeseriesCreateLevel level) { - StatusWith<CollectionOptions> statusWith = - CollectionOptions::parse(options, CollectionOptions::parseForCommand); - if (!statusWith.isOK()) { - return statusWith.getStatus(); - } - auto collectionOptions = statusWith.getValue(); - return _createTimeseries(opCtx, ns, collectionOptions, level); -} - Status createCollection(OperationContext* opCtx, const DatabaseName& dbName, const BSONObj& cmdObj, diff --git a/src/mongo/db/catalog/create_collection.h b/src/mongo/db/catalog/create_collection.h index 378a16979b8..655644c4e36 100644 --- a/src/mongo/db/catalog/create_collection.h +++ b/src/mongo/db/catalog/create_collection.h @@ -42,8 +42,6 @@ namespace mongo { class OperationContext; -enum class TimeseriesCreateLevel { kBothCollAndView, kBucketsCollOnly }; - /** * Creates a collection as described in "cmdObj" on the database "dbName". Creates the collection's * _id index according to 'idIndex', if it is non-empty. When 'idIndex' is empty, creates the @@ -88,13 +86,5 @@ Status createCollectionForApplyOps(OperationContext* opCtx, const BSONObj& cmdObj, bool allowRenameOutOfTheWay, const boost::optional<BSONObj>& idIndex = boost::none); -/** - * Creates a time-series collection as described in 'option' on the namespace 'ns'. If the level is - * set to 'kBothCollAndView' both the buckets collection and the view will be created. If the level - * is set to 'kBucketsCollOnly' only the buckets collection will be created. - */ -Status createTimeseries(OperationContext* opCtx, - const NamespaceString& ns, - const BSONObj& options, - TimeseriesCreateLevel level = TimeseriesCreateLevel::kBothCollAndView); + } // namespace mongo diff --git a/src/mongo/db/catalog/rename_collection.cpp b/src/mongo/db/catalog/rename_collection.cpp index c9a961ae126..08c9d8f5f75 100644 --- a/src/mongo/db/catalog/rename_collection.cpp +++ b/src/mongo/db/catalog/rename_collection.cpp @@ -805,8 +805,7 @@ void doLocalRenameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx, void validateNamespacesForRenameCollection(OperationContext* opCtx, const NamespaceString& source, - const NamespaceString& target, - const RenameCollectionOptions& options) { + const NamespaceString& target) { uassert(ErrorCodes::InvalidNamespace, str::stream() << "Invalid source namespace: " << source.ns(), source.isValid()); @@ -860,7 +859,7 @@ void validateNamespacesForRenameCollection(OperationContext* opCtx, uassert(ErrorCodes::IllegalOperation, "Renaming system.buckets collections is not allowed", - options.allowBuckets || !source.isTimeseriesBucketsCollection()); + !source.isTimeseriesBucketsCollection()); } void validateAndRunRenameCollection(OperationContext* opCtx, @@ -869,7 +868,7 @@ void validateAndRunRenameCollection(OperationContext* opCtx, const RenameCollectionOptions& options) { invariant(source != target, "Can't rename a collection to itself"); - validateNamespacesForRenameCollection(opCtx, source, target, options); + validateNamespacesForRenameCollection(opCtx, source, target); OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection( opCtx); diff --git a/src/mongo/db/catalog/rename_collection.h b/src/mongo/db/catalog/rename_collection.h index b93e235f52b..8697b70c2dc 100644 --- a/src/mongo/db/catalog/rename_collection.h +++ b/src/mongo/db/catalog/rename_collection.h @@ -45,14 +45,12 @@ class OpTime; /** * Renames the collection from "source" to "target" and drops the existing collection if * "dropTarget" is true. "stayTemp" indicates whether a collection should maintain its - * temporariness. "allowBuckets" indicates whether a time-series buckets collection should be - * allowed to be renamed. + * temporariness. */ struct RenameCollectionOptions { bool dropTarget = false; bool stayTemp = false; bool markFromMigrate = false; - bool allowBuckets = false; boost::optional<UUID> expectedSourceUUID; boost::optional<UUID> expectedTargetUUID; }; @@ -95,11 +93,9 @@ Status renameCollectionForRollback(OperationContext* opCtx, /** * Performs validation checks to ensure source and target namespaces are eligible for rename. */ -void validateNamespacesForRenameCollection( - OperationContext* opCtx, - const NamespaceString& source, - const NamespaceString& target, - const RenameCollectionOptions& options = RenameCollectionOptions()); +void validateNamespacesForRenameCollection(OperationContext* opCtx, + const NamespaceString& source, + const NamespaceString& target); /** * Runs renameCollection() with preliminary validation checks to ensure source diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 88efa301556..7c4166b8bf3 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -930,8 +930,7 @@ Status runAggregate(OperationContext* opCtx, resolvedView.getNamespace(), ShardVersion::UNSHARDED() /* shardVersion */, boost::none /* databaseVersion */); - } - + }; uassert(std::move(resolvedView), "Explain of a resolved view must be executed by mongos", !ShardingState::get(opCtx)->enabled() || !request.getExplain()); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 5da84f9b88d..01e1d759abf 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -381,7 +381,6 @@ pipelineEnv.Library( '$BUILD_DIR/mongo/db/storage/encryption_hooks', '$BUILD_DIR/mongo/db/storage/index_entry_comparison', '$BUILD_DIR/mongo/db/storage/storage_options', - '$BUILD_DIR/mongo/db/timeseries/catalog_helper', '$BUILD_DIR/mongo/db/update/update_document_diff', '$BUILD_DIR/mongo/db/views/resolved_view', '$BUILD_DIR/mongo/s/is_mongos', @@ -395,7 +394,6 @@ pipelineEnv.Library( 'granularity_rounder', ], LIBDEPS_PRIVATE=[ - '$BUILD_DIR/mongo/db/catalog/collection_catalog', '$BUILD_DIR/mongo/db/commands/test_commands_enabled', '$BUILD_DIR/mongo/db/fts/base_fts', '$BUILD_DIR/mongo/db/mongohasher', @@ -484,7 +482,6 @@ env.Library( 'document_source_merge.idl', 'document_source_merge_modes.idl', 'document_source_merge_spec.cpp', - 'document_source_out.idl', 'document_source_parsing_validators.cpp', 'document_source_replace_root.idl', 'document_source_set_window_fields.idl', @@ -498,7 +495,6 @@ env.Library( '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/exec/document_value/document_value', '$BUILD_DIR/mongo/db/storage/key_string', - '$BUILD_DIR/mongo/db/timeseries/timeseries_options', '$BUILD_DIR/mongo/idl/idl_parser', '$BUILD_DIR/mongo/s/common_s', 'runtime_constants_idl', diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp index 1864a8b1629..12abd30a6b7 100644 --- a/src/mongo/db/pipeline/document_source_out.cpp +++ b/src/mongo/db/pipeline/document_source_out.cpp @@ -37,7 +37,6 @@ #include "mongo/db/curop_failpoint_helpers.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/pipeline/document_path_support.h" -#include "mongo/db/timeseries/catalog_helper.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/util/destructor_guard.h" @@ -62,13 +61,7 @@ DocumentSourceOut::~DocumentSourceOut() { // Make sure we drop the temp collection if anything goes wrong. Errors are ignored // here because nothing can be done about them. Additionally, if this fails and the // collection is left behind, it will be cleaned up next time the server is started. - - // If creating a time-series collection, we must drop the "real" buckets collection, if - // anything goes wrong creating the view. - - // If creating a time-series collection, '_tempNs' is translated to include the - // "system.buckets" prefix. - if (_tempNs.size() || (_timeseries && !_timeseriesViewCreated)) { + if (_tempNs.size()) { auto cleanupClient = pExpCtx->opCtx->getServiceContext()->makeClient("$out_replace_coll_cleanup"); @@ -85,92 +78,52 @@ DocumentSourceOut::~DocumentSourceOut() { DocumentSourceWriteBlock writeBlock(cleanupOpCtx.get()); - auto deleteNs = _tempNs.size() ? _tempNs : makeBucketNsIfTimeseries(getOutputNs()); - pExpCtx->mongoProcessInterface->dropCollection(cleanupOpCtx.get(), deleteNs); + pExpCtx->mongoProcessInterface->dropCollection(cleanupOpCtx.get(), _tempNs); }); } -DocumentSourceOutSpec DocumentSourceOut::parseOutSpecAndResolveTargetNamespace( - const BSONElement& spec, const DatabaseName& defaultDB) { - DocumentSourceOutSpec outSpec; +NamespaceString DocumentSourceOut::parseNsFromElem(const BSONElement& spec, + const DatabaseName& defaultDB) { if (spec.type() == BSONType::String) { - outSpec.setColl(spec.valueStringData()); - outSpec.setDb(defaultDB.db()); + return NamespaceString(defaultDB, spec.valueStringData()); } else if (spec.type() == BSONType::Object) { - outSpec = mongo::DocumentSourceOutSpec::parse(IDLParserContext(kStageName), - spec.embeddedObject()); + auto nsObj = spec.Obj(); + uassert(16994, + str::stream() << "If an object is passed to " << kStageName + << " it must have exactly 2 fields: 'db' and 'coll'", + nsObj.nFields() == 2 && nsObj.hasField("coll") && nsObj.hasField("db")); + return NamespaceString(defaultDB.tenantId(), nsObj["db"].String(), nsObj["coll"].String()); } else { uassert(16990, "{} only supports a string or object argument, but found {}"_format( kStageName, typeName(spec.type())), spec.type() == BSONType::String); } - - return outSpec; -} - -NamespaceString DocumentSourceOut::makeBucketNsIfTimeseries(const NamespaceString& ns) { - return _timeseries ? ns.makeTimeseriesBucketsNamespace() : ns; + MONGO_UNREACHABLE; } std::unique_ptr<DocumentSourceOut::LiteParsed> DocumentSourceOut::LiteParsed::parse( const NamespaceString& nss, const BSONElement& spec) { - auto outSpec = parseOutSpecAndResolveTargetNamespace(spec, nss.dbName()); - NamespaceString targetNss = - NamespaceString(nss.dbName().tenantId(), outSpec.getDb(), outSpec.getColl()); + NamespaceString targetNss = parseNsFromElem(spec, nss.dbName()); uassert(ErrorCodes::InvalidNamespace, "Invalid {} target namespace, {}"_format(kStageName, targetNss.ns()), targetNss.isValid()); return std::make_unique<DocumentSourceOut::LiteParsed>(spec.fieldName(), std::move(targetNss)); } -void DocumentSourceOut::validateTimeseries() { - const NamespaceString& outNs = getOutputNs(); - if (!_timeseries) { - return; - } - // check if a time-series collection already exists in that namespace. - auto timeseriesOpts = mongo::timeseries::getTimeseriesOptions(pExpCtx->opCtx, outNs, true); - if (timeseriesOpts) { - uassert(7268701, - "Time field inputted does not match the time field of the existing time-series " - "collection.", - _timeseries->getTimeField() == timeseriesOpts->getTimeField()); - uassert(7268702, - "Meta field inputted does not match the time field of the existing time-series " - "collection.", - !_timeseries->getMetaField() || - _timeseries->getMetaField() == timeseriesOpts->getMetaField()); - } else { - // if a time-series collection doesn't exist, the namespace should not have a - // collection nor a conflicting view. - // Hold reference to the catalog for collection lookup without locks to be safe. - auto catalog = CollectionCatalog::get(pExpCtx->opCtx); - auto collection = catalog->lookupCollectionByNamespace(pExpCtx->opCtx, outNs); - uassert(7268700, - "Cannot create a time-series collection from a non time-series collection.", - !collection); - auto view = catalog->lookupView(pExpCtx->opCtx, outNs); - uassert( - 7268703, "Cannot create a time-series collection from a non time-series view.", !view); - } -} - void DocumentSourceOut::initialize() { DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx); - const NamespaceString& outputNs = makeBucketNsIfTimeseries(getOutputNs()); - - // We will write all results into a temporary collection, then rename the temporary - // collection to be the target collection once we are done. Note that this temporary - // collection name is used by MongoMirror and thus should not be changed without - // consultation. - _tempNs = NamespaceString(getOutputNs().tenantId(), - str::stream() << getOutputNs().dbName().toString() << ".tmp.agg_out." + const auto& outputNs = getOutputNs(); + // We will write all results into a temporary collection, then rename the temporary collection + // to be the target collection once we are done. + // Note that this temporary collection name is used by MongoMirror and thus should not be + // changed without consultation. + _tempNs = NamespaceString(outputNs.tenantId(), + str::stream() << outputNs.dbName().toString() << ".tmp.agg_out." << UUID::gen()); - validateTimeseries(); // Save the original collection options and index specs so we can check they didn't change // during computation. _originalOutOptions = @@ -192,19 +145,10 @@ void DocumentSourceOut::initialize() { cmd << "create" << _tempNs.coll(); cmd << "temp" << true; cmd.appendElementsUnique(_originalOutOptions); - if (_timeseries) { - cmd << DocumentSourceOutSpec::kTimeseriesFieldName << _timeseries->toBSON(); - pExpCtx->mongoProcessInterface->createTimeseries( - pExpCtx->opCtx, _tempNs, cmd.done(), false); - } else { - pExpCtx->mongoProcessInterface->createCollection( - pExpCtx->opCtx, _tempNs.dbName(), cmd.done()); - } - } - // After creating the tmp collection we should update '_tempNs' to represent the buckets - // collection if the collection is time-series. - _tempNs = makeBucketNsIfTimeseries(_tempNs); + pExpCtx->mongoProcessInterface->createCollection( + pExpCtx->opCtx, _tempNs.dbName(), cmd.done()); + } CurOpFailpointHelpers::waitWhileFailPointEnabled( &outWaitAfterTempCollectionCreation, @@ -233,38 +177,22 @@ void DocumentSourceOut::initialize() { void DocumentSourceOut::finalize() { DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx); - // If the collection is timeseries, must rename to the "real" buckets collection - const NamespaceString& outputNs = makeBucketNsIfTimeseries(getOutputNs()); - - pExpCtx->mongoProcessInterface->renameIfOptionsAndIndexesHaveNotChanged( - pExpCtx->opCtx, - _tempNs, - outputNs, - true /* dropTarget */, - false /* stayTemp */, - _timeseries ? true : false /* allowBuckets */, - _originalOutOptions, - _originalIndexes); + const auto& outputNs = getOutputNs(); + pExpCtx->mongoProcessInterface->renameIfOptionsAndIndexesHaveNotChanged(pExpCtx->opCtx, + _tempNs, + outputNs, + true /* dropTarget */, + false /* stayTemp */, + _originalOutOptions, + _originalIndexes); // The rename succeeded, so the temp collection no longer exists. _tempNs = {}; - - // If the collection is timeseries, try to create the view. - if (_timeseries) { - BSONObjBuilder cmd; - cmd << DocumentSourceOutSpec::kTimeseriesFieldName << _timeseries->toBSON(); - pExpCtx->mongoProcessInterface->createTimeseries( - pExpCtx->opCtx, getOutputNs(), cmd.done(), true); - } - - // Creating the view succeeded, so the boolean should be set to true. - _timeseriesViewCreated = true; } boost::intrusive_ptr<DocumentSource> DocumentSourceOut::create( - NamespaceString outputNs, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - boost::optional<TimeseriesOptions> timeseries) { + NamespaceString outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx) { + uassert(ErrorCodes::OperationNotSupportedInTransaction, "{} cannot be used in a transaction"_format(kStageName), !expCtx->opCtx->inMultiDocumentTransaction()); @@ -280,15 +208,14 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceOut::create( uassert(31321, "Can't {} to internal database: {}"_format(kStageName, outputNs.db()), !outputNs.isOnInternalDb()); - return new DocumentSourceOut(std::move(outputNs), std::move(timeseries), expCtx); + + return new DocumentSourceOut(std::move(outputNs), expCtx); } boost::intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { - auto outSpec = parseOutSpecAndResolveTargetNamespace(elem, expCtx->ns.dbName()); - NamespaceString targetNss = - NamespaceString(expCtx->ns.dbName().tenantId(), outSpec.getDb(), outSpec.getColl()); - return create(std::move(targetNss), expCtx, std::move(outSpec.getTimeseries())); + auto targetNS = parseNsFromElem(elem, expCtx->ns.dbName()); + return create(targetNS, expCtx); } Value DocumentSourceOut::serialize(SerializationOptions opts) const { @@ -297,11 +224,8 @@ Value DocumentSourceOut::serialize(SerializationOptions opts) const { } // Do not include the tenantId in the serialized 'outputNs'. - return Value(Document{{getSourceName(), - Document{{"db", _outputNs.dbName().db()}, - {"coll", _outputNs.coll()}, - {DocumentSourceOutSpec::kTimeseriesFieldName, - _timeseries ? Value(_timeseries->toBSON()) : Value()}}}}); + return Value( + DOC(kStageName << DOC("db" << _outputNs.dbName().db() << "coll" << _outputNs.coll()))); } void DocumentSourceOut::waitWhileFailPointEnabled() { @@ -310,9 +234,8 @@ void DocumentSourceOut::waitWhileFailPointEnabled() { pExpCtx->opCtx, "hangWhileBuildingDocumentSourceOutBatch", []() { - LOGV2( - 20902, - "Hanging aggregation due to 'hangWhileBuildingDocumentSourceOutBatch' failpoint"); + LOGV2(20902, + "Hanging aggregation due to 'hangWhileBuildingDocumentSourceOutBatch' failpoint"); }); } diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index f8257d7acce..94192fb18d4 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -29,7 +29,6 @@ #pragma once -#include "mongo/db/pipeline/document_source_out_gen.h" #include "mongo/db/pipeline/document_source_writer.h" namespace mongo { @@ -98,9 +97,7 @@ public: * Creates a new $out stage from the given arguments. */ static boost::intrusive_ptr<DocumentSource> create( - NamespaceString outputNs, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - boost::optional<TimeseriesOptions> timeseries = boost::none); + NamespaceString outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx); /** * Parses a $out stage from the user-supplied BSON. @@ -116,13 +113,11 @@ public: private: DocumentSourceOut(NamespaceString outputNs, - boost::optional<TimeseriesOptions> timeseries, const boost::intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSourceWriter(kStageName.rawData(), std::move(outputNs), expCtx), - _timeseries(std::move(timeseries)) {} + : DocumentSourceWriter(kStageName.rawData(), std::move(outputNs), expCtx) {} + + static NamespaceString parseNsFromElem(const BSONElement& spec, const DatabaseName& defaultDB); - static DocumentSourceOutSpec parseOutSpecAndResolveTargetNamespace( - const BSONElement& spec, const DatabaseName& defaultDB); void initialize() override; void finalize() override; @@ -131,13 +126,8 @@ private: DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx); auto targetEpoch = boost::none; - if (_timeseries) { - uassertStatusOK(pExpCtx->mongoProcessInterface->insertTimeseries( - pExpCtx, _tempNs, std::move(batch), _writeConcern, targetEpoch)); - } else { - uassertStatusOK(pExpCtx->mongoProcessInterface->insert( - pExpCtx, _tempNs, std::move(batch), _writeConcern, targetEpoch)); - } + uassertStatusOK(pExpCtx->mongoProcessInterface->insert( + pExpCtx, _tempNs, std::move(batch), _writeConcern, targetEpoch)); } std::pair<BSONObj, int> makeBatchObject(Document&& doc) const override { @@ -148,14 +138,6 @@ private: void waitWhileFailPointEnabled() override; - /** - * Checks that the time-series spec passed by the user matches the existing time-series - * collection, if one exists. It will set '_timeseriesExists' to true if a time-series - * collection exists. - */ - void validateTimeseries(); - - NamespaceString makeBucketNsIfTimeseries(const NamespaceString& ns); // Holds on to the original collection options and index specs so we can check they didn't // change during computation. BSONObj _originalOutOptions; @@ -163,10 +145,6 @@ private: // The temporary namespace for the $out writes. NamespaceString _tempNs; - - boost::optional<TimeseriesOptions> _timeseries; - - bool _timeseriesViewCreated = false; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_out.idl b/src/mongo/db/pipeline/document_source_out.idl deleted file mode 100644 index f5d6bcaad1b..00000000000 --- a/src/mongo/db/pipeline/document_source_out.idl +++ /dev/null @@ -1,58 +0,0 @@ -# Copyright (C) 2023-present MongoDB, Inc. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the Server Side Public License, version 1, -# as published by MongoDB, Inc. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# Server Side Public License for more details. -# -# You should have received a copy of the Server Side Public License -# along with this program. If not, see -# <http://www.mongodb.com/licensing/server-side-public-license>. -# -# As a special exception, the copyright holders give permission to link the -# code of portions of this program with the OpenSSL library under certain -# conditions as described in each individual source file and distribute -# linked combinations including the program with the OpenSSL library. You -# must comply with the Server Side Public License in all respects for -# all of the code used other than as permitted herein. If you modify file(s) -# with this exception, you may extend this exception to your version of the -# file(s), but you are not obligated to do so. If you do not wish to do so, -# delete this exception statement from your version. If you delete this -# exception statement from all source files in the program, then also delete -# it in the license file. -# - -# Document source out stage IDL file - -global: - cpp_namespace: "mongo" - -imports: - - "mongo/db/basic_types.idl" - - "mongo/db/timeseries/timeseries.idl" - -structs: - - DocumentSourceOutSpec: - description: "$out pipeline spec" - strict: true - fields: - coll: - description: "Target collection name to write documents from $out to." - type: string - optional: false - db: - description: "Target database name to write documents from $out to." - type: string - optional: false - timeseries: - cpp_name: timeseries - description: "If set, the aggregation stage will use these options to create or - replace a time-series collection in the given namespace." - type: TimeseriesOptions - optional: true - diff --git a/src/mongo/db/pipeline/document_source_out_test.cpp b/src/mongo/db/pipeline/document_source_out_test.cpp index 2d82f31896f..d863cf5e032 100644 --- a/src/mongo/db/pipeline/document_source_out_test.cpp +++ b/src/mongo/db/pipeline/document_source_out_test.cpp @@ -92,7 +92,7 @@ TEST_F(DocumentSourceOutTest, FailsToParseIncorrectType) { ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 16990); spec = BSON("$out" << BSONObj()); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 40414); + ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 16994); } TEST_F(DocumentSourceOutTest, AcceptsStringArgument) { diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h index f08b2f108fa..5b61f3a279c 100644 --- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h @@ -178,11 +178,6 @@ public: const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) = 0; - virtual Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs, - const WriteConcernOptions& wc, - boost::optional<OID> targetEpoch) = 0; /** * Updates the documents matching 'queries' with the objects 'updates'. Returns an error Status * if any of the updates fail, otherwise returns an 'UpdateResult' objects with the details of @@ -279,7 +274,6 @@ public: const NamespaceString& targetNs, bool dropTarget, bool stayTemp, - bool allowBuckets, const BSONObj& originalCollectionOptions, const std::list<BSONObj>& originalIndexes) = 0; @@ -291,12 +285,6 @@ public: const DatabaseName& dbName, const BSONObj& cmdObj) = 0; - virtual void createTimeseries(OperationContext* opCtx, - const NamespaceString& ns, - const BSONObj& options, - bool createView) = 0; - - /** * Runs createIndexes on the given database for the given index specs. If running on a shardsvr * this targets the primary shard of the database part of 'ns'. diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h index f9d82eed06c..205d299e6eb 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h @@ -78,14 +78,6 @@ public: MONGO_UNREACHABLE; } - Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs, - const WriteConcernOptions& wc, - boost::optional<OID> targetEpoch) final { - MONGO_UNREACHABLE; - } - StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, BatchedObjects&& batch, @@ -154,7 +146,6 @@ public: const NamespaceString& targetNs, bool dropTarget, bool stayTemp, - bool allowBuckets, const BSONObj& originalCollectionOptions, const std::list<BSONObj>& originalIndexes) final { MONGO_UNREACHABLE; @@ -166,14 +157,6 @@ public: MONGO_UNREACHABLE; } - - void createTimeseries(OperationContext* opCtx, - const NamespaceString& ns, - const BSONObj& options, - bool createView) final { - MONGO_UNREACHABLE; - } - void createIndexesOnEmptyCollection(OperationContext* opCtx, const NamespaceString& ns, const std::vector<BSONObj>& indexSpecs) final { diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp index 14c14102756..63a2e7f281a 100644 --- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp @@ -29,12 +29,10 @@ #include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h" -#include "mongo/base/error_codes.h" #include "mongo/db/catalog/create_collection.h" #include "mongo/db/catalog/drop_collection.h" #include "mongo/db/catalog/list_indexes.h" #include "mongo/db/catalog/rename_collection.h" -#include "mongo/db/commands.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/exception_util.h" #include "mongo/db/db_raii.h" @@ -42,8 +40,6 @@ #include "mongo/db/pipeline/aggregate_command_gen.h" #include "mongo/db/pipeline/document_source_cursor.h" #include "mongo/db/repl/speculative_majority_read_info.h" -#include "mongo/db/timeseries/catalog_helper.h" -#include "mongo/db/timeseries/timeseries_constants.h" namespace mongo { @@ -127,24 +123,6 @@ Status NonShardServerProcessInterface::insert(const boost::intrusive_ptr<Express return Status::OK(); } -Status NonShardServerProcessInterface::insertTimeseries( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs, - const WriteConcernOptions& wc, - boost::optional<OID> targetEpoch) { - try { - auto insertReply = write_ops_exec::performTimeseriesWrites( - expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); - - checkWriteErrors(insertReply.getWriteCommandReplyBase()); - } catch (DBException& ex) { - ex.addContext(str::stream() << "time-series insert failed: " << ns.ns()); - throw; - } - return Status::OK(); -} - StatusWith<MongoProcessInterface::UpdateResult> NonShardServerProcessInterface::update( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, @@ -212,52 +190,16 @@ void NonShardServerProcessInterface::renameIfOptionsAndIndexesHaveNotChanged( const NamespaceString& targetNs, bool dropTarget, bool stayTemp, - bool allowBuckets, const BSONObj& originalCollectionOptions, const std::list<BSONObj>& originalIndexes) { RenameCollectionOptions options; options.dropTarget = dropTarget; options.stayTemp = stayTemp; - options.allowBuckets = allowBuckets; // skip sharding validation on non sharded servers doLocalRenameIfOptionsAndIndexesHaveNotChanged( opCtx, sourceNs, targetNs, options, originalIndexes, originalCollectionOptions); } -void NonShardServerProcessInterface::createTimeseries(OperationContext* opCtx, - const NamespaceString& ns, - const BSONObj& options, - bool createView) { - - // try to create the view, but catch the error if the view already exists. - if (createView) { - try { - uassertStatusOK(mongo::createTimeseries(opCtx, ns, options)); - } catch (DBException& ex) { - auto view = CollectionCatalog::get(opCtx)->lookupView(opCtx, ns); - if (ex.code() != ErrorCodes::NamespaceExists || !view || !view->timeseries()) { - throw; - // check the time-series spec matches. - } else { - auto timeseriesOpts = mongo::timeseries::getTimeseriesOptions(opCtx, ns, true); - BSONObj inputOpts = options.getField("timeseries").Obj(); - if (!timeseriesOpts || - timeseriesOpts->getTimeField() != - inputOpts.getField(timeseries::kTimeFieldName).valueStringData() || - (inputOpts.hasField(timeseries::kMetaFieldName) && - timeseriesOpts->getMetaField() != - inputOpts.getField(timeseries::kMetaFieldName).valueStringData())) { - throw; - } - } - } - // creating the buckets collection should always succeed. - } else { - uassertStatusOK( - mongo::createTimeseries(opCtx, ns, options, TimeseriesCreateLevel::kBucketsCollOnly)); - } -} - void NonShardServerProcessInterface::createCollection(OperationContext* opCtx, const DatabaseName& dbName, const BSONObj& cmdObj) { diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h index 8ffbb4716c5..53de646093a 100644 --- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h @@ -95,12 +95,6 @@ public: const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) override; - Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs, - const WriteConcernOptions& wc, - boost::optional<OID> targetEpoch) override; - StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, BatchedObjects&& batch, @@ -115,7 +109,6 @@ public: const NamespaceString& targetNs, bool dropTarget, bool stayTemp, - bool allowBuckets, const BSONObj& originalCollectionOptions, const std::list<BSONObj>& originalIndexes) override; @@ -123,11 +116,6 @@ public: const DatabaseName& dbName, const BSONObj& cmdObj) override; - void createTimeseries(OperationContext* opCtx, - const NamespaceString& ns, - const BSONObj& options, - bool createView) override; - void dropCollection(OperationContext* opCtx, const NamespaceString& collection) override; void createIndexesOnEmptyCollection(OperationContext* opCtx, diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp index 76518f60312..6dc8109420c 100644 --- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp @@ -124,41 +124,12 @@ void ReplicaSetNodeProcessInterface::createIndexesOnEmptyCollection( uassertStatusOK(_executeCommandOnPrimary(opCtx, ns, cmd.obj())); } -void ReplicaSetNodeProcessInterface::createTimeseries(OperationContext* opCtx, - const NamespaceString& ns, - const BSONObj& options, - bool createView) { - if (_canWriteLocally(opCtx, ns)) { - return NonShardServerProcessInterface::createTimeseries(opCtx, ns, options, createView); - } else { - // TODO SERVER-74061 remove uassert. - uasserted(7268706, "$out for time-series collections is not supported on secondaries."); - } -} - -Status ReplicaSetNodeProcessInterface::insertTimeseries( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs, - const WriteConcernOptions& wc, - boost::optional<OID> targetEpoch) { - - if (_canWriteLocally(expCtx->opCtx, ns)) { - return NonShardServerProcessInterface::insertTimeseries( - expCtx, ns, std::move(objs), wc, targetEpoch); - } else { - // TODO SERVER-74061 remove uassert. - uasserted(7268707, "$out for time-series collections is not supported on secondaries."); - } -} - void ReplicaSetNodeProcessInterface::renameIfOptionsAndIndexesHaveNotChanged( OperationContext* opCtx, const NamespaceString& sourceNs, const NamespaceString& targetNs, bool dropTarget, bool stayTemp, - bool allowBuckets, const BSONObj& originalCollectionOptions, const std::list<BSONObj>& originalIndexes) { if (_canWriteLocally(opCtx, targetNs)) { @@ -168,7 +139,6 @@ void ReplicaSetNodeProcessInterface::renameIfOptionsAndIndexesHaveNotChanged( targetNs, dropTarget, stayTemp, - allowBuckets, originalCollectionOptions, originalIndexes); } diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h index 69dff21f1e2..02c183d43a0 100644 --- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h @@ -84,7 +84,6 @@ public: const NamespaceString& targetNs, bool dropTarget, bool stayTemp, - bool allowBuckets, const BSONObj& originalCollectionOptions, const std::list<BSONObj>& originalIndexes); void createCollection(OperationContext* opCtx, @@ -94,16 +93,6 @@ public: void createIndexesOnEmptyCollection(OperationContext* opCtx, const NamespaceString& ns, const std::vector<BSONObj>& indexSpecs); - void createTimeseries(OperationContext* opCtx, - const NamespaceString& ns, - const BSONObj& options, - bool createView); - - Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs, - const WriteConcernOptions& wc, - boost::optional<OID> targetEpoch); private: /** diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index dd5e734d9fe..2b657788825 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -176,7 +176,6 @@ void ShardServerProcessInterface::renameIfOptionsAndIndexesHaveNotChanged( const NamespaceString& targetNs, bool dropTarget, bool stayTemp, - bool allowBuckets, const BSONObj& originalCollectionOptions, const std::list<BSONObj>& originalIndexes) { auto cachedDbInfo = diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h index 98c9b1b9d69..94535d92229 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h @@ -106,7 +106,6 @@ public: const NamespaceString& targetNs, bool dropTarget, bool stayTemp, - bool allowBuckets, const BSONObj& originalCollectionOptions, const std::list<BSONObj>& originalIndexes) final; void createCollection(OperationContext* opCtx, @@ -142,25 +141,6 @@ public: const boost::optional<DatabaseVersion>& dbVersion) override; void checkOnPrimaryShardForDb(OperationContext* opCtx, const NamespaceString& nss) final; - - void createTimeseries(OperationContext* opCtx, - const NamespaceString& ns, - const BSONObj& options, - bool createView) final { - // TODO SERVER-74061 remove uassert. - uasserted(7268704, - "$out for time-series collections is not supported on sharded clusters."); - } - - Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs, - const WriteConcernOptions& wc, - boost::optional<OID> targetEpoch) final { - // TODO SERVER-74061 remove uassert. - uasserted(7268705, - "$out for time-series collections is not supported on sharded clusters."); - } }; } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h index 569886c3810..fb93068d911 100644 --- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h @@ -83,14 +83,6 @@ public: MONGO_UNREACHABLE; } - Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs, - const WriteConcernOptions& wc, - boost::optional<OID> targetEpoch) override { - MONGO_UNREACHABLE; - } - StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, BatchedObjects&& batch, @@ -118,13 +110,6 @@ public: MONGO_UNREACHABLE; } - void createTimeseries(OperationContext* opCtx, - const NamespaceString& ns, - const BSONObj& options, - bool createView) final { - MONGO_UNREACHABLE; - } - boost::optional<BSONObj> getCatalogEntry(OperationContext* opCtx, const NamespaceString& ns) const override { MONGO_UNREACHABLE; @@ -167,7 +152,6 @@ public: const NamespaceString& targetNs, bool dropTarget, bool stayTemp, - bool allowBuckets, const BSONObj& originalCollectionOptions, const std::list<BSONObj>& originalIndexes) override { MONGO_UNREACHABLE; |