diff options
author | Arun Banala <arun.banala@mongodb.com> | 2021-06-08 10:40:21 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-06-29 19:09:31 +0000 |
commit | 01fc797a67eeb398f90af39f22e55fb1a243298b (patch) | |
tree | 8fdb200e63aa42eb0e332b1f3876ef7bf282e14f | |
parent | de4fc40201d3a14a0725c9b280ded22f838a0479 (diff) | |
download | mongo-01fc797a67eeb398f90af39f22e55fb1a243298b.tar.gz |
SERVER-57315 Enable shardCollection command for a time series collection
25 files changed, 307 insertions, 181 deletions
diff --git a/jstests/core/timeseries/libs/timeseries.js b/jstests/core/timeseries/libs/timeseries.js index c07536c6a9a..e8fec9adfb9 100644 --- a/jstests/core/timeseries/libs/timeseries.js +++ b/jstests/core/timeseries/libs/timeseries.js @@ -10,6 +10,12 @@ var TimeseriesTest = class { .featureFlagTimeseriesCollection.value; } + static shardedtimeseriesCollectionsEnabled(conn) { + return assert + .commandWorked(conn.adminCommand({getParameter: 1, featureFlagShardedTimeSeries: 1})) + .featureFlagShardedTimeSeries.value; + } + /** * Adjusts the values in 'fields' by a random amount. * Ensures that the new values stay in the range [0, 100]. diff --git a/jstests/sharding/timeseries_shard_collection.js b/jstests/sharding/timeseries_shard_collection.js new file mode 100644 index 00000000000..c34f436450f --- /dev/null +++ b/jstests/sharding/timeseries_shard_collection.js @@ -0,0 +1,176 @@ +/** + * Tests that time-series collections can be sharded with different configurations. + * + * @tags: [ + * requires_fcv_49, + * ] + */ + +(function() { +load("jstests/core/timeseries/libs/timeseries.js"); + +Random.setRandomSeed(); + +const st = new ShardingTest({shards: 2, rs: {nodes: 2}}); + +const dbName = 'test'; +const sDB = st.s.getDB(dbName); + +if (!TimeseriesTest.timeseriesCollectionsEnabled(st.shard0)) { + jsTestLog("Skipping test because the time-series collection feature flag is disabled"); + st.stop(); + return; +} + +if (TimeseriesTest.shardedtimeseriesCollectionsEnabled(st.shard0)) { + function validateBucketsCollectionSharded({collName, shardKey, timeSeriesParams}) { + const configColls = st.s.getDB('config').collections; + const output = configColls + .find({ + _id: 'test.system.buckets.' + collName, + key: shardKey, + timeseriesFields: {$exists: true} + }) + .toArray(); + assert.eq(output.length, 1, configColls.find().toArray()); + assert.eq(output[0].timeseriesFields.timeField, timeSeriesParams.timeField, output[0]); + assert.eq(output[0].timeseriesFields.metaField, timeSeriesParams.metaField, output[0]); + } + + // Simple shard key on the metadata field. + (function metaShardKey() { + assert.commandWorked( + sDB.createCollection('ts', {timeseries: {timeField: 'time', metaField: 'hostId'}})); + + assert.commandWorked(st.s.adminCommand({enableSharding: 'test'})); + + // This index gets created as {meta: 1} on the buckets collection. + assert.commandWorked(sDB.ts.createIndex({hostId: 1})); + + // Command should fail since the 'timeseries' specification does not match that existing + // collection. + assert.commandFailedWithCode( + st.s.adminCommand( + {shardCollection: 'test.ts', key: {'hostId': 1}, timeseries: {timeField: 'time'}}), + 5731500); + + assert.commandWorked(st.s.adminCommand({ + shardCollection: 'test.ts', + key: {'hostId': 1}, + timeseries: {timeField: 'time', metaField: 'hostId'} + })); + + validateBucketsCollectionSharded({ + collName: 'ts', + shardKey: {meta: 1}, + timeSeriesParams: {timeField: 'time', metaField: 'hostId'} + }); + + assert.commandWorked( + st.s.adminCommand({split: 'test.system.buckets.ts', middle: {meta: 10}})); + + const primaryShard = st.getPrimaryShard(dbName); + assert.commandWorked(st.s.adminCommand({ + movechunk: 'test.system.buckets.ts', + find: {meta: 10}, + to: st.getOther(primaryShard).shardName, + _waitForDelete: true + })); + + let counts = st.chunkCounts('system.buckets.ts', 'test'); + assert.eq(1, counts[st.shard0.shardName]); + assert.eq(1, counts[st.shard1.shardName]); + + sDB.dropDatabase(); + })(); + + // Shard key on the metadata field and time fields. + function metaAndTimeShardKey() { + assert.commandWorked(st.s.adminCommand({enableSharding: 'test'})); + assert.commandWorked(st.s.adminCommand({ + shardCollection: 'test.ts', + key: {'hostId': 1, 'time': 1}, + timeseries: {timeField: 'time', metaField: 'hostId'}, + })); + + validateBucketsCollectionSharded({ + collName: 'ts', + // The 'time' field should be translated to 'control.min.time' on buckets collection. + shardKey: {meta: 1, 'control.min.time': 1}, + timeSeriesParams: {timeField: 'time', metaField: 'hostId'} + }); + + assert.commandWorked(st.s.adminCommand( + {split: 'test.system.buckets.ts', middle: {meta: 10, 'control.min.time': MinKey}})); + + const primaryShard = st.getPrimaryShard(dbName); + assert.commandWorked(st.s.adminCommand({ + movechunk: 'test.system.buckets.ts', + find: {meta: 10, 'control.min.time': MinKey}, + to: st.getOther(primaryShard).shardName, + _waitForDelete: true + })); + + let counts = st.chunkCounts('system.buckets.ts', 'test'); + assert.eq(1, counts[st.shard0.shardName]); + assert.eq(1, counts[st.shard1.shardName]); + + sDB.dropDatabase(); + } + + // Sharding a collection with an existing timeseries collection. + assert.commandWorked( + sDB.createCollection('ts', {timeseries: {timeField: 'time', metaField: 'hostId'}})); + metaAndTimeShardKey(); + + // Sharding a collection with a new timeseries collection. + metaAndTimeShardKey(); + +} else { + (function timeseriesCollectionsCannotBeSharded() { + assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); + + assert.commandFailedWithCode(st.s.adminCommand({ + shardCollection: 'test.ts', + key: {meta: 1}, + timeseries: {timeField: 'time', metaField: 'hostId'} + }), + 5731502); + + assert.commandWorked( + sDB.createCollection('ts', {timeseries: {timeField: 'time', metaField: 'hostId'}})); + + assert.commandFailedWithCode( + st.s.adminCommand({shardCollection: 'test.ts', key: {meta: 1}}), 5731502); + + // Insert directly on the primary shard because mongos does not know how to insert into a TS + // collection. + st.ensurePrimaryShard(dbName, st.shard0.shardName); + const tsColl = st.shard0.getDB(dbName).ts; + const numDocs = 20; + let docs = []; + for (let i = 0; i < numDocs; i++) { + const doc = { + time: ISODate(), + hostId: i, + _id: i, + data: Random.rand(), + }; + docs.push(doc); + assert.commandWorked(tsColl.insert(doc)); + } + + // This index gets created as {meta: 1} on the buckets collection. + assert.commandWorked(tsColl.createIndex({hostId: 1})); + + // Trying to shard the buckets collection -> error + assert.commandFailedWithCode( + st.s.adminCommand({shardCollection: 'test.system.buckets.ts', key: {meta: 1}}), + 5731501); + + assert.commandWorked(sDB.dropDatabase()); + })(); +} + +st.stop(); +})(); diff --git a/jstests/sharding/timeseries_sharded_query.js b/jstests/sharding/timeseries_sharded_query.js deleted file mode 100644 index 23585bbb274..00000000000 --- a/jstests/sharding/timeseries_sharded_query.js +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Tests that time-series collections can be sharded and that queries return correct results. - * - * @tags: [ - * requires_fcv_49, - * requires_find_command, - * ] - */ - -(function() { -load("jstests/core/timeseries/libs/timeseries.js"); -load("jstests/sharding/libs/find_chunks_util.js"); - -Random.setRandomSeed(); - -// We have to disable this check because a test manually crafts a sharded timeseries -// collection, which are frozen in 5.0 so we cannot verify anything from it. -TestData.skipCheckingIndexesConsistentAcrossCluster = true; - -const st = new ShardingTest({shards: 2}); - -const dbName = 'test'; -const sDB = st.s.getDB(dbName); - -if (!TimeseriesTest.timeseriesCollectionsEnabled(st.shard0)) { - jsTestLog("Skipping test because the time-series collection feature flag is disabled"); - st.stop(); - return; -} - -(function timeseriesCollectionsCannotBeSharded() { - assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); - - assert.commandWorked( - sDB.createCollection('ts', {timeseries: {timeField: 'time', metaField: 'hostId'}})); - - // Insert directly on the primary shard because mongos does not know how to insert into a TS - // collection. - st.ensurePrimaryShard(dbName, st.shard0.shardName); - const tsColl = st.shard0.getDB(dbName).ts; - const numDocs = 20; - let docs = []; - for (let i = 0; i < numDocs; i++) { - const doc = { - time: ISODate(), - hostId: i, - _id: i, - data: Random.rand(), - }; - docs.push(doc); - assert.commandWorked(tsColl.insert(doc)); - } - - // This index gets created as {meta: 1} on the buckets collection. - assert.commandWorked(tsColl.createIndex({hostId: 1})); - - // Trying to shard a time-series collection -> error - assert.commandFailed(st.s.adminCommand({shardCollection: 'test.ts', key: {hostId: 1}})); - - // Trying to shard the buckets collection -> error - assert.commandFailed( - st.s.adminCommand({shardCollection: 'test.system.buckets.ts', key: {meta: 1}})); - - assert.commandWorked(sDB.dropDatabase()); -})(); - -(function manuallyCraftedShardedTimeseriesCollectionCannotBeUsed() { - assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); - - assert.commandWorked(sDB.createCollection('coll')); - - st.ensurePrimaryShard(dbName, st.shard0.shardName); - for (let i = 0; i < 20; i++) { - assert.commandWorked(st.shard0.getDB(dbName).coll.insert({a: i})); - } - - assert.commandWorked(st.shard0.getDB(dbName).coll.createIndex({a: 1})); - assert.commandWorked(st.s.adminCommand({shardCollection: 'test.coll', key: {a: 1}})); - - // It modifies the time-series metadata on the CS and on the shards - let modifyTimeseriesMetadata = (opTimeseriesMetadata) => { - st.s.getDB('config').collections.update({_id: 'test.coll'}, opTimeseriesMetadata); - st.shard0.getDB('config').cache.collections.update({_id: 'test.coll'}, - opTimeseriesMetadata); - st.shard1.getDB('config').cache.collections.update({_id: 'test.coll'}, - opTimeseriesMetadata); - }; - - // It forces a bump of the collection version moving the {a: 0} chunk to destShardName - let bumpCollectionVersionThroughMoveChunk = (destShardName) => { - assert.commandWorked( - st.s.adminCommand({moveChunk: 'test.coll', find: {a: 0}, to: destShardName})); - }; - - // It forces a refresh of the routing info on the shards - let forceRefreshOnShards = () => { - assert.commandWorked(st.shard0.adminCommand( - {_flushRoutingTableCacheUpdates: 'test.coll', syncFromConfig: true})); - assert.commandWorked(st.shard1.adminCommand( - {_flushRoutingTableCacheUpdates: 'test.coll', syncFromConfig: true})); - }; - - // Hacky code to simulate that 'test.coll' is a sharded time-series collection - modifyTimeseriesMetadata({$set: {timeseriesFields: {timeField: "a"}}}); - bumpCollectionVersionThroughMoveChunk(st.shard1.shardName); - forceRefreshOnShards(); - - let check = (cmdRes) => { - assert.commandFailedWithCode(cmdRes, ErrorCodes.NotImplemented); - }; - - // CRUD ops & drop collection - check(st.s.getDB(dbName).runCommand({find: 'coll', filter: {a: 1}})); - check(st.s.getDB(dbName).runCommand({find: 'coll', filter: {}})); - check(st.s.getDB(dbName).runCommand({insert: 'coll', documents: [{a: 21}]})); - check(st.s.getDB(dbName).runCommand({insert: 'coll', documents: [{a: 21}, {a: 22}]})); - check(st.s.getDB(dbName).runCommand( - {update: 'coll', updates: [{q: {a: 1}, u: {$set: {b: 10}}}]})); - check(st.s.getDB(dbName).runCommand({update: 'coll', updates: [{q: {}, u: {$set: {b: 10}}}]})); - check(st.s.getDB(dbName).runCommand({delete: 'coll', deletes: [{q: {a: 1}, limit: 1}]})); - check(st.s.getDB(dbName).runCommand({delete: 'coll', deletes: [{q: {}, limit: 0}]})); - check(st.s.getDB(dbName).runCommand({drop: 'coll'}), ErrorCodes.IllegalOperation); -})(); - -st.stop(); -})(); diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 58aa50d7ffd..4e435492cb5 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -389,6 +389,8 @@ env.Library( '$BUILD_DIR/mongo/db/repl/primary_only_service', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', '$BUILD_DIR/mongo/db/repl/replica_set_messages', + '$BUILD_DIR/mongo/db/timeseries/timeseries_idl', + '$BUILD_DIR/mongo/db/timeseries/timeseries_index_schema_conversion_functions', '$BUILD_DIR/mongo/db/timeseries/timeseries_options', '$BUILD_DIR/mongo/s/commands/shared_cluster_commands', '$BUILD_DIR/mongo/s/sharding_initialization', diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_test.cpp b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_test.cpp index 0df1e8a4338..d987970efa8 100644 --- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_test.cpp +++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_test.cpp @@ -246,7 +246,10 @@ TEST_F(BalancerChunkSelectionTest, ShardedTimeseriesCollectionsCannotBeAutoSplit const auto collUUID = UUID::gen(); ChunkVersion version(2, 0, OID::gen(), Timestamp(42)); setUpDatabase(kDbName, kShardId0); - setUpCollection(kNamespace, collUUID, version, TypeCollectionTimeseriesFields("fieldName")); + + TypeCollectionTimeseriesFields tsFields; + tsFields.setTimeseriesOptions(TimeseriesOptions("fieldName")); + setUpCollection(kNamespace, collUUID, version, std::move(tsFields)); // Set up two zones setUpTags(kNamespace, @@ -290,7 +293,10 @@ TEST_F(BalancerChunkSelectionTest, ShardedTimeseriesCollectionsCannotBeBalanced) const auto collUUID = UUID::gen(); ChunkVersion version(2, 0, OID::gen(), Timestamp(42)); setUpDatabase(kDbName, kShardId0); - setUpCollection(kNamespace, collUUID, version, TypeCollectionTimeseriesFields("fieldName")); + + TypeCollectionTimeseriesFields tsFields; + tsFields.setTimeseriesOptions(TimeseriesOptions("fieldName")); + setUpCollection(kNamespace, collUUID, version, std::move(tsFields)); auto addChunk = [&](const BSONObj& min, const BSONObj& max) { setUpChunk(kNamespace, collUUID, min, max, kShardId0, version); diff --git a/src/mongo/db/s/collection_metadata_filtering_test.cpp b/src/mongo/db/s/collection_metadata_filtering_test.cpp index 1db44836b11..0b3a9585c27 100644 --- a/src/mongo/db/s/collection_metadata_filtering_test.cpp +++ b/src/mongo/db/s/collection_metadata_filtering_test.cpp @@ -213,32 +213,5 @@ TEST_F(CollectionMetadataFilteringTest, FilterDocumentsTooFarInThePastThrowsStal operationContext(), CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup)); } -TEST_F(CollectionMetadataFilteringTest, DisallowOpsOnShardedTimeseriesCollection) { - TypeCollectionTimeseriesFields timeseriesFields("fieldName"); - prepareTestData(timeseriesFields); - - BSONObj readConcern = BSON("readConcern" << BSON("level" - << "snapshot" - << "atClusterTime" << Timestamp(100, 0))); - - auto&& readConcernArgs = repl::ReadConcernArgs::get(operationContext()); - ASSERT_OK(readConcernArgs.initialize(readConcern["readConcern"])); - - AutoGetCollection autoColl(operationContext(), kNss, MODE_IS); - auto* const css = CollectionShardingState::get(operationContext(), kNss); - - auto check = [&](const DBException& ex) { - ASSERT_EQ(ex.code(), ErrorCodes::NotImplemented) << ex.toString(); - ASSERT_STRING_CONTAINS(ex.reason(), - "Operations on sharded time-series collections are not supported"); - }; - - ASSERT_THROWS_WITH_CHECK( - css->getOwnershipFilter(operationContext(), - CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup), - AssertionException, - check); -} - } // namespace } // namespace mongo diff --git a/src/mongo/db/s/collection_sharding_runtime.cpp b/src/mongo/db/s/collection_sharding_runtime.cpp index d288cb7fcdc..c08134ab030 100644 --- a/src/mongo/db/s/collection_sharding_runtime.cpp +++ b/src/mongo/db/s/collection_sharding_runtime.cpp @@ -333,10 +333,6 @@ CollectionShardingRuntime::_getMetadataWithVersionCheckAt( const auto& currentMetadata = optCurrentMetadata->get(); - uassert(ErrorCodes::NotImplemented, - "Operations on sharded time-series collections are not supported", - !currentMetadata.isSharded() || !currentMetadata.getTimeseriesFields()); - auto wantedShardVersion = currentMetadata.getShardVersion(); { diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp index a5d2253ae6e..3b2b83862bb 100644 --- a/src/mongo/db/s/create_collection_coordinator.cpp +++ b/src/mongo/db/s/create_collection_coordinator.cpp @@ -73,11 +73,14 @@ OptionsAndIndexes getCollectionOptionsAndIndexes(OperationContext* opCtx, // There must be a collection at this time. invariant(!all.empty()); auto& entry = all.front(); + if (entry["options"].isABSONObj()) { optionsBob.appendElements(entry["options"].Obj()); } optionsBob.append(entry["info"]["uuid"]); - idIndex = entry["idIndex"].Obj().getOwned(); + if (entry["idIndex"]) { + idIndex = entry["idIndex"].Obj().getOwned(); + } auto indexSpecsList = localClient.getIndexSpecs(nssOrUUID, false, 0); @@ -539,7 +542,7 @@ void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx uassert(ErrorCodes::IllegalOperation, "can't shard system namespaces", !nss().isSystem() || nss() == NamespaceString::kLogicalSessionsNamespace || - nss().isTemporaryReshardingCollection()); + nss().isTemporaryReshardingCollection() || nss().isTimeseriesBucketsCollection()); if (_doc.getNumInitialChunks()) { // Ensure numInitialChunks is within valid bounds. @@ -714,6 +717,12 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) { coll.setKeyPattern(_shardKeyPattern->getKeyPattern()); + if (_doc.getCreateCollectionRequest().getTimeseries()) { + TypeCollectionTimeseriesFields timeseriesFields; + timeseriesFields.setTimeseriesOptions(*_doc.getCreateCollectionRequest().getTimeseries()); + coll.setTimeseriesFields(std::move(timeseriesFields)); + } + if (_collation) { coll.setDefaultCollation(_collation.value()); } diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 911f6137bf0..f5d9c9862e2 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -920,7 +920,9 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC uint64_t averageObjectIdSize = 0; const uint64_t defaultObjectIdSize = OID::kOIDSize; - if (totalRecs > 0) { + + // For a time series collection, an index on '_id' is not required. + if (totalRecs > 0 && !collection->getTimeseriesOptions()) { const auto idIdx = collection->getIndexCatalog()->findIdIndex(opCtx)->getEntry(); if (!idIdx) { return {ErrorCodes::IndexNotFound, diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp index a488c7eac41..edf5e1a435f 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp +++ b/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp @@ -621,7 +621,9 @@ TEST_F(ShardServerCatalogCacheLoaderTest, TimeseriesFieldsAreProperlyPropagatedO ChunkVersion collectionVersion(1, 0, OID::gen(), boost::none /* timestamp */); CollectionType collectionType = makeCollectionType(collectionVersion); - collectionType.setTimeseriesFields(TypeCollectionTimeseriesFields("fieldName")); + TypeCollectionTimeseriesFields tsFields; + tsFields.setTimeseriesOptions(TimeseriesOptions("fieldName")); + collectionType.setTimeseriesFields(tsFields); vector<ChunkType> chunks = makeFiveChunks(collectionVersion); diff --git a/src/mongo/db/s/shardsvr_create_collection_command.cpp b/src/mongo/db/s/shardsvr_create_collection_command.cpp index 1e5bec2cad1..add45172998 100644 --- a/src/mongo/db/s/shardsvr_create_collection_command.cpp +++ b/src/mongo/db/s/shardsvr_create_collection_command.cpp @@ -33,14 +33,17 @@ #include "mongo/bson/util/bson_extract.h" #include "mongo/db/auth/authorization_session.h" +#include "mongo/db/catalog/collection_catalog.h" #include "mongo/db/commands.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/query/collation/collator_factory_interface.h" +#include "mongo/db/query/query_feature_flags_gen.h" #include "mongo/db/s/create_collection_coordinator.h" #include "mongo/db/s/shard_collection_legacy.h" #include "mongo/db/s/sharding_ddl_50_upgrade_downgrade.h" #include "mongo/db/s/sharding_ddl_coordinator_service.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h" #include "mongo/db/timeseries/timeseries_options.h" #include "mongo/logv2/log.h" #include "mongo/s/grid.h" @@ -129,7 +132,7 @@ CreateCollectionResponse createCollectionLegacy(OperationContext* opCtx, uassert(ErrorCodes::IllegalOperation, "can't shard system namespaces", !nss.isSystem() || nss == NamespaceString::kLogicalSessionsNamespace || - nss.isTemporaryReshardingCollection()); + nss.isTemporaryReshardingCollection() || nss.isTimeseriesBucketsCollection()); auto optNumInitialChunks = request.getNumInitialChunks(); if (optNumInitialChunks) { @@ -177,16 +180,42 @@ CreateCollectionResponse createCollectionLegacy(OperationContext* opCtx, } CreateCollectionResponse createCollection(OperationContext* opCtx, - const NamespaceString& nss, + NamespaceString nss, const ShardsvrCreateCollection& request) { uassert( ErrorCodes::NotImplemented, "create collection not implemented yet", request.getShardKey()); + auto bucketsNs = nss.makeTimeseriesBucketsNamespace(); + auto bucketsColl = + CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForRead(opCtx, bucketsNs); + CreateCollectionRequest createCmdRequest = request.getCreateCollectionRequest(); + + // If the 'system.buckets' exists or 'timeseries' parameters are passed in, we know that we are + // trying shard a timeseries collection. + if (bucketsColl || createCmdRequest.getTimeseries()) { + uassert(5731502, + "Sharding a timeseries collection feature is not enabled", + feature_flags::gFeatureFlagShardedTimeSeries.isEnabledAndIgnoreFCV()); + + if (!createCmdRequest.getTimeseries()) { + createCmdRequest.setTimeseries(bucketsColl->getTimeseriesOptions()); + } else if (bucketsColl) { + uassert(5731500, + str::stream() << "the 'timeseries' spec provided must match that of exists '" + << nss << "' collection", + timeseries::optionsAreEqual(*createCmdRequest.getTimeseries(), + *bucketsColl->getTimeseriesOptions())); + } + nss = bucketsNs; + createCmdRequest.setShardKey( + uassertStatusOK(timeseries::createBucketsShardKeySpecFromTimeseriesShardKeySpec( + *createCmdRequest.getTimeseries(), *createCmdRequest.getShardKey()))); + } + auto coordinatorDoc = CreateCollectionCoordinatorDocument(); coordinatorDoc.setShardingDDLCoordinatorMetadata( - {{nss, DDLCoordinatorTypeEnum::kCreateCollection}}); - coordinatorDoc.setCreateCollectionRequest(request.getCreateCollectionRequest()); - + {{std::move(nss), DDLCoordinatorTypeEnum::kCreateCollection}}); + coordinatorDoc.setCreateCollectionRequest(std::move(createCmdRequest)); auto service = ShardingDDLCoordinatorService::getService(opCtx); auto createCollectionCoordinator = checked_pointer_cast<CreateCollectionCoordinator>( service->getOrCreateInstance(opCtx, coordinatorDoc.toBSON())); @@ -247,6 +276,7 @@ public: LOGV2_DEBUG( 5277910, 1, "Running new create collection procedure", "namespace"_attr = ns()); + return createCollection(opCtx, ns(), request()); } diff --git a/src/mongo/db/s/type_shard_collection.h b/src/mongo/db/s/type_shard_collection.h index 1c805a31c63..224194e5abd 100644 --- a/src/mongo/db/s/type_shard_collection.h +++ b/src/mongo/db/s/type_shard_collection.h @@ -46,6 +46,7 @@ public: using ShardCollectionTypeBase::kNssFieldName; using ShardCollectionTypeBase::kRefreshingFieldName; using ShardCollectionTypeBase::kReshardingFieldsFieldName; + using ShardCollectionTypeBase::kTimeseriesFieldsFieldName; using ShardCollectionTypeBase::kUniqueFieldName; using ShardCollectionTypeBase::kUuidFieldName; diff --git a/src/mongo/db/timeseries/timeseries_index_schema_conversion_functions.cpp b/src/mongo/db/timeseries/timeseries_index_schema_conversion_functions.cpp index 5909670ff08..dccdbbf3893 100644 --- a/src/mongo/db/timeseries/timeseries_index_schema_conversion_functions.cpp +++ b/src/mongo/db/timeseries/timeseries_index_schema_conversion_functions.cpp @@ -42,8 +42,10 @@ namespace mongo { namespace timeseries { -StatusWith<BSONObj> createBucketsIndexSpecFromTimeseriesIndexSpec( - const TimeseriesOptions& timeseriesOptions, const BSONObj& timeseriesIndexSpecBSON) { +namespace { +StatusWith<BSONObj> createBucketsSpecFromTimeseriesSpec(const TimeseriesOptions& timeseriesOptions, + const BSONObj& timeseriesIndexSpecBSON, + bool isShardKeySpec) { auto timeField = timeseriesOptions.getTimeField(); auto metaField = timeseriesOptions.getMetaField(); @@ -68,8 +70,10 @@ StatusWith<BSONObj> createBucketsIndexSpecFromTimeseriesIndexSpec( if (elem.number() >= 0) { builder.appendAs( elem, str::stream() << timeseries::kControlMinFieldNamePrefix << timeField); - builder.appendAs( - elem, str::stream() << timeseries::kControlMaxFieldNamePrefix << timeField); + if (!isShardKeySpec) { + builder.appendAs( + elem, str::stream() << timeseries::kControlMaxFieldNamePrefix << timeField); + } } else { builder.appendAs( elem, str::stream() << timeseries::kControlMaxFieldNamePrefix << timeField); @@ -112,6 +116,17 @@ StatusWith<BSONObj> createBucketsIndexSpecFromTimeseriesIndexSpec( return builder.obj(); } +} // namespace + +StatusWith<BSONObj> createBucketsIndexSpecFromTimeseriesIndexSpec( + const TimeseriesOptions& timeseriesOptions, const BSONObj& timeseriesIndexSpecBSON) { + return createBucketsSpecFromTimeseriesSpec(timeseriesOptions, timeseriesIndexSpecBSON, false); +} + +StatusWith<BSONObj> createBucketsShardKeySpecFromTimeseriesShardKeySpec( + const TimeseriesOptions& timeseriesOptions, const BSONObj& timeseriesShardKeySpecBSON) { + return createBucketsSpecFromTimeseriesSpec(timeseriesOptions, timeseriesShardKeySpecBSON, true); +} boost::optional<BSONObj> createTimeseriesIndexSpecFromBucketsIndexSpec( const TimeseriesOptions& timeseriesOptions, const BSONObj& bucketsIndexSpecBSON) { diff --git a/src/mongo/db/timeseries/timeseries_index_schema_conversion_functions.h b/src/mongo/db/timeseries/timeseries_index_schema_conversion_functions.h index 4fbb9c6139e..3fa88e09a58 100644 --- a/src/mongo/db/timeseries/timeseries_index_schema_conversion_functions.h +++ b/src/mongo/db/timeseries/timeseries_index_schema_conversion_functions.h @@ -52,6 +52,9 @@ namespace timeseries { StatusWith<BSONObj> createBucketsIndexSpecFromTimeseriesIndexSpec( const TimeseriesOptions& timeseriesOptions, const BSONObj& timeseriesIndexSpecBSON); +StatusWith<BSONObj> createBucketsShardKeySpecFromTimeseriesShardKeySpec( + const TimeseriesOptions& timeseriesOptions, const BSONObj& timeseriesIndexSpecBSON); + /** * Maps the buckets collection index spec 'bucketsIndexSpecBSON' to the index schema of the * time-series collection using the information provided in 'timeseriesOptions'. diff --git a/src/mongo/db/timeseries/timeseries_options.cpp b/src/mongo/db/timeseries/timeseries_options.cpp index 6ba0a5b7480..1f583ea0afd 100644 --- a/src/mongo/db/timeseries/timeseries_options.cpp +++ b/src/mongo/db/timeseries/timeseries_options.cpp @@ -78,6 +78,18 @@ int getBucketRoundingSecondsFromGranularity(BucketGranularityEnum granularity) { } MONGO_UNREACHABLE; } +bool optionsAreEqual(const TimeseriesOptions& option1, const TimeseriesOptions& option2) { + const auto option1BucketSpan = option1.getBucketMaxSpanSeconds() + ? *option1.getBucketMaxSpanSeconds() + : getMaxSpanSecondsFromGranularity(option1.getGranularity()); + const auto option2BucketSpan = option2.getBucketMaxSpanSeconds() + ? *option2.getBucketMaxSpanSeconds() + : getMaxSpanSecondsFromGranularity(option2.getGranularity()); + return option1.getTimeField() == option1.getTimeField() && + option1.getMetaField() == option2.getMetaField() && + option1.getGranularity() == option2.getGranularity() && + option1BucketSpan == option2BucketSpan; +} } // namespace timeseries } // namespace mongo diff --git a/src/mongo/db/timeseries/timeseries_options.h b/src/mongo/db/timeseries/timeseries_options.h index e3a605dde73..fc3548ce9fc 100644 --- a/src/mongo/db/timeseries/timeseries_options.h +++ b/src/mongo/db/timeseries/timeseries_options.h @@ -58,5 +58,6 @@ int getMaxSpanSecondsFromGranularity(BucketGranularityEnum granularity); */ int getBucketRoundingSecondsFromGranularity(BucketGranularityEnum granularity); +bool optionsAreEqual(const TimeseriesOptions& option1, const TimeseriesOptions& option2); } // namespace timeseries } // namespace mongo diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 5695f532133..69a3c965c16 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -198,6 +198,7 @@ env.Library( '$BUILD_DIR/mongo/db/query/query_request', '$BUILD_DIR/mongo/db/repl/optime', '$BUILD_DIR/mongo/db/server_options', + '$BUILD_DIR/mongo/db/timeseries/timeseries_idl', '$BUILD_DIR/mongo/idl/feature_flag', '$BUILD_DIR/mongo/idl/idl_parser', ] diff --git a/src/mongo/s/catalog/type_collection.h b/src/mongo/s/catalog/type_collection.h index d723799d966..a5710d08f76 100644 --- a/src/mongo/s/catalog/type_collection.h +++ b/src/mongo/s/catalog/type_collection.h @@ -86,6 +86,7 @@ public: static constexpr auto kAllowMigrationsFieldName = kPre50CompatibleAllowMigrationsFieldName; using CollectionTypeBase::kNssFieldName; using CollectionTypeBase::kReshardingFieldsFieldName; + using CollectionTypeBase::kTimeseriesFieldsFieldName; using CollectionTypeBase::kTimestampFieldName; using CollectionTypeBase::kUniqueFieldName; using CollectionTypeBase::kUpdatedAtFieldName; diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h index f3be35e67a7..863ee2ec8cf 100644 --- a/src/mongo/s/catalog_cache.h +++ b/src/mongo/s/catalog_cache.h @@ -35,6 +35,7 @@ #include "mongo/s/catalog/type_database.h" #include "mongo/s/catalog_cache_loader.h" #include "mongo/s/chunk_manager.h" +#include "mongo/s/type_collection_timeseries_fields_gen.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/read_through_cache.h" diff --git a/src/mongo/s/catalog_cache_loader.h b/src/mongo/s/catalog_cache_loader.h index 528ff5519e3..a4afe4e7277 100644 --- a/src/mongo/s/catalog_cache_loader.h +++ b/src/mongo/s/catalog_cache_loader.h @@ -38,6 +38,7 @@ #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_database.h" #include "mongo/s/chunk_version.h" +#include "mongo/s/type_collection_timeseries_fields_gen.h" #include "mongo/util/concurrency/notification.h" #include "mongo/util/uuid.h" diff --git a/src/mongo/s/catalog_cache_test.cpp b/src/mongo/s/catalog_cache_test.cpp index cbffa3e66f2..f0c809c34d0 100644 --- a/src/mongo/s/catalog_cache_test.cpp +++ b/src/mongo/s/catalog_cache_test.cpp @@ -445,7 +445,9 @@ TEST_F(CatalogCacheTest, TimeseriesFieldsAreProperlyPropagatedOnCC) { loadDatabases({DatabaseType(kNss.db().toString(), kShards[0], true, dbVersion)}); auto coll = makeCollectionType(version); - coll.setTimeseriesFields(TypeCollectionTimeseriesFields("fieldName")); + TypeCollectionTimeseriesFields tsFields; + tsFields.setTimeseriesOptions(TimeseriesOptions("fieldName")); + coll.setTimeseriesFields(tsFields); const auto scopedCollProv = scopedCollectionProvider(coll); const auto scopedChunksProv = scopedChunksProvider(makeChunks(version)); diff --git a/src/mongo/s/commands/cluster_commands.idl b/src/mongo/s/commands/cluster_commands.idl index 3f95b7f0181..588daf66ee0 100644 --- a/src/mongo/s/commands/cluster_commands.idl +++ b/src/mongo/s/commands/cluster_commands.idl @@ -31,6 +31,7 @@ global: imports: - "mongo/idl/basic_types.idl" + - "mongo/db/timeseries/timeseries.idl" server_parameters: KillSessionsMaxConcurrency: @@ -96,3 +97,7 @@ structs: type: object description: "The collation to use for the shard key index." optional: true + timeseries: + description: "The options to create the time-series collection with." + type: TimeseriesOptions + optional: true diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp index c2217153d92..0a73c45f035 100644 --- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp +++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp @@ -87,6 +87,11 @@ public: const BSONObj& cmdObj, BSONObjBuilder& result) override { const NamespaceString nss(parseNs(dbname, cmdObj)); + + uassert(5731501, + "Sharding a buckets collection is not allowed", + !nss.isTimeseriesBucketsCollection()); + auto shardCollRequest = ShardCollection::parse(IDLParserErrorContext("ShardCollection"), cmdObj); @@ -97,6 +102,7 @@ public: requestParamsObj.setNumInitialChunks(shardCollRequest.getNumInitialChunks()); requestParamsObj.setPresplitHashedZones(shardCollRequest.getPresplitHashedZones()); requestParamsObj.setCollation(shardCollRequest.getCollation()); + requestParamsObj.setTimeseries(shardCollRequest.getTimeseries()); shardsvrCollRequest.setCreateCollectionRequest(std::move(requestParamsObj)); shardsvrCollRequest.setDbName(nss.db()); diff --git a/src/mongo/s/request_types/sharded_ddl_commands.idl b/src/mongo/s/request_types/sharded_ddl_commands.idl index 0a5a07b0884..9d3aa766274 100644 --- a/src/mongo/s/request_types/sharded_ddl_commands.idl +++ b/src/mongo/s/request_types/sharded_ddl_commands.idl @@ -39,6 +39,7 @@ imports: - "mongo/s/chunk_version.idl" - "mongo/s/database_version.idl" - "mongo/s/resharding/common_types.idl" + - "mongo/db/timeseries/timeseries.idl" types: CollectionType: @@ -117,6 +118,10 @@ structs: A specific set of points to create initial splits at, currently used only by mapReduce. optional: true + timeseries: + description: "The options to create the time-series collection with." + type: TimeseriesOptions + optional: true collation: type: object_owned description: "The collation to use for the shard key index." diff --git a/src/mongo/s/type_collection_timeseries_fields.idl b/src/mongo/s/type_collection_timeseries_fields.idl index a9a0da9f392..1fd68bb70de 100644 --- a/src/mongo/s/type_collection_timeseries_fields.idl +++ b/src/mongo/s/type_collection_timeseries_fields.idl @@ -30,16 +30,12 @@ global: imports: - "mongo/idl/basic_types.idl" + - "mongo/db/timeseries/timeseries.idl" structs: TypeCollectionTimeseriesFields: description: "Fields for time-series buckets collection fields in config.collections." strict: false - fields: - timeField: - description: "The name of the top-level field to be used for time." - type: string - metaField: - description: "The name of the top-level field describing the series." - type: string - optional: true + inline_chained_structs: true + chained_structs: + TimeseriesOptions: TimeseriesOptions |