summaryrefslogtreecommitdiff
path: root/src/mongo/db/s
diff options
context:
space:
mode:
authorMarcos José Grillo Ramirez <marcos.grillo@mongodb.com>2022-11-08 15:20:48 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-08 16:01:26 +0000
commitccffa2062f09352f6c064e64090bdda42b829e73 (patch)
tree0951cc91e27d3dba9cfdcbeba80c0b88a0cbbdd5 /src/mongo/db/s
parentf906c9b49d589616a9bc8147cf62e6092fed600b (diff)
downloadmongo-ccffa2062f09352f6c064e64090bdda42b829e73.tar.gz
SERVER-67103 Add global index copy when migrating a chunk
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--src/mongo/db/s/SConscript2
-rw-r--r--src/mongo/db/s/collection_sharding_runtime.cpp13
-rw-r--r--src/mongo/db/s/collection_sharding_runtime.h7
-rw-r--r--src/mongo/db/s/global_index_ddl_util.cpp152
-rw-r--r--src/mongo/db/s/global_index_ddl_util.h18
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp36
-rw-r--r--src/mongo/db/s/migration_destination_manager.h1
-rw-r--r--src/mongo/db/s/shard_server_op_observer.cpp85
8 files changed, 278 insertions, 36 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 98507bbd809..fa95c943bcd 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -66,9 +66,7 @@ env.Library(
'$BUILD_DIR/mongo/db/curop',
'$BUILD_DIR/mongo/db/dbhelpers',
'$BUILD_DIR/mongo/db/ops/write_ops',
- '$BUILD_DIR/mongo/db/session/logical_session_id_helpers',
'$BUILD_DIR/mongo/db/shard_role',
- '$BUILD_DIR/mongo/db/transaction/transaction',
'$BUILD_DIR/mongo/s/common_s',
],
)
diff --git a/src/mongo/db/s/collection_sharding_runtime.cpp b/src/mongo/db/s/collection_sharding_runtime.cpp
index bdbd027dc50..a612fdaed01 100644
--- a/src/mongo/db/s/collection_sharding_runtime.cpp
+++ b/src/mongo/db/s/collection_sharding_runtime.cpp
@@ -542,6 +542,19 @@ void CollectionShardingRuntime::clearIndexes(OperationContext* opCtx) {
_globalIndexesInfo = boost::none;
}
+void CollectionShardingRuntime::replaceIndexes(OperationContext* opCtx,
+ const std::vector<IndexCatalogType>& indexes,
+ const CollectionIndexes& collectionIndexes) {
+ if (_globalIndexesInfo) {
+ _globalIndexesInfo = boost::none;
+ }
+ IndexCatalogTypeMap indexMap;
+ for (const auto& index : indexes) {
+ indexMap.emplace(index.getName(), index);
+ }
+ _globalIndexesInfo.emplace(collectionIndexes, std::move(indexMap));
+}
+
CollectionCriticalSection::CollectionCriticalSection(OperationContext* opCtx,
NamespaceString nss,
BSONObj reason)
diff --git a/src/mongo/db/s/collection_sharding_runtime.h b/src/mongo/db/s/collection_sharding_runtime.h
index 06d79923344..cac64e74c5c 100644
--- a/src/mongo/db/s/collection_sharding_runtime.h
+++ b/src/mongo/db/s/collection_sharding_runtime.h
@@ -255,6 +255,13 @@ public:
*/
void clearIndexes(OperationContext* opCtx);
+ /**
+ * Clears all the indexes and set the new indexes and index version.
+ */
+ void replaceIndexes(OperationContext* opCtx,
+ const std::vector<IndexCatalogType>& indexes,
+ const CollectionIndexes& collectionIndexes);
+
private:
struct ShardVersionRecoverOrRefresh {
public:
diff --git a/src/mongo/db/s/global_index_ddl_util.cpp b/src/mongo/db/s/global_index_ddl_util.cpp
index 0660c51db32..b66fd71d1e7 100644
--- a/src/mongo/db/s/global_index_ddl_util.cpp
+++ b/src/mongo/db/s/global_index_ddl_util.cpp
@@ -35,18 +35,30 @@
#include "mongo/db/op_observer/op_observer.h"
#include "mongo/db/ops/delete.h"
#include "mongo/db/ops/update.h"
-#include "mongo/db/transaction/retryable_writes_stats.h"
-#include "mongo/db/transaction/transaction_participant.h"
#include "mongo/logv2/log.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_index_catalog_gen.h"
-#include "mongo/s/grid.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
-
namespace mongo {
+namespace {
+
+/**
+ * Remove all indexes by uuid.
+ */
+void deleteGlobalIndexes(OperationContext* opCtx,
+ const CollectionPtr& collection,
+ const UUID& uuid) {
+ mongo::deleteObjects(opCtx,
+ collection,
+ NamespaceString::kShardIndexCatalogNamespace,
+ BSON(IndexCatalogType::kCollectionUUIDFieldName << uuid),
+ false);
+}
+} // namespace
+
void addGlobalIndexCatalogEntryToCollection(OperationContext* opCtx,
const NamespaceString& userCollectionNss,
const std::string& name,
@@ -72,7 +84,7 @@ void addGlobalIndexCatalogEntryToCollection(OperationContext* opCtx,
BSONObj collectionDoc;
bool docExists =
Helpers::findOne(opCtx, collsColl.getCollection(), query, collectionDoc);
- if (docExists &&
+ if (docExists && !collectionDoc[CollectionType::kIndexVersionFieldName].eoo() &&
lastmod <= collectionDoc[CollectionType::kIndexVersionFieldName].timestamp()) {
LOGV2_DEBUG(
6712300,
@@ -142,7 +154,7 @@ void removeGlobalIndexCatalogEntryFromCollection(OperationContext* opCtx,
BSONObj collectionDoc;
bool docExists =
Helpers::findOne(opCtx, collsColl.getCollection(), query, collectionDoc);
- if (docExists &&
+ if (docExists && !collectionDoc[CollectionType::kIndexVersionFieldName].eoo() &&
lastmod <= collectionDoc[CollectionType::kIndexVersionFieldName].timestamp()) {
LOGV2_DEBUG(
6712301,
@@ -173,11 +185,12 @@ void removeGlobalIndexCatalogEntryFromCollection(OperationContext* opCtx,
{
repl::UnreplicatedWritesBlock uneplicatedWritesBlock(opCtx);
- auto idStr = format(FMT_STRING("{}_{}"), collectionUUID.toString(), indexName);
mongo::deleteObjects(opCtx,
idxColl.getCollection(),
NamespaceString::kShardIndexCatalogNamespace,
- BSON("_id" << idStr),
+ BSON(IndexCatalogType::kCollectionUUIDFieldName
+ << collectionUUID << IndexCatalogType::kNameFieldName
+ << indexName),
true);
}
@@ -197,4 +210,127 @@ void removeGlobalIndexCatalogEntryFromCollection(OperationContext* opCtx,
});
}
+void replaceGlobalIndexes(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& uuid,
+ const Timestamp& indexVersion,
+ const std::vector<IndexCatalogType>& indexes) {
+ writeConflictRetry(
+ opCtx, "ReplaceIndexCatalog", NamespaceString::kShardIndexCatalogNamespace.ns(), [&]() {
+ WriteUnitOfWork wunit(opCtx);
+ AutoGetCollection collsColl(
+ opCtx, NamespaceString::kShardCollectionCatalogNamespace, MODE_IX);
+ {
+ // Set final indexVersion
+ const auto query = BSON(CollectionType::kNssFieldName
+ << nss.ns() << CollectionType::kUuidFieldName << uuid);
+
+ // Update the document (or create it) with the new index version
+ repl::UnreplicatedWritesBlock uneplicatedWritesBlock(opCtx);
+ auto request = UpdateRequest();
+ request.setNamespaceString(NamespaceString::kShardCollectionCatalogNamespace);
+ request.setQuery(query);
+ request.setUpdateModification(BSON(CollectionType::kNssFieldName
+ << nss.ns() << CollectionType::kUuidFieldName
+ << uuid << CollectionType::kIndexVersionFieldName
+ << indexVersion));
+ request.setUpsert(true);
+ request.setFromOplogApplication(true);
+ mongo::update(opCtx, collsColl.getDb(), request);
+ }
+
+ AutoGetCollection idxColl(opCtx, NamespaceString::kShardIndexCatalogNamespace, MODE_IX);
+ BSONArrayBuilder indexesBSON;
+ {
+ // Clear old indexes.
+ repl::UnreplicatedWritesBlock uneplicatedWritesBlock(opCtx);
+ deleteGlobalIndexes(opCtx, idxColl.getCollection(), uuid);
+
+ // Add new indexes.
+ for (const auto& i : indexes) {
+ // Attach a custom generated _id.
+ auto indexBSON = i.toBSON();
+ BSONObjBuilder builder(indexBSON);
+ auto idStr =
+ format(FMT_STRING("{}_{}"), uuid.toString(), i.getName().toString());
+ builder.append("_id", idStr);
+ uassertStatusOK(
+ collection_internal::insertDocument(opCtx,
+ idxColl.getCollection(),
+ InsertStatement{builder.done()},
+ nullptr,
+ false));
+
+ indexesBSON.append(indexBSON);
+ }
+ }
+ auto entryObj = BSON("op"
+ << "r"
+ << "entry"
+ << BSON(IndexCatalogType::kCollectionUUIDFieldName
+ << uuid << CollectionType::kNssFieldName << nss.toString()
+ << "v" << indexVersion << "i" << indexesBSON.arr()));
+
+ opCtx->getServiceContext()
+ ->getOpObserver()
+ ->onModifyShardedCollectionGlobalIndexCatalogEntry(
+ opCtx, nss, idxColl->uuid(), entryObj);
+ wunit.commit();
+ });
+}
+
+void clearGlobalIndexes(OperationContext* opCtx,
+ const NamespaceString& userCollectionNss,
+ const UUID& collectionUUID) {
+ writeConflictRetry(
+ opCtx, "ClearIndexCatalogEntry", NamespaceString::kShardIndexCatalogNamespace.ns(), [&]() {
+ WriteUnitOfWork wunit(opCtx);
+ AutoGetCollection collsColl(
+ opCtx, NamespaceString::kShardCollectionCatalogNamespace, MODE_IX);
+ {
+ // First unset the index version.
+ const auto query = BSON(CollectionType::kNssFieldName
+ << userCollectionNss.ns() << CollectionType::kUuidFieldName
+ << collectionUUID);
+ BSONObj collectionDoc;
+ bool docExists =
+ Helpers::findOne(opCtx, collsColl.getCollection(), query, collectionDoc);
+ // Return if there is nothing to clear.
+ if (!docExists || collectionDoc[CollectionType::kIndexVersionFieldName].eoo()) {
+ return;
+ }
+ // Update the document (or create it) with the new index version
+ repl::UnreplicatedWritesBlock uneplicatedWritesBlock(opCtx);
+ auto request = UpdateRequest();
+ request.setNamespaceString(NamespaceString::kShardCollectionCatalogNamespace);
+ request.setQuery(query);
+ request.setUpdateModification(write_ops::UpdateModification::parseFromClassicUpdate(
+ BSON("$unset" << BSON(CollectionType::kIndexVersionFieldName << 1))));
+ request.setUpsert(true);
+ request.setFromOplogApplication(true);
+ mongo::update(opCtx, collsColl.getDb(), request);
+ }
+
+ AutoGetCollection idxColl(opCtx, NamespaceString::kShardIndexCatalogNamespace, MODE_IX);
+
+ BSONArrayBuilder queryIds;
+ {
+ repl::UnreplicatedWritesBlock uneplicatedWritesBlock(opCtx);
+ deleteGlobalIndexes(opCtx, idxColl.getCollection(), collectionUUID);
+ }
+ auto entryObj = BSON("op"
+ << "c"
+ << "entry"
+ << BSON(IndexCatalogType::kCollectionUUIDFieldName
+ << collectionUUID << CollectionType::kNssFieldName
+ << userCollectionNss.toString()));
+
+ opCtx->getServiceContext()
+ ->getOpObserver()
+ ->onModifyShardedCollectionGlobalIndexCatalogEntry(
+ opCtx, userCollectionNss, idxColl->uuid(), entryObj);
+ wunit.commit();
+ });
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/global_index_ddl_util.h b/src/mongo/db/s/global_index_ddl_util.h
index fc37ff5a06a..d4410a9480a 100644
--- a/src/mongo/db/s/global_index_ddl_util.h
+++ b/src/mongo/db/s/global_index_ddl_util.h
@@ -30,6 +30,7 @@
#pragma once
#include "mongo/db/operation_context.h"
+#include "mongo/s/catalog/type_index_catalog_gen.h"
namespace mongo {
@@ -60,4 +61,21 @@ void removeGlobalIndexCatalogEntryFromCollection(OperationContext* opCtx,
const std::string& indexName,
const Timestamp& lastmod);
+/**
+ * Removes all the indexes and the current index version, and replace them for the specified indexes
+ * and indexVersion.
+ */
+void replaceGlobalIndexes(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& uuid,
+ const Timestamp& indexVersion,
+ const std::vector<IndexCatalogType>& indexes);
+
+/**
+ * Removes all the indexes and unset the current index version.
+ */
+void clearGlobalIndexes(OperationContext* opCtx,
+ const NamespaceString& userCollectionNss,
+ const UUID& collectionUUID);
+
} // namespace mongo
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 455aa946da7..9e3e3a095af 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -54,6 +54,7 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/s/global_index_ddl_util.h"
#include "mongo/db/s/migration_util.h"
#include "mongo/db/s/move_timing_helper.h"
#include "mongo/db/s/operation_sharding_state.h"
@@ -74,6 +75,7 @@
#include "mongo/db/write_block_bypass.h"
#include "mongo/logv2/log.h"
#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/catalog/type_index_catalog_gen.h"
#include "mongo/s/catalog_cache_loader.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/cluster_commands_helpers.h"
@@ -88,7 +90,6 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingMigration
-
namespace mongo {
namespace {
@@ -293,6 +294,32 @@ bool migrationRecipientRecoveryDocumentExists(OperationContext* opCtx,
<< sessionId.toString())) > 0;
}
+void replaceGlobalIndexesInShardIfNeeded(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& uuid) {
+ auto currentShardHasAnyChunks = [&]() -> bool {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IS);
+ auto scsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire(
+ opCtx, nss, CSRAcquisitionMode::kShared);
+ const auto optMetadata = scsr->getCurrentMetadataIfKnown();
+ return optMetadata && optMetadata->currentShardHasAnyChunks();
+ }();
+
+ // Early return, this shard already contains chunks, so there is no need for consolidate.
+ if (currentShardHasAnyChunks) {
+ return;
+ }
+
+ auto [collection, indexes] = Grid::get(opCtx)->catalogClient()->getCollectionAndGlobalIndexes(
+ opCtx, nss, {repl::ReadConcernLevel::kSnapshotReadConcern});
+ if (collection.getIndexVersion()) {
+ replaceGlobalIndexes(
+ opCtx, nss, uuid, collection.getIndexVersion()->indexVersion(), indexes);
+ } else {
+ clearGlobalIndexes(opCtx, nss, uuid);
+ }
+}
+
// Enabling / disabling these fail points pauses / resumes MigrateStatus::_go(), the thread which
// receives a chunk migration from the donor.
MONGO_FAIL_POINT_DEFINE(migrateThreadHangAtStep1);
@@ -1267,6 +1294,13 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
cloneCollectionIndexesAndOptions(
altOpCtx.get(), _nss, donorCollectionOptionsAndIndexes);
+ // Get the global indexes and install them.
+ if (feature_flags::gGlobalIndexesShardingCatalog.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ replaceGlobalIndexesInShardIfNeeded(
+ altOpCtx.get(), _nss, donorCollectionOptionsAndIndexes.uuid);
+ }
+
timing->done(2);
migrateThreadHangAtStep2.pauseWhileSet();
}
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index 8a6ba36e5e1..d63b6696d57 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -193,7 +193,6 @@ public:
const boost::optional<ChunkManager>& cm,
boost::optional<Timestamp> afterClusterTime);
-
/**
* Creates the collection on the shard and clones the indexes and options.
*/
diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp
index 4253499fec9..1efc635e7d2 100644
--- a/src/mongo/db/s/shard_server_op_observer.cpp
+++ b/src/mongo/db/s/shard_server_op_observer.cpp
@@ -527,30 +527,67 @@ void ShardServerOpObserver::onModifyShardedCollectionGlobalIndexCatalogEntry(
1,
"Updating sharding in-memory state onModifyShardedCollectionGlobalIndexCatalogEntry",
"indexDoc"_attr = indexDoc);
- if (indexDoc["op"].str() == "i") {
- auto indexEntry = IndexCatalogType::parse(
- IDLParserContext("onModifyShardedCollectionGlobalIndexCatalogEntry"),
- indexDoc["entry"].Obj());
- auto indexVersion = indexDoc["entry"][IndexCatalogType::kLastmodFieldName].timestamp();
- auto uuid = uassertStatusOK(
- UUID::parse(indexDoc["entry"][IndexCatalogType::kCollectionUUIDFieldName]));
- opCtx->recoveryUnit()->onCommit([opCtx, nss, indexVersion, indexEntry, uuid](auto _) {
- AutoGetCollection autoColl(opCtx, nss, MODE_IX);
- CollectionShardingRuntime::assertCollectionLockedAndAcquire(
- opCtx, nss, CSRAcquisitionMode::kExclusive)
- ->addIndex(opCtx, indexEntry, {uuid, indexVersion});
- });
- } else {
- auto indexName = indexDoc["entry"][IndexCatalogType::kNameFieldName].str();
- auto indexVersion = indexDoc["entry"][IndexCatalogType::kLastmodFieldName].timestamp();
- auto uuid = uassertStatusOK(
- UUID::parse(indexDoc["entry"][IndexCatalogType::kCollectionUUIDFieldName]));
- opCtx->recoveryUnit()->onCommit([opCtx, nss, indexName, indexVersion, uuid](auto _) {
- AutoGetCollection autoColl(opCtx, nss, MODE_IX);
- CollectionShardingRuntime::assertCollectionLockedAndAcquire(
- opCtx, nss, CSRAcquisitionMode::kExclusive)
- ->removeIndex(opCtx, indexName, {uuid, indexVersion});
- });
+ auto op = indexDoc["op"].String();
+ invariant(op.size() > 0);
+ switch (op[0]) {
+ case 'i': {
+ auto indexEntry = IndexCatalogType::parse(
+ IDLParserContext("onModifyShardedCollectionGlobalIndexCatalogEntry"),
+ indexDoc["entry"].Obj());
+ auto indexVersion = indexDoc["entry"][IndexCatalogType::kLastmodFieldName].timestamp();
+ auto uuid = uassertStatusOK(
+ UUID::parse(indexDoc["entry"][IndexCatalogType::kCollectionUUIDFieldName]));
+ opCtx->recoveryUnit()->onCommit([opCtx, nss, uuid, indexVersion, indexEntry](auto _) {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IX);
+ auto scsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire(
+ opCtx, nss, CSRAcquisitionMode::kExclusive);
+ scsr->addIndex(opCtx, indexEntry, {uuid, indexVersion});
+ });
+ break;
+ }
+ case 'd': {
+ auto indexName = indexDoc["entry"][IndexCatalogType::kNameFieldName].str();
+ auto indexVersion = indexDoc["entry"][IndexCatalogType::kLastmodFieldName].timestamp();
+ auto uuid = uassertStatusOK(
+ UUID::parse(indexDoc["entry"][IndexCatalogType::kCollectionUUIDFieldName]));
+ opCtx->recoveryUnit()->onCommit([opCtx, nss, indexName, indexVersion, uuid](auto _) {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IX);
+ auto scsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire(
+ opCtx, nss, CSRAcquisitionMode::kExclusive);
+ scsr->removeIndex(opCtx, indexName, {uuid, indexVersion});
+ });
+ break;
+ }
+ case 'r': {
+ std::vector<IndexCatalogType> indexes;
+ for (const auto& i : indexDoc["entry"]["i"].Array()) {
+ auto indexEntry = IndexCatalogType::parse(
+ IDLParserContext("onModifyShardedCollectionGlobalIndexCatalogEntry"), i.Obj());
+ indexes.push_back(indexEntry);
+ }
+
+ auto indexVersion = indexDoc["entry"][IndexCatalogType::kLastmodFieldName].timestamp();
+ auto uuid = uassertStatusOK(
+ UUID::parse(indexDoc["entry"][IndexCatalogType::kCollectionUUIDFieldName]));
+ opCtx->recoveryUnit()->onCommit([opCtx, nss, uuid, indexVersion, indexes](auto _) {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IX);
+ auto scsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire(
+ opCtx, nss, CSRAcquisitionMode::kExclusive);
+ scsr->replaceIndexes(opCtx, indexes, {uuid, indexVersion});
+ });
+ break;
+ }
+ case 'c':
+ opCtx->recoveryUnit()->onCommit([opCtx, nss](auto _) {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IX);
+ auto scsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire(
+ opCtx, nss, CSRAcquisitionMode::kExclusive);
+ scsr->clearIndexes(opCtx);
+ });
+
+ break;
+ default:
+ MONGO_UNREACHABLE;
}
}