author     Paolo Polato <paolo.polato@mongodb.com>          2022-10-17 10:16:26 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-10-17 10:46:28 +0000
commit     613df409e8e545cefb494c140565bdbe85838ce2 (patch)
tree       00144667efe57fad422de899b68610edbbc4459b
parent     4826dded41f235887cd472845227926e4c86acd5 (diff)
download   mongo-613df409e8e545cefb494c140565bdbe85838ce2.tar.gz
SERVER-68927 Store placement changes into config.placementHistory when a shardCollection() command gets committed on the config server
-rw-r--r--  jstests/sharding/sessions_collection_auto_healing.js    24
-rw-r--r--  jstests/sharding/store_historical_placement_data.js      95
-rw-r--r--  src/mongo/db/s/SConscript                                 1
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.cpp        121
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.h            4
5 files changed, 192 insertions, 53 deletions
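
The commit message above describes the new behaviour: when shardCollection() is committed on the config server (and the HistoricalPlacementShardingCatalog feature flag is enabled), a document recording the resulting placement of the namespace is inserted into config.placementHistory. As a rough, non-authoritative illustration of what the new test below asserts, a shell query along these lines can be used to look up the latest placement entry for a collection; the field names (nss, timestamp, shards, uuid) are taken from the test and the NamespacePlacementType usage in the C++ changes, while the namespace "testDB.coll1" is only a placeholder.

// Illustrative sketch: fetch the most recent placement entry written for a sharded
// collection. Run from a mongos shell on a cluster with the feature flag enabled.
const configDB = db.getSiblingDB('config');
const latestPlacement = configDB.placementHistory.find({nss: 'testDB.coll1'})
                            .sort({timestamp: -1})
                            .limit(1)
                            .toArray()[0];
printjson(latestPlacement);
// Expected shape (approximate):
//   nss:       'testDB.coll1'
//   timestamp: the collection creation timestamp (matches the config.collections entry)
//   shards:    the shards owning at least one chunk at creation time
//   uuid:      the collection UUID (absent for database-level entries)
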
diff --git a/jstests/sharding/sessions_collection_auto_healing.js b/jstests/sharding/sessions_collection_auto_healing.js
index 5689c4fd3ec..109b8fe96d0 100644
--- a/jstests/sharding/sessions_collection_auto_healing.js
+++ b/jstests/sharding/sessions_collection_auto_healing.js
@@ -3,6 +3,7 @@
* ]
*/
load('jstests/libs/sessions_collection.js');
+load("jstests/libs/feature_flag_util.js");
(function() {
"use strict";
@@ -119,19 +120,30 @@ var shardConfig = shard.getDB("config");
validateSessionsCollection(shard, true, true);
- // We will have two sessions because of the session used in the shardCollection's retryable
- // write to shard the sessions collection. It will disappear after we run the refresh
- // function on the shard.
- assert.eq(shardConfig.system.sessions.countDocuments({}), 2, "did not flush config's sessions");
+ // TODO SERVER-69106 adapt the test assuming that the flag will always be enabled.
+ const historicalPlacementDataFeatureFlag = FeatureFlagUtil.isEnabled(
+ st.configRS.getPrimary().getDB('admin'), "HistoricalPlacementShardingCatalog");
+ const sessionsOpenedByShardCollectionCmd = historicalPlacementDataFeatureFlag ? 3 : 2;
+
+ // We will have sessionsOpenedByShardCollectionCmd sessions because of the sessions used in the
+ // shardCollection's retryable writes to shard the sessions collection. They will disappear after
+ // we run the refresh function on the shard.
+ assert.eq(shardConfig.system.sessions.countDocuments({}),
+ sessionsOpenedByShardCollectionCmd,
+ "did not flush config's sessions");
// Now, if we do refreshes on the other servers, their in-mem records will
// be written to the collection.
assert.commandWorked(shard.adminCommand({refreshLogicalSessionCacheNow: 1}));
- assert.eq(shardConfig.system.sessions.countDocuments({}), 3, "did not flush shard's sessions");
+ assert.eq(shardConfig.system.sessions.countDocuments({}),
+ sessionsOpenedByShardCollectionCmd + 1,
+ "did not flush shard's sessions");
rs.awaitLastOpCommitted();
assert.commandWorked(mongos.adminCommand({refreshLogicalSessionCacheNow: 1}));
- assert.eq(shardConfig.system.sessions.countDocuments({}), 5, "did not flush mongos' sessions");
+ assert.eq(shardConfig.system.sessions.countDocuments({}),
+ sessionsOpenedByShardCollectionCmd + 3,
+ "did not flush mongos' sessions");
}
// Test that if we drop the index on the sessions collection, only a refresh on the config
diff --git a/jstests/sharding/store_historical_placement_data.js b/jstests/sharding/store_historical_placement_data.js
index 844d46cf54e..f4b2831df02 100644
--- a/jstests/sharding/store_historical_placement_data.js
+++ b/jstests/sharding/store_historical_placement_data.js
@@ -1,14 +1,73 @@
-/*
- * The test verifies that each Sharding DDL operation that gets successfully completed
- * also produces a document detailing the changes to the placement of the targeted nss.
- *
- *
- */
+
(function() {
"use strict";
load("jstests/libs/feature_flag_util.js");
const st = new ShardingTest({shards: 2});
+const configDB = st.s.getDB('config');
+
+function getAndValidateLatestPlacementInfoForDB(dbName) {
+ const placementQueryResults =
+ configDB.placementHistory.find({nss: dbName}).sort({timestamp: -1}).limit(1).toArray();
+ assert.eq(placementQueryResults.length, 1);
+ const dbPlacementDetails = placementQueryResults[0];
+
+ // Verify that the placementHistory document matches the related content stored in
+ // config.databases.
+ const configDBsQueryResults = configDB.databases.find({_id: dbPlacementDetails.nss}).toArray();
+ assert.eq(1, configDBsQueryResults.length);
+ const databaseDetails = configDBsQueryResults[0];
+
+ assert(timestampCmp(databaseDetails.version.timestamp, dbPlacementDetails.timestamp) === 0);
+ assert.eq(1, dbPlacementDetails.shards.length);
+ assert.eq(databaseDetails.primary, dbPlacementDetails.shards[0]);
+ assert.eq(undefined, dbPlacementDetails.uuid);
+ return dbPlacementDetails;
+}
+
+function getAndValidateLatestPlacementInfoForCollection(fullCollName) {
+ const placementQueryResults = configDB.placementHistory.find({nss: fullCollName})
+ .sort({timestamp: -1})
+ .limit(1)
+ .toArray();
+ assert.eq(placementQueryResults.length, 1);
+ const placementDetails = placementQueryResults[0];
+
+ // Verify that the placementHistory document matches the related content stored in
+ // config.collections.
+ const configCollsQueryResults =
+ configDB.collections.find({_id: placementDetails.nss}).toArray();
+ assert.eq(configCollsQueryResults.length, 1);
+ const collectionEntry = configCollsQueryResults[0];
+
+ assert.eq(collectionEntry.uuid, placementDetails.uuid);
+ assert(timestampCmp(collectionEntry.timestamp, placementDetails.timestamp) === 0);
+ return placementDetails;
+}
+
+function testEnableSharding(dbName, primaryShardName) {
+ assert.commandWorked(
+ st.s.adminCommand({enableSharding: dbName, primaryShard: primaryShardName}));
+ getAndValidateLatestPlacementInfoForDB(dbName);
+}
+
+function testShardCollection(dbName, collName) {
+ let nss = dbName + '.' + collName;
+
+ // Shard the collection. Ensure enough chunks to cover all shards.
+ assert.commandWorked(
+ st.s.adminCommand({shardCollection: nss, key: {_id: "hashed"}, numInitialChunks: 20}));
+ // Verify that there is consistent placement info on the sharded collection and its parent DB.
+ const dbPlacementInfo = getAndValidateLatestPlacementInfoForDB(dbName);
+ const collPlacementInfo = getAndValidateLatestPlacementInfoForCollection(nss);
+ assert(timestampCmp(dbPlacementInfo.timestamp, collPlacementInfo.timestamp) < 0);
+
+ // Verify that the placementHistory document matches the related content stored in
+ // config.shards.
+ const entriesInConfigShards = configDB.shards.find({}, {_id: 1}).toArray().map((s) => s._id);
+ assert.sameMembers(entriesInConfigShards, collPlacementInfo.shards);
+}
+
// TODO SERVER-69106 remove the logic to skip the test execution
const historicalPlacementDataFeatureFlag = FeatureFlagUtil.isEnabled(
st.configRS.getPrimary().getDB('admin'), "HistoricalPlacementShardingCatalog");
@@ -18,23 +77,15 @@ if (!historicalPlacementDataFeatureFlag) {
return;
}
-const dbName = 'test';
-const configDB = st.s.getDB('config');
+jsTest.log('Testing placement entries added by explicit DB creation');
+testEnableSharding('explicitlyCreatedDB', st.shard0.shardName);
+
+jsTest.log(
+ 'Testing placement entries added by shardCollection() over an existing sharding-enabled DB');
+testShardCollection('explicitlyCreatedDB', 'coll1');
-jsTest.log('Verifying placement data generated by createDatabase()');
-assert.commandWorked(
- st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
-
-const placementHistoryEntries = configDB.placementHistory.find().toArray();
-assert.eq(placementHistoryEntries.length, 1);
-const placementDetails = placementHistoryEntries[0];
-const databaseEntries = configDB.databases.find({_id: placementDetails.nss}).toArray();
-assert.eq(1, databaseEntries.length);
-const databaseDetails = databaseEntries[0];
-assert(timestampCmp(databaseDetails.version.timestamp, placementDetails.timestamp) == 0);
-assert.eq(1, placementDetails.shards.length);
-assert.eq(databaseDetails.primary, placementDetails.shards[0]);
-assert.eq(undefined, placementDetails.uuid);
+jsTest.log('Testing placement entries added by shardCollection() over a non-existing db (& coll)');
+testShardCollection('implicitlyCreatedDB', 'coll1');
st.stop();
}());
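
The helpers in this test compare a collection's placement entry against the full content of config.shards, which is valid here because numInitialChunks: 20 guarantees that every shard receives at least one chunk. As an illustrative alternative (not part of the test), the same invariant can be cross-checked against config.chunks; the sketch below assumes the 5.0+ catalog layout in which chunk documents reference their collection by uuid, and uses names analogous to the test's st/configDB fixtures.

// Sketch: the latest placement entry for a collection should list exactly the shards
// that currently own at least one of its chunks.
function validatePlacementAgainstChunks(nss) {
    const collEntry = configDB.collections.findOne({_id: nss});
    const placement = configDB.placementHistory.find({nss: nss})
                          .sort({timestamp: -1})
                          .limit(1)
                          .toArray()[0];
    const shardsOwningChunks = configDB.chunks.distinct('shard', {uuid: collEntry.uuid});
    assert.sameMembers(placement.shards, shardsOwningChunks);
}
validatePlacementAgainstChunks('explicitlyCreatedDB.coll1');
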
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 789a7fcec6a..c0c21fdb65f 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -516,6 +516,7 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/catalog_helpers',
'$BUILD_DIR/mongo/db/catalog/database_holder',
'$BUILD_DIR/mongo/db/cloner',
+ '$BUILD_DIR/mongo/db/cluster_transaction_api',
'$BUILD_DIR/mongo/db/commands/cluster_server_parameter_commands_invocation',
'$BUILD_DIR/mongo/db/commands/core',
'$BUILD_DIR/mongo/db/commands/create_command',
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp
index 9257033699f..91b3d719e44 100644
--- a/src/mongo/db/s/create_collection_coordinator.cpp
+++ b/src/mongo/db/s/create_collection_coordinator.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/catalog/collection_uuid_mismatch.h"
#include "mongo/db/catalog/create_collection.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/cluster_transaction_api.h"
#include "mongo/db/commands/create_gen.h"
#include "mongo/db/commands/feature_compatibility_version.h"
#include "mongo/db/concurrency/exception_util.h"
@@ -54,13 +55,16 @@
#include "mongo/db/timeseries/catalog_helper.h"
#include "mongo/db/timeseries/timeseries_constants.h"
#include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h"
+#include "mongo/db/transaction/transaction_api.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/catalog/type_namespace_placement_gen.h"
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/cluster_write.h"
#include "mongo/s/grid.h"
#include "mongo/s/sharding_feature_flags_gen.h"
+
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
@@ -330,7 +334,6 @@ void insertChunks(OperationContext* opCtx,
}
void insertCollectionEntry(OperationContext* opCtx,
- const NamespaceString& nss,
CollectionType& coll,
const OperationSessionInfo& osi) {
const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
@@ -352,6 +355,62 @@ void insertCollectionEntry(OperationContext* opCtx,
&unusedResponse));
}
+void insertCollectionAndPlacementEntries(OperationContext* opCtx,
+ const std::shared_ptr<executor::TaskExecutor>& executor,
+ const std::shared_ptr<CollectionType>& coll,
+ const ChunkVersion& placementVersion,
+ const std::shared_ptr<std::set<ShardId>>& shardIds) {
+ // Ensure that this function will only return once the transaction gets majority committed (and
+ // restore the original write concern on exit).
+ WriteConcernOptions originalWC = opCtx->getWriteConcern();
+ opCtx->setWriteConcern(WriteConcernOptions{WriteConcernOptions::kMajority,
+ WriteConcernOptions::SyncMode::UNSET,
+ WriteConcernOptions::kNoTimeout});
+ ScopeGuard guard([opCtx, &originalWC] { opCtx->setWriteConcern(originalWC); });
+
+ auto txnClient = std::make_unique<txn_api::details::SEPTransactionClient>(
+ opCtx,
+ executor,
+ std::make_unique<txn_api::details::ClusterSEPTransactionClientBehaviors>(
+ opCtx->getServiceContext()));
+
+ /*
+ * The insertionChain callback may run on a thread different from the one serving
+ * insertCollectionAndPlacementEntries(). For this reason, all the referenced parameters have to
+ * be captured by value (shared_ptrs are used to reduce the memory footprint).
+ */
+ const auto insertionChain = [coll, shardIds, placementVersion](
+ const txn_api::TransactionClient& txnClient,
+ ExecutorPtr txnExec) {
+ write_ops::InsertCommandRequest insertCollectionEntry(CollectionType::ConfigNS,
+ {coll->toBSON()});
+ return txnClient.runCRUDOp(insertCollectionEntry, {})
+ .thenRunOn(txnExec)
+ .then([&](const BatchedCommandResponse& insertCollectionEntryResponse) {
+ uassertStatusOK(insertCollectionEntryResponse.toStatus());
+
+ NamespacePlacementType placementInfo(
+ NamespaceString(coll->getNss()),
+ placementVersion.getTimestamp(),
+ std::vector<mongo::ShardId>(shardIds->cbegin(), shardIds->cend()));
+ placementInfo.setUuid(coll->getUuid());
+
+ write_ops::InsertCommandRequest insertPlacementEntry(
+ NamespaceString::kConfigsvrPlacementHistoryNamespace, {placementInfo.toBSON()});
+ return txnClient.runCRUDOp(insertPlacementEntry, {});
+ })
+ .thenRunOn(txnExec)
+ .then([](const BatchedCommandResponse& insertPlacementEntryResponse) {
+ uassertStatusOK(insertPlacementEntryResponse.toStatus());
+ })
+ .semi();
+ };
+
+ txn_api::SyncTransactionWithRetries txn(
+ opCtx, executor, nullptr /*resourceYielder*/, std::move(txnClient));
+ txn.run(opCtx, insertionChain);
+}
+
void broadcastDropCollection(OperationContext* opCtx,
const NamespaceString& nss,
const std::shared_ptr<executor::TaskExecutor>& executor,
@@ -534,7 +593,7 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
_updateSession(opCtx);
_createCollectionOnNonPrimaryShards(opCtx, getCurrentSession());
- _commit(opCtx);
+ _commit(opCtx, **executor);
}
// End of the critical section, from now on, read and writes are permitted.
@@ -545,7 +604,7 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
// collections.
if (!_splitPolicy->isOptimized()) {
_createChunks(opCtx, shardKeyPattern);
- _commit(opCtx);
+ _commit(opCtx, **executor);
}
}))
.then([this] {
@@ -1119,38 +1178,59 @@ void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(
}
}
-void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
+void CreateCollectionCoordinator::_commit(OperationContext* opCtx,
+ const std::shared_ptr<executor::TaskExecutor>& executor) {
LOGV2_DEBUG(5277906, 2, "Create collection _commit", "namespace"_attr = nss());
// Upsert Chunks.
_updateSession(opCtx);
insertChunks(opCtx, _initialChunks->chunks, getCurrentSession());
- CollectionType coll(nss(),
- _initialChunks->collVersion().epoch(),
- _initialChunks->collVersion().getTimestamp(),
- Date_t::now(),
- *_collectionUUID,
- _doc.getTranslatedRequestParams()->getKeyPattern());
+ // The coll and shardsHoldingData objects will be used by both this function and
+ // insertCollectionAndPlacementEntries(), which accesses their content from a separate thread
+ // (through the Internal Transactions API). In order to avoid segmentation faults and minimise
+ // the memory footprint, such variables get instantiated as shared_ptrs.
+ auto coll =
+ std::make_shared<CollectionType>(nss(),
+ _initialChunks->collVersion().epoch(),
+ _initialChunks->collVersion().getTimestamp(),
+ Date_t::now(),
+ *_collectionUUID,
+ _doc.getTranslatedRequestParams()->getKeyPattern());
+
+ auto shardsHoldingData = std::make_shared<std::set<ShardId>>();
+ for (const auto& chunk : _initialChunks->chunks) {
+ const auto& chunkShardId = chunk.getShard();
+ shardsHoldingData->emplace(chunkShardId);
+ }
+
+ const auto& placementVersion = _initialChunks->chunks.back().getVersion();
if (_request.getTimeseries()) {
TypeCollectionTimeseriesFields timeseriesFields;
timeseriesFields.setTimeseriesOptions(*_request.getTimeseries());
- coll.setTimeseriesFields(std::move(timeseriesFields));
+ coll->setTimeseriesFields(std::move(timeseriesFields));
}
if (auto collationBSON = _doc.getTranslatedRequestParams()->getCollation();
!collationBSON.isEmpty()) {
- coll.setDefaultCollation(collationBSON);
+ coll->setDefaultCollation(collationBSON);
}
if (_request.getUnique()) {
- coll.setUnique(*_request.getUnique());
+ coll->setUnique(*_request.getUnique());
}
_updateSession(opCtx);
+
try {
- insertCollectionEntry(opCtx, nss(), coll, getCurrentSession());
+ if (feature_flags::gHistoricalPlacementShardingCatalog.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ insertCollectionAndPlacementEntries(
+ opCtx, executor, coll, placementVersion, shardsHoldingData);
+ } else {
+ insertCollectionEntry(opCtx, *coll, getCurrentSession());
+ }
notifyChangeStreamsOnShardCollection(opCtx, nss(), *_collectionUUID, _request.toBSON());
@@ -1177,22 +1257,16 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
auto shardRegistry = Grid::get(opCtx)->shardRegistry();
auto dbPrimaryShardId = ShardingState::get(opCtx)->shardId();
- std::set<ShardId> shardsRefreshed;
- for (const auto& chunk : _initialChunks->chunks) {
- const auto& chunkShardId = chunk.getShard();
-
- if (chunkShardId == dbPrimaryShardId ||
- shardsRefreshed.find(chunkShardId) != shardsRefreshed.end()) {
+ for (const auto& shardid : *shardsHoldingData) {
+ if (shardid == dbPrimaryShardId) {
continue;
}
- auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, chunkShardId));
+ auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardid));
shard->runFireAndForgetCommand(opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
NamespaceString::kAdminDb.toString(),
BSON("_flushRoutingTableCacheUpdates" << nss().ns()));
-
- shardsRefreshed.emplace(chunkShardId);
}
LOGV2(5277901,
@@ -1201,7 +1275,6 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
"numInitialChunks"_attr = _initialChunks->chunks.size(),
"initialCollectionVersion"_attr = _initialChunks->collVersion());
- const auto placementVersion = _initialChunks->chunks.back().getVersion();
auto result = CreateCollectionResponse(
{placementVersion, CollectionIndexes(placementVersion, boost::none)});
result.setCollectionUUID(_collectionUUID);
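
With the feature flag enabled, _commit() now delegates to insertCollectionAndPlacementEntries(), which writes the config.collections document and the matching config.placementHistory document in a single internal transaction under majority write concern, so the two entries become visible together or not at all. A hedged, jstest-style sketch of that invariant (placeholder namespace, fixtures analogous to the tests above):

load("jstests/libs/feature_flag_util.js");

const configDB = st.s.getDB('config');
const flagEnabled = FeatureFlagUtil.isEnabled(st.configRS.getPrimary().getDB('admin'),
                                              "HistoricalPlacementShardingCatalog");
const collEntry = configDB.collections.findOne({_id: 'testDB.coll1'});
if (flagEnabled && collEntry) {
    // Both documents were inserted by the same transaction, so a placement entry carrying
    // the collection's uuid and creation timestamp must exist alongside the collection entry.
    const placementEntry =
        configDB.placementHistory.findOne({nss: 'testDB.coll1', uuid: collEntry.uuid});
    assert.neq(null, placementEntry);
    assert.eq(0, timestampCmp(collEntry.timestamp, placementEntry.timestamp));
}
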
diff --git a/src/mongo/db/s/create_collection_coordinator.h b/src/mongo/db/s/create_collection_coordinator.h
index 86aee402c3f..a86aee178c4 100644
--- a/src/mongo/db/s/create_collection_coordinator.h
+++ b/src/mongo/db/s/create_collection_coordinator.h
@@ -128,8 +128,10 @@ private:
* Does the following writes:
* 1. Updates the config.collections entry for the new sharded collection
* 2. Updates config.chunks entries for the new sharded collection
+ * 3. Inserts an entry into config.placementHistory with the sublist of shards that will host
+ * one or more chunks of the new collection at creation time
*/
- void _commit(OperationContext* opCtx);
+ void _commit(OperationContext* opCtx, const std::shared_ptr<executor::TaskExecutor>& executor);
/**
* Helper function to audit and log the shard collection event.