| author | Paolo Polato <paolo.polato@mongodb.com> | 2022-10-17 10:16:26 +0000 |
|---|---|---|
| committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-10-17 10:46:28 +0000 |
| commit | 613df409e8e545cefb494c140565bdbe85838ce2 (patch) | |
| tree | 00144667efe57fad422de899b68610edbbc4459b | |
| parent | 4826dded41f235887cd472845227926e4c86acd5 (diff) | |
| download | mongo-613df409e8e545cefb494c140565bdbe85838ce2.tar.gz | |
SERVER-68927 Store placement changes into config.placementHistory when a shardCollection() command gets committed on the config server
| -rw-r--r-- | jstests/sharding/sessions_collection_auto_healing.js | 24 |
|---|---|---|
| -rw-r--r-- | jstests/sharding/store_historical_placement_data.js | 95 |
| -rw-r--r-- | src/mongo/db/s/SConscript | 1 |
| -rw-r--r-- | src/mongo/db/s/create_collection_coordinator.cpp | 121 |
| -rw-r--r-- | src/mongo/db/s/create_collection_coordinator.h | 4 |

5 files changed, 192 insertions, 53 deletions
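In short, the change makes the shardCollection() commit path on the config server record which shards host data for the new collection in `config.placementHistory`, alongside the existing `config.collections` entry. A minimal mongo-shell sketch of how such an entry can be inspected follows; the field names (`nss`, `timestamp`, `shards`, `uuid`) are taken from the assertions in the updated jstest below, while the target namespace and the `latestPlacementFor()` helper are hypothetical:

```javascript
// Hypothetical helper: fetch the most recent placement document recorded for a
// namespace. Field names follow store_historical_placement_data.js.
const configDB = db.getSiblingDB("config");

function latestPlacementFor(nss) {
    return configDB.placementHistory.find({nss: nss})
        .sort({timestamp: -1})
        .limit(1)
        .toArray()[0];
}

// Example usage after sharding a collection named "testDB.coll1" (assumed name).
const entry = latestPlacementFor("testDB.coll1");
if (entry) {
    // 'shards' lists the shards owning at least one chunk at creation time;
    // 'uuid' matches the corresponding entry in config.collections.
    printjson({nss: entry.nss, shards: entry.shards, uuid: entry.uuid, timestamp: entry.timestamp});
}
```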
```diff
diff --git a/jstests/sharding/sessions_collection_auto_healing.js b/jstests/sharding/sessions_collection_auto_healing.js
index 5689c4fd3ec..109b8fe96d0 100644
--- a/jstests/sharding/sessions_collection_auto_healing.js
+++ b/jstests/sharding/sessions_collection_auto_healing.js
@@ -3,6 +3,7 @@
  * ]
  */
 load('jstests/libs/sessions_collection.js');
+load("jstests/libs/feature_flag_util.js");
 
 (function() {
 "use strict";
@@ -119,19 +120,30 @@ var shardConfig = shard.getDB("config");
     validateSessionsCollection(shard, true, true);
 
-    // We will have two sessions because of the session used in the shardCollection's retryable
-    // write to shard the sessions collection. It will disappear after we run the refresh
-    // function on the shard.
-    assert.eq(shardConfig.system.sessions.countDocuments({}), 2, "did not flush config's sessions");
+    // TODO SERVER-69106 adapt the test assuming that the flag will be always enabled.
+    const historicalPlacementDataFeatureFlag = FeatureFlagUtil.isEnabled(
+        st.configRS.getPrimary().getDB('admin'), "HistoricalPlacementShardingCatalog");
+    const sessionsOpenedByShardCollectionCmd = historicalPlacementDataFeatureFlag ? 3 : 2;
+
+    // We will have sessionsOpenedByShardCollectionCmd sessions because of the sessions used in the
+    // shardCollection's retryable write to shard the sessions collection. It will disappear after
+    // we run the refresh function on the shard.
+    assert.eq(shardConfig.system.sessions.countDocuments({}),
+              sessionsOpenedByShardCollectionCmd,
+              "did not flush config's sessions");
 
     // Now, if we do refreshes on the other servers, their in-mem records will
     // be written to the collection.
     assert.commandWorked(shard.adminCommand({refreshLogicalSessionCacheNow: 1}));
-    assert.eq(shardConfig.system.sessions.countDocuments({}), 3, "did not flush shard's sessions");
+    assert.eq(shardConfig.system.sessions.countDocuments({}),
+              sessionsOpenedByShardCollectionCmd + 1,
+              "did not flush shard's sessions");
 
     rs.awaitLastOpCommitted();
 
     assert.commandWorked(mongos.adminCommand({refreshLogicalSessionCacheNow: 1}));
-    assert.eq(shardConfig.system.sessions.countDocuments({}), 5, "did not flush mongos' sessions");
+    assert.eq(shardConfig.system.sessions.countDocuments({}),
+              sessionsOpenedByShardCollectionCmd + 3,
+              "did not flush mongos' sessions");
 }
 
 // Test that if we drop the index on the sessions collection, only a refresh on the config
diff --git a/jstests/sharding/store_historical_placement_data.js b/jstests/sharding/store_historical_placement_data.js
index 844d46cf54e..f4b2831df02 100644
--- a/jstests/sharding/store_historical_placement_data.js
+++ b/jstests/sharding/store_historical_placement_data.js
@@ -1,14 +1,73 @@
-/*
- * The test verifies that each Sharding DDL operation that gets successfully completed
- * also produces a document detailing the changes to the placement of the targeted nss.
- *
- *
- */
+
 (function() {
 "use strict";
 
 load("jstests/libs/feature_flag_util.js");
 
 const st = new ShardingTest({shards: 2});
+const configDB = st.s.getDB('config');
+
+function getAndValidateLatestPlacementInfoForDB(dbName) {
+    const placementQueryResults =
+        configDB.placementHistory.find({nss: dbName}).sort({timestamp: -1}).limit(1).toArray();
+    assert.eq(placementQueryResults.length, 1);
+    const dbPlacementDetails = placementQueryResults[0];
+
+    // Verify that the placementHistory document matches the related content stored in
+    // config.databases.
+    const configDBsQueryResults = configDB.databases.find({_id: dbPlacementDetails.nss}).toArray();
+    assert.eq(1, configDBsQueryResults.length);
+    const databaseDetails = configDBsQueryResults[0];
+
+    assert(timestampCmp(databaseDetails.version.timestamp, dbPlacementDetails.timestamp) === 0);
+    assert.eq(1, dbPlacementDetails.shards.length);
+    assert.eq(databaseDetails.primary, dbPlacementDetails.shards[0]);
+    assert.eq(undefined, dbPlacementDetails.uuid);
+    return dbPlacementDetails;
+}
+
+function getAndValidateLatestPlacementInfoForCollection(fullCollName) {
+    const placementQueryResults = configDB.placementHistory.find({nss: fullCollName})
+                                      .sort({timestamp: -1})
+                                      .limit(1)
+                                      .toArray();
+    assert.eq(placementQueryResults.length, 1);
+    const placementDetails = placementQueryResults[0];
+
+    // Verify that the placementHistory document matches the related content stored in
+    // config.collections.
+    const configCollsQueryResults =
+        configDB.collections.find({_id: placementDetails.nss}).toArray();
+    assert.eq(configCollsQueryResults.length, 1);
+    const collectionEntry = configCollsQueryResults[0];
+
+    assert.eq(collectionEntry.uuid, placementDetails.uuid);
+    assert(timestampCmp(collectionEntry.timestamp, placementDetails.timestamp) === 0);
+    return placementDetails;
+}
+
+function testEnableSharding(dbName, primaryShardName) {
+    assert.commandWorked(
+        st.s.adminCommand({enableSharding: dbName, primaryShard: primaryShardName}));
+    getAndValidateLatestPlacementInfoForDB(dbName);
+}
+
+function testShardCollection(dbName, collName) {
+    let nss = dbName + '.' + collName;
+
+    // Shard the collection. Ensure enough chunks to cover all shards.
+    assert.commandWorked(
+        st.s.adminCommand({shardCollection: nss, key: {_id: "hashed"}, numInitialChunks: 20}));
+
+    // Verify that there is consistent placement info on the shared collection and its parent DB.
+    const dbPlacementInfo = getAndValidateLatestPlacementInfoForDB(dbName);
+    const collPlacementInfo = getAndValidateLatestPlacementInfoForCollection(nss);
+    assert(timestampCmp(dbPlacementInfo.timestamp, collPlacementInfo.timestamp) < 0);
+
+    // Verify that the placementHistory document matches the related content stored in
+    // config.shards.
+    const entriesInConfigShards = configDB.shards.find({}, {_id: 1}).toArray().map((s) => s._id);
+    assert.sameMembers(entriesInConfigShards, collPlacementInfo.shards);
+}
+
 // TODO SERVER-69106 remove the logic to skip the test execution
 const historicalPlacementDataFeatureFlag = FeatureFlagUtil.isEnabled(
     st.configRS.getPrimary().getDB('admin'), "HistoricalPlacementShardingCatalog");
@@ -18,23 +77,15 @@ if (!historicalPlacementDataFeatureFlag) {
     return;
 }
 
-const dbName = 'test';
-const configDB = st.s.getDB('config');
+jsTest.log('Testing placement entries added by explicit DB creation');
+testEnableSharding('explicitlyCreatedDB', st.shard0.shardName);
+
+jsTest.log(
+    'Testing placement entries added by shardCollection() over an existing sharding-enabled DB');
+testShardCollection('explicitlyCreatedDB', 'coll1');
 
-jsTest.log('Verifying placement data generated by createDatabase()');
-assert.commandWorked(
-    st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
-
-const placementHistoryEntries = configDB.placementHistory.find().toArray();
-assert.eq(placementHistoryEntries.length, 1);
-const placementDetails = placementHistoryEntries[0];
-const databaseEntries = configDB.databases.find({_id: placementDetails.nss}).toArray();
-assert.eq(1, databaseEntries.length);
-const databaseDetails = databaseEntries[0];
-assert(timestampCmp(databaseDetails.version.timestamp, placementDetails.timestamp) == 0);
-assert.eq(1, placementDetails.shards.length);
-assert.eq(databaseDetails.primary, placementDetails.shards[0]);
-assert.eq(undefined, placementDetails.uuid);
+jsTest.log('Testing placement entries added by shardCollection() over a non-existing db (& coll)');
+testShardCollection('implicitlyCreatedDB', 'coll1');
 
 st.stop();
 }());
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 789a7fcec6a..c0c21fdb65f 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -516,6 +516,7 @@ env.Library(
         '$BUILD_DIR/mongo/db/catalog/catalog_helpers',
         '$BUILD_DIR/mongo/db/catalog/database_holder',
         '$BUILD_DIR/mongo/db/cloner',
+        '$BUILD_DIR/mongo/db/cluster_transaction_api',
         '$BUILD_DIR/mongo/db/commands/cluster_server_parameter_commands_invocation',
         '$BUILD_DIR/mongo/db/commands/core',
         '$BUILD_DIR/mongo/db/commands/create_command',
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp
index 9257033699f..91b3d719e44 100644
--- a/src/mongo/db/s/create_collection_coordinator.cpp
+++ b/src/mongo/db/s/create_collection_coordinator.cpp
@@ -37,6 +37,7 @@
 #include "mongo/db/catalog/collection_uuid_mismatch.h"
 #include "mongo/db/catalog/create_collection.h"
 #include "mongo/db/catalog_raii.h"
+#include "mongo/db/cluster_transaction_api.h"
 #include "mongo/db/commands/create_gen.h"
 #include "mongo/db/commands/feature_compatibility_version.h"
 #include "mongo/db/concurrency/exception_util.h"
@@ -54,13 +55,16 @@
 #include "mongo/db/timeseries/catalog_helper.h"
 #include "mongo/db/timeseries/timeseries_constants.h"
 #include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h"
+#include "mongo/db/transaction/transaction_api.h"
 #include "mongo/logv2/log.h"
 #include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/catalog/type_namespace_placement_gen.h"
 #include "mongo/s/cluster_commands_helpers.h"
 #include "mongo/s/cluster_write.h"
 #include "mongo/s/grid.h"
 #include "mongo/s/sharding_feature_flags_gen.h"
 
+
 #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
@@ -330,7 +334,6 @@ void insertChunks(OperationContext* opCtx,
 }
 
 void insertCollectionEntry(OperationContext* opCtx,
-                           const NamespaceString& nss,
                            CollectionType& coll,
                            const OperationSessionInfo& osi) {
     const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
@@ -352,6 +355,62 @@ void insertCollectionEntry(OperationContext* opCtx,
                                             &unusedResponse));
 }
 
+void insertCollectionAndPlacementEntries(OperationContext* opCtx,
+                                         const std::shared_ptr<executor::TaskExecutor>& executor,
+                                         const std::shared_ptr<CollectionType>& coll,
+                                         const ChunkVersion& placementVersion,
+                                         const std::shared_ptr<std::set<ShardId>>& shardIds) {
+    // Ensure that this function will only return once the transaction gets majority committed (and
+    // restore the original write concern on exit).
+    WriteConcernOptions originalWC = opCtx->getWriteConcern();
+    opCtx->setWriteConcern(WriteConcernOptions{WriteConcernOptions::kMajority,
+                                               WriteConcernOptions::SyncMode::UNSET,
+                                               WriteConcernOptions::kNoTimeout});
+    ScopeGuard guard([opCtx, &originalWC] { opCtx->setWriteConcern(originalWC); });
+
+    auto txnClient = std::make_unique<txn_api::details::SEPTransactionClient>(
+        opCtx,
+        executor,
+        std::make_unique<txn_api::details::ClusterSEPTransactionClientBehaviors>(
+            opCtx->getServiceContext()));
+
+    /*
+     * The insertionChain callback may be run on a separate thread than the one serving
+     * insertCollectionAndPlacementEntries(). For this reason, all the referenced parameters have to
+     * be captured by value (shared_ptrs are used to reduce the memory footprint).
+     */
+    const auto insertionChain = [coll, shardIds, placementVersion](
+                                    const txn_api::TransactionClient& txnClient,
+                                    ExecutorPtr txnExec) {
+        write_ops::InsertCommandRequest insertCollectionEntry(CollectionType::ConfigNS,
+                                                              {coll->toBSON()});
+        return txnClient.runCRUDOp(insertCollectionEntry, {})
+            .thenRunOn(txnExec)
+            .then([&](const BatchedCommandResponse& insertCollectionEntryResponse) {
+                uassertStatusOK(insertCollectionEntryResponse.toStatus());
+
+                NamespacePlacementType placementInfo(
+                    NamespaceString(coll->getNss()),
+                    placementVersion.getTimestamp(),
+                    std::vector<mongo::ShardId>(shardIds->cbegin(), shardIds->cend()));
+                placementInfo.setUuid(coll->getUuid());
+
+                write_ops::InsertCommandRequest insertPlacementEntry(
+                    NamespaceString::kConfigsvrPlacementHistoryNamespace, {placementInfo.toBSON()});
+                return txnClient.runCRUDOp(insertPlacementEntry, {});
+            })
+            .thenRunOn(txnExec)
+            .then([](const BatchedCommandResponse& insertPlacementEntryResponse) {
+                uassertStatusOK(insertPlacementEntryResponse.toStatus());
+            })
+            .semi();
+    };
+
+    txn_api::SyncTransactionWithRetries txn(
+        opCtx, executor, nullptr /*resourceYielder*/, std::move(txnClient));
+    txn.run(opCtx, insertionChain);
+}
+
 void broadcastDropCollection(OperationContext* opCtx,
                              const NamespaceString& nss,
                              const std::shared_ptr<executor::TaskExecutor>& executor,
@@ -534,7 +593,7 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
                 _updateSession(opCtx);
                 _createCollectionOnNonPrimaryShards(opCtx, getCurrentSession());
 
-                _commit(opCtx);
+                _commit(opCtx, **executor);
             }
 
             // End of the critical section, from now on, read and writes are permitted.
@@ -545,7 +604,7 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
             // collections.
             if (!_splitPolicy->isOptimized()) {
                 _createChunks(opCtx, shardKeyPattern);
-                _commit(opCtx);
+                _commit(opCtx, **executor);
             }
         }))
         .then([this] {
@@ -1119,38 +1178,59 @@ void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(
     }
 }
 
-void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
+void CreateCollectionCoordinator::_commit(OperationContext* opCtx,
+                                          const std::shared_ptr<executor::TaskExecutor>& executor) {
     LOGV2_DEBUG(5277906, 2, "Create collection _commit", "namespace"_attr = nss());
 
     // Upsert Chunks.
     _updateSession(opCtx);
     insertChunks(opCtx, _initialChunks->chunks, getCurrentSession());
 
-    CollectionType coll(nss(),
-                        _initialChunks->collVersion().epoch(),
-                        _initialChunks->collVersion().getTimestamp(),
-                        Date_t::now(),
-                        *_collectionUUID,
-                        _doc.getTranslatedRequestParams()->getKeyPattern());
+    // The coll and shardsHoldingData objects will be used by both this function and
+    // insertCollectionAndPlacementEntries(), which accesses their content from a separate thread
+    // (through the Internal Transactions API). In order to avoid segmentation faults and minimise
+    // the memory footprint, such variables get instantiated as shared_ptrs.
+    auto coll =
+        std::make_shared<CollectionType>(nss(),
+                                         _initialChunks->collVersion().epoch(),
+                                         _initialChunks->collVersion().getTimestamp(),
+                                         Date_t::now(),
+                                         *_collectionUUID,
+                                         _doc.getTranslatedRequestParams()->getKeyPattern());
+
+    auto shardsHoldingData = std::make_shared<std::set<ShardId>>();
+    for (const auto& chunk : _initialChunks->chunks) {
+        const auto& chunkShardId = chunk.getShard();
+        shardsHoldingData->emplace(chunkShardId);
+    }
+
+    const auto& placementVersion = _initialChunks->chunks.back().getVersion();
 
     if (_request.getTimeseries()) {
         TypeCollectionTimeseriesFields timeseriesFields;
         timeseriesFields.setTimeseriesOptions(*_request.getTimeseries());
-        coll.setTimeseriesFields(std::move(timeseriesFields));
+        coll->setTimeseriesFields(std::move(timeseriesFields));
     }
 
     if (auto collationBSON = _doc.getTranslatedRequestParams()->getCollation();
         !collationBSON.isEmpty()) {
-        coll.setDefaultCollation(collationBSON);
+        coll->setDefaultCollation(collationBSON);
     }
 
     if (_request.getUnique()) {
-        coll.setUnique(*_request.getUnique());
+        coll->setUnique(*_request.getUnique());
    }
 
     _updateSession(opCtx);
+
     try {
-        insertCollectionEntry(opCtx, nss(), coll, getCurrentSession());
+        if (feature_flags::gHistoricalPlacementShardingCatalog.isEnabled(
+                serverGlobalParams.featureCompatibility)) {
+            insertCollectionAndPlacementEntries(
+                opCtx, executor, coll, placementVersion, shardsHoldingData);
+        } else {
+            insertCollectionEntry(opCtx, *coll, getCurrentSession());
+        }
 
         notifyChangeStreamsOnShardCollection(opCtx, nss(), *_collectionUUID, _request.toBSON());
@@ -1177,22 +1257,16 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
     auto shardRegistry = Grid::get(opCtx)->shardRegistry();
     auto dbPrimaryShardId = ShardingState::get(opCtx)->shardId();
 
-    std::set<ShardId> shardsRefreshed;
-    for (const auto& chunk : _initialChunks->chunks) {
-        const auto& chunkShardId = chunk.getShard();
-
-        if (chunkShardId == dbPrimaryShardId ||
-            shardsRefreshed.find(chunkShardId) != shardsRefreshed.end()) {
+    for (const auto& shardid : *shardsHoldingData) {
+        if (shardid == dbPrimaryShardId) {
             continue;
         }
 
-        auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, chunkShardId));
+        auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardid));
         shard->runFireAndForgetCommand(opCtx,
                                        ReadPreferenceSetting{ReadPreference::PrimaryOnly},
                                        NamespaceString::kAdminDb.toString(),
                                        BSON("_flushRoutingTableCacheUpdates" << nss().ns()));
-
-        shardsRefreshed.emplace(chunkShardId);
     }
 
     LOGV2(5277901,
@@ -1201,7 +1275,6 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
           "numInitialChunks"_attr = _initialChunks->chunks.size(),
          "initialCollectionVersion"_attr = _initialChunks->collVersion());
 
-    const auto placementVersion = _initialChunks->chunks.back().getVersion();
     auto result = CreateCollectionResponse(
         {placementVersion, CollectionIndexes(placementVersion, boost::none)});
     result.setCollectionUUID(_collectionUUID);
diff --git a/src/mongo/db/s/create_collection_coordinator.h b/src/mongo/db/s/create_collection_coordinator.h
index 86aee402c3f..a86aee178c4 100644
--- a/src/mongo/db/s/create_collection_coordinator.h
+++ b/src/mongo/db/s/create_collection_coordinator.h
@@ -128,8 +128,10 @@ private:
      * Does the following writes:
      * 1. Updates the config.collections entry for the new sharded collection
      * 2. Updates config.chunks entries for the new sharded collection
+     * 3. Inserts an entry into config.placementHistory with the sublist of shards that will host
+     *    one or more chunks of the new collections at creation time
      */
-    void _commit(OperationContext* opCtx);
+    void _commit(OperationContext* opCtx, const std::shared_ptr<executor::TaskExecutor>& executor);
 
     /**
     * Helper function to audit and log the shard collection event.
```
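Since the `config.collections` document and the `config.placementHistory` document are inserted in a single internal transaction, an observer that sees the new collection entry should also find a placement entry carrying the same UUID and creation timestamp. Below is a condensed, hypothetical version of the consistency check performed by `getAndValidateLatestPlacementInfoForCollection()` in the new test; it assumes the jstest shell environment, where the `assert` and `timestampCmp` helpers are available:

```javascript
// Hypothetical condensed consistency check between config.collections and
// config.placementHistory for a freshly sharded collection.
function assertPlacementMatchesCollectionEntry(configDB, nss) {
    const collEntry = configDB.collections.findOne({_id: nss});
    assert.neq(null, collEntry, "missing config.collections entry for " + nss);

    const placement = configDB.placementHistory.find({nss: nss})
                          .sort({timestamp: -1})
                          .limit(1)
                          .toArray()[0];
    assert.neq(undefined, placement, "missing config.placementHistory entry for " + nss);

    // Both documents are written by the same internal transaction, so the placement
    // entry carries the collection's uuid and its creation timestamp.
    assert.eq(collEntry.uuid, placement.uuid);
    assert.eq(0, timestampCmp(collEntry.timestamp, placement.timestamp));
}
```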