Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/s/collmod_coordinator.cpp                              |  28
-rw-r--r--  src/mongo/db/s/config/configsvr_set_allow_migrations_command.cpp    |   4
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp | 106
-rw-r--r--  src/mongo/db/s/drop_collection_coordinator.cpp                      |   5
-rw-r--r--  src/mongo/db/s/drop_database_coordinator.cpp                        |   4
-rw-r--r--  src/mongo/db/s/rename_collection_coordinator.cpp                    |   8
-rw-r--r--  src/mongo/db/s/sharding_ddl_util.cpp                                |  14
-rw-r--r--  src/mongo/db/s/sharding_ddl_util.h                                  |   6
8 files changed, 131 insertions(+), 44 deletions(-)
diff --git a/src/mongo/db/s/collmod_coordinator.cpp b/src/mongo/db/s/collmod_coordinator.cpp
index ee319db4193..6b89d37435c 100644
--- a/src/mongo/db/s/collmod_coordinator.cpp
+++ b/src/mongo/db/s/collmod_coordinator.cpp
@@ -202,8 +202,11 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
if (_collInfo->isSharded) {
_doc.setCollUUID(
sharding_ddl_util::getCollectionUUID(opCtx, _collInfo->nsForTargeting));
- sharding_ddl_util::stopMigrations(
- opCtx, _collInfo->nsForTargeting, _doc.getCollUUID());
+ _updateSession(opCtx);
+ sharding_ddl_util::stopMigrations(opCtx,
+ _collInfo->nsForTargeting,
+ _doc.getCollUUID(),
+ getCurrentSession());
}
})();
})
@@ -226,8 +229,11 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
if (!migrationsAlreadyBlockedForBucketNss) {
_doc.setCollUUID(sharding_ddl_util::getCollectionUUID(
opCtx, _collInfo->nsForTargeting, true /* allowViews */));
- sharding_ddl_util::stopMigrations(
- opCtx, _collInfo->nsForTargeting, _doc.getCollUUID());
+ _updateSession(opCtx);
+ sharding_ddl_util::stopMigrations(opCtx,
+ _collInfo->nsForTargeting,
+ _doc.getCollUUID(),
+ getCurrentSession());
}
}
@@ -374,12 +380,18 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
CommandHelpers::appendSimpleCommandStatus(builder, ok, errmsg);
}
_result = builder.obj();
- sharding_ddl_util::resumeMigrations(
- opCtx, _collInfo->nsForTargeting, _doc.getCollUUID());
+ _updateSession(opCtx);
+ sharding_ddl_util::resumeMigrations(opCtx,
+ _collInfo->nsForTargeting,
+ _doc.getCollUUID(),
+ getCurrentSession());
} catch (DBException& ex) {
if (!_isRetriableErrorForDDLCoordinator(ex.toStatus())) {
- sharding_ddl_util::resumeMigrations(
- opCtx, _collInfo->nsForTargeting, _doc.getCollUUID());
+ _updateSession(opCtx);
+ sharding_ddl_util::resumeMigrations(opCtx,
+ _collInfo->nsForTargeting,
+ _doc.getCollUUID(),
+ getCurrentSession());
}
throw;
}
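The pattern in the three hunks above (and repeated in the drop and rename coordinators below) is: advance the coordinator's logical session, then forward it so the config server executes stopMigrations/resumeMigrations as a retryable write. A minimal sketch of that pattern, assuming `_updateSession` bumps a per-coordinator txnNumber on a stable lsid and `getCurrentSession` packages the pair as an `OperationSessionInfo` (the real helpers live in the DDL coordinator machinery, not shown in this diff):

    // Hedged sketch, not the coordinator implementation: a stable lsid plus
    // an incrementing txnNumber is what makes each forwarded call retryable.
    OperationSessionInfo makeNextSession(const LogicalSessionId& lsid,
                                         TxnNumber& txnNumber) {
        OperationSessionInfo osi;
        osi.setSessionId(lsid);         // session owned by the DDL coordinator
        osi.setTxnNumber(++txnNumber);  // fresh txnNumber: the config server can
                                        // deduplicate a retry of the same statement
        return osi;
    }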
diff --git a/src/mongo/db/s/config/configsvr_set_allow_migrations_command.cpp b/src/mongo/db/s/config/configsvr_set_allow_migrations_command.cpp
index 8d6ade7eeb4..870df0641da 100644
--- a/src/mongo/db/s/config/configsvr_set_allow_migrations_command.cpp
+++ b/src/mongo/db/s/config/configsvr_set_allow_migrations_command.cpp
@@ -108,6 +108,10 @@ public:
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return AllowedOnSecondary::kNever;
}
+
+ bool supportsRetryableWrite() const final {
+ return true;
+ }
} configsvrSetAllowMigrationsCmd;
} // namespace
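For the forwarded lsid/txnNumber to be accepted, the target command must opt in to retryable execution; that is all the override above does. A sketch of the opt-in, with the class name hypothetical:

    // Sketch: a command invoked with an attached session and txnNumber must
    // report supportsRetryableWrite() == true, or dispatch rejects the request.
    class ConfigsvrExampleCommand final : public BasicCommand {
    public:
        // ... usual command boilerplate ...
        bool supportsRetryableWrite() const final {
            return true;
        }
    };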
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index de854d256ad..19806d4d8ec 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -47,6 +47,7 @@
#include "mongo/db/snapshot_window_options_gen.h"
#include "mongo/db/transaction/transaction_api.h"
#include "mongo/db/transaction/transaction_participant_gen.h"
+#include "mongo/db/transaction/transaction_participant_resource_yielder.h"
#include "mongo/db/vector_clock_mutable.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -2178,14 +2179,13 @@ void ShardingCatalogManager::setAllowMigrationsAndBumpOneChunk(
!collectionUUID || collectionUUID == cm.getUUID());
cm.getAllShardIds(&cmShardIds);
- withTransaction(
- opCtx,
- CollectionType::ConfigNS,
- [this, allowMigrations, &nss, &collectionUUID](OperationContext* opCtx,
- TxnNumber txnNumber) {
- // Update the 'allowMigrations' field. An unset 'allowMigrations' field implies
- // 'true'. To ease backwards compatibility we omit 'allowMigrations' instead of
- // setting it explicitly to 'true'.
+
+ auto updateCollectionAndChunkFn = [allowMigrations, &nss, &collectionUUID](
+ const txn_api::TransactionClient& txnClient,
+ ExecutorPtr txnExec) {
+ write_ops::UpdateCommandRequest updateCollOp(CollectionType::ConfigNS);
+ updateCollOp.setUpdates([&] {
+ write_ops::UpdateOpEntry entry;
const auto update = allowMigrations
? BSON("$unset" << BSON(CollectionType::kAllowMigrationsFieldName << ""))
: BSON("$set" << BSON(CollectionType::kAllowMigrationsFieldName << false));
@@ -2195,25 +2195,81 @@ void ShardingCatalogManager::setAllowMigrationsAndBumpOneChunk(
query =
query.addFields(BSON(CollectionType::kUuidFieldName << *collectionUUID));
}
+ entry.setQ(query);
+ entry.setU(update);
+ entry.setMulti(false);
+ return std::vector<write_ops::UpdateOpEntry>{entry};
+ }());
+
+ auto updateCollResponse = txnClient.runCRUDOpSync(updateCollOp, {0});
+ uassertStatusOK(updateCollResponse.toStatus());
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Expected to match one doc but matched "
+ << updateCollResponse.getNModified(),
+ updateCollResponse.getNModified() == 1);
+
+ FindCommandRequest collQuery{CollectionType::ConfigNS};
+ collQuery.setFilter(BSON(CollectionType::kNssFieldName << nss.ns()));
+ collQuery.setLimit(1);
- const auto res = writeToConfigDocumentInTxn(
- opCtx,
- CollectionType::ConfigNS,
- BatchedCommandRequest::buildUpdateOp(CollectionType::ConfigNS,
- query,
- update /* update */,
- false /* upsert */,
- false /* multi */),
- txnNumber);
- const auto numDocsModified = UpdateOp::parseResponse(res).getN();
- uassert(ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Expected to match one doc for query " << query
- << " but matched " << numDocsModified,
- numDocsModified == 1);
-
- bumpCollectionMinorVersion(opCtx, _localConfigShard.get(), nss, txnNumber);
- });
+ const auto findCollResponse = txnClient.exhaustiveFindSync(collQuery);
+ uassert(ErrorCodes::NamespaceNotFound,
+ "Collection does not exist",
+ findCollResponse.size() == 1);
+ const CollectionType coll(findCollResponse[0]);
+
+ // Find the newest chunk
+ FindCommandRequest chunkQuery{ChunkType::ConfigNS};
+ chunkQuery.setFilter(BSON(ChunkType::collectionUUID << coll.getUuid()));
+ chunkQuery.setSort(BSON(ChunkType::lastmod << -1));
+ chunkQuery.setLimit(1);
+ const auto findChunkResponse = txnClient.exhaustiveFindSync(chunkQuery);
+
+ uassert(ErrorCodes::IncompatibleShardingMetadata,
+ str::stream() << "Tried to find max chunk version for collection " << nss.ns()
+ << ", but found no chunks",
+ findChunkResponse.size() == 1);
+
+ const auto newestChunk = uassertStatusOK(ChunkType::parseFromConfigBSON(
+ findChunkResponse[0], coll.getEpoch(), coll.getTimestamp()));
+ const auto targetVersion = [&]() {
+ ChunkVersion version = newestChunk.getVersion();
+ version.incMinor();
+ return version;
+ }();
+
+ write_ops::UpdateCommandRequest updateChunkOp(ChunkType::ConfigNS);
+ BSONObjBuilder updateBuilder;
+ BSONObjBuilder updateVersionClause(updateBuilder.subobjStart("$set"));
+ updateVersionClause.appendTimestamp(ChunkType::lastmod(), targetVersion.toLong());
+ updateVersionClause.doneFast();
+ const auto update = updateBuilder.obj();
+ updateChunkOp.setUpdates([&] {
+ write_ops::UpdateOpEntry entry;
+ entry.setQ(BSON(ChunkType::name << newestChunk.getName()));
+ entry.setU(update);
+ entry.setMulti(false);
+ entry.setUpsert(false);
+ return std::vector<write_ops::UpdateOpEntry>{entry};
+ }());
+ auto updateChunkResponse = txnClient.runCRUDOpSync(updateChunkOp, {1});
+ uassertStatusOK(updateChunkResponse.toStatus());
+ LOGV2_DEBUG(
+ 7353900, 1, "Finished all transaction operations in setAllowMigrations command");
+
+ return SemiFuture<void>::makeReady();
+ };
+ auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
+ auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
+ auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor);
+
+ txn_api::SyncTransactionWithRetries txn(
+ opCtx,
+ sleepInlineExecutor,
+ TransactionParticipantResourceYielder::make("setAllowMigrationsAndBumpOneChunk"),
+ inlineExecutor);
+ txn.run(opCtx, updateCollectionAndChunkFn);
// From now on migrations are not allowed anymore, so it is not possible that new shards
// will own chunks for this collection.
}
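The hunk above replaces the hand-rolled `withTransaction` helper with the internal transaction API: both config writes (the collection update at stmtId 0 and the chunk version bump at stmtId 1) now run in a single transaction that `SyncTransactionWithRetries` retries on transient errors. The general shape, reduced to one operation; the namespace and the executor/yielder names are placeholders, not the code above:

    // Illustrative shape of txn_api usage, under the same assumptions as above.
    auto txnFn = [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
        write_ops::UpdateCommandRequest updateOp(someConfigNss);  // hypothetical nss
        // ... populate updateOp.setUpdates(...) as in the hunk above ...
        auto response = txnClient.runCRUDOpSync(updateOp, {0} /* stmtIds */);
        uassertStatusOK(response.toStatus());
        return SemiFuture<void>::makeReady();  // resolving OK commits the transaction
    };

    txn_api::SyncTransactionWithRetries txn(opCtx, executor, resourceYielder, inlineExecutor);
    txn.run(opCtx, std::move(txnFn));  // retried on transient transaction errors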
diff --git a/src/mongo/db/s/drop_collection_coordinator.cpp b/src/mongo/db/s/drop_collection_coordinator.cpp
index e8bc06a8a44..a073a6660c3 100644
--- a/src/mongo/db/s/drop_collection_coordinator.cpp
+++ b/src/mongo/db/s/drop_collection_coordinator.cpp
@@ -238,7 +238,10 @@ void DropCollectionCoordinator::_freezeMigrations(
opCtx, "dropCollection.start", nss().ns(), logChangeDetail.obj());
if (_doc.getCollInfo()) {
- sharding_ddl_util::stopMigrations(opCtx, nss(), _doc.getCollInfo()->getUuid());
+ _updateSession(opCtx);
+
+ sharding_ddl_util::stopMigrations(
+ opCtx, nss(), _doc.getCollInfo()->getUuid(), getCurrentSession());
}
}
diff --git a/src/mongo/db/s/drop_database_coordinator.cpp b/src/mongo/db/s/drop_database_coordinator.cpp
index 8947f7fd488..ef5ab472d4c 100644
--- a/src/mongo/db/s/drop_database_coordinator.cpp
+++ b/src/mongo/db/s/drop_database_coordinator.cpp
@@ -365,7 +365,9 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl(
const auto& nss = coll.getNss();
LOGV2_DEBUG(5494505, 2, "Dropping collection", logAttrs(nss));
- sharding_ddl_util::stopMigrations(opCtx, nss, coll.getUuid());
+ _updateSession(opCtx);
+ sharding_ddl_util::stopMigrations(
+ opCtx, nss, coll.getUuid(), getCurrentSession());
auto newStateDoc = _doc;
newStateDoc.setCollInfo(coll);
diff --git a/src/mongo/db/s/rename_collection_coordinator.cpp b/src/mongo/db/s/rename_collection_coordinator.cpp
index 3f0ef0d988c..29ed595b6b4 100644
--- a/src/mongo/db/s/rename_collection_coordinator.cpp
+++ b/src/mongo/db/s/rename_collection_coordinator.cpp
@@ -333,11 +333,15 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
// Block migrations on involved sharded collections
if (_doc.getOptShardedCollInfo()) {
- sharding_ddl_util::stopMigrations(opCtx, fromNss, _doc.getSourceUUID());
+ _updateSession(opCtx);
+ sharding_ddl_util::stopMigrations(
+ opCtx, fromNss, _doc.getSourceUUID(), getCurrentSession());
}
if (_doc.getTargetIsSharded()) {
- sharding_ddl_util::stopMigrations(opCtx, toNss, _doc.getTargetUUID());
+ _updateSession(opCtx);
+ sharding_ddl_util::stopMigrations(
+ opCtx, toNss, _doc.getTargetUUID(), getCurrentSession());
}
}))
.then(_buildPhaseHandler(
diff --git a/src/mongo/db/s/sharding_ddl_util.cpp b/src/mongo/db/s/sharding_ddl_util.cpp
index 37ee6e4cd02..80768da5afb 100644
--- a/src/mongo/db/s/sharding_ddl_util.cpp
+++ b/src/mongo/db/s/sharding_ddl_util.cpp
@@ -269,6 +269,7 @@ write_ops::UpdateCommandRequest buildNoopWriteRequestCommand() {
void setAllowMigrations(OperationContext* opCtx,
const NamespaceString& nss,
const boost::optional<UUID>& expectedCollectionUUID,
+ const boost::optional<OperationSessionInfo>& osi,
bool allowMigrations) {
ConfigsvrSetAllowMigrations configsvrSetAllowMigrationsCmd(nss, allowMigrations);
configsvrSetAllowMigrationsCmd.setCollectionUUID(expectedCollectionUUID);
@@ -278,7 +279,8 @@ void setAllowMigrations(OperationContext* opCtx,
opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
DatabaseName::kAdmin.toString(),
- CommandHelpers::appendMajorityWriteConcern(configsvrSetAllowMigrationsCmd.toBSON({})),
+ CommandHelpers::appendMajorityWriteConcern(
+ configsvrSetAllowMigrationsCmd.toBSON(osi ? osi->toBSON() : BSONObj())),
Shard::RetryPolicy::kIdempotent // Although ConfigsvrSetAllowMigrations is not really
// idempotent (because it will cause the collection
// version to be bumped), it is safe to be retried.
@@ -768,14 +770,16 @@ boost::optional<CreateCollectionResponse> checkIfCollectionAlreadySharded(
void stopMigrations(OperationContext* opCtx,
const NamespaceString& nss,
- const boost::optional<UUID>& expectedCollectionUUID) {
- setAllowMigrations(opCtx, nss, expectedCollectionUUID, false);
+ const boost::optional<UUID>& expectedCollectionUUID,
+ const boost::optional<OperationSessionInfo>& osi) {
+ setAllowMigrations(opCtx, nss, expectedCollectionUUID, osi, false);
}
void resumeMigrations(OperationContext* opCtx,
const NamespaceString& nss,
- const boost::optional<UUID>& expectedCollectionUUID) {
- setAllowMigrations(opCtx, nss, expectedCollectionUUID, true);
+ const boost::optional<UUID>& expectedCollectionUUID,
+ const boost::optional<OperationSessionInfo>& osi) {
+ setAllowMigrations(opCtx, nss, expectedCollectionUUID, osi, true);
}
bool checkAllowMigrations(OperationContext* opCtx, const NamespaceString& nss) {
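Appending `osi->toBSON()` to the command body is what carries the session over the wire: `lsid` and `txnNumber` ride as top-level fields of the `_configsvrSetAllowMigrations` request. A sketch of how the request is assembled (the lsid and txnNumber values are illustrative):

    // Sketch: how the session fields end up in the outgoing command object.
    OperationSessionInfo osi;
    osi.setSessionId(lsid);   // e.g. the coordinator's session
    osi.setTxnNumber(3);      // illustrative txnNumber

    ConfigsvrSetAllowMigrations cmd(nss, false /* allowMigrations */);
    // toBSON(osi.toBSON()) merges { lsid: ..., txnNumber: ... } into the body,
    // so the config server ties this write to the retryable-write session.
    BSONObj cmdObj = CommandHelpers::appendMajorityWriteConcern(cmd.toBSON(osi.toBSON()));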
diff --git a/src/mongo/db/s/sharding_ddl_util.h b/src/mongo/db/s/sharding_ddl_util.h
index f0dd0a62f30..47de8ca794a 100644
--- a/src/mongo/db/s/sharding_ddl_util.h
+++ b/src/mongo/db/s/sharding_ddl_util.h
@@ -183,7 +183,8 @@ boost::optional<CreateCollectionResponse> checkIfCollectionAlreadySharded(
*/
void stopMigrations(OperationContext* opCtx,
const NamespaceString& nss,
- const boost::optional<UUID>& expectedCollectionUUID);
+ const boost::optional<UUID>& expectedCollectionUUID,
+ const boost::optional<OperationSessionInfo>& osi = boost::none);
/**
* Resume migrations and balancing rounds for the given nss.
@@ -192,7 +193,8 @@ void stopMigrations(OperationContext* opCtx,
*/
void resumeMigrations(OperationContext* opCtx,
const NamespaceString& nss,
- const boost::optional<UUID>& expectedCollectionUUID);
+ const boost::optional<UUID>& expectedCollectionUUID,
+ const boost::optional<OperationSessionInfo>& osi = boost::none);
/**
* Calls to the config server primary to get the collection document for the given nss.
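Defaulting `osi` to `boost::none` keeps the signature change source compatible: existing call sites compile unchanged, while the coordinators above opt in explicitly. Illustrative call forms, from inside a DDL coordinator:

    // Legacy call: no session attached, behavior as before this change.
    sharding_ddl_util::stopMigrations(opCtx, nss, expectedUUID);
    // Session-bound call: retries are deduplicated against lsid/txnNumber.
    sharding_ddl_util::stopMigrations(opCtx, nss, expectedUUID, getCurrentSession());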