diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/collmod_coordinator.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/s/config/configsvr_set_allow_migrations_command.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp | 106 | ||||
-rw-r--r-- | src/mongo/db/s/drop_collection_coordinator.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/s/drop_database_coordinator.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/s/rename_collection_coordinator.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_util.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_util.h | 6 |
8 files changed, 131 insertions, 44 deletions
diff --git a/src/mongo/db/s/collmod_coordinator.cpp b/src/mongo/db/s/collmod_coordinator.cpp index ee319db4193..6b89d37435c 100644 --- a/src/mongo/db/s/collmod_coordinator.cpp +++ b/src/mongo/db/s/collmod_coordinator.cpp @@ -202,8 +202,11 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( if (_collInfo->isSharded) { _doc.setCollUUID( sharding_ddl_util::getCollectionUUID(opCtx, _collInfo->nsForTargeting)); - sharding_ddl_util::stopMigrations( - opCtx, _collInfo->nsForTargeting, _doc.getCollUUID()); + _updateSession(opCtx); + sharding_ddl_util::stopMigrations(opCtx, + _collInfo->nsForTargeting, + _doc.getCollUUID(), + getCurrentSession()); } })(); }) @@ -226,8 +229,11 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( if (!migrationsAlreadyBlockedForBucketNss) { _doc.setCollUUID(sharding_ddl_util::getCollectionUUID( opCtx, _collInfo->nsForTargeting, true /* allowViews */)); - sharding_ddl_util::stopMigrations( - opCtx, _collInfo->nsForTargeting, _doc.getCollUUID()); + _updateSession(opCtx); + sharding_ddl_util::stopMigrations(opCtx, + _collInfo->nsForTargeting, + _doc.getCollUUID(), + getCurrentSession()); } } @@ -374,12 +380,18 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( CommandHelpers::appendSimpleCommandStatus(builder, ok, errmsg); } _result = builder.obj(); - sharding_ddl_util::resumeMigrations( - opCtx, _collInfo->nsForTargeting, _doc.getCollUUID()); + _updateSession(opCtx); + sharding_ddl_util::resumeMigrations(opCtx, + _collInfo->nsForTargeting, + _doc.getCollUUID(), + getCurrentSession()); } catch (DBException& ex) { if (!_isRetriableErrorForDDLCoordinator(ex.toStatus())) { - sharding_ddl_util::resumeMigrations( - opCtx, _collInfo->nsForTargeting, _doc.getCollUUID()); + _updateSession(opCtx); + sharding_ddl_util::resumeMigrations(opCtx, + _collInfo->nsForTargeting, + _doc.getCollUUID(), + getCurrentSession()); } throw; } diff --git a/src/mongo/db/s/config/configsvr_set_allow_migrations_command.cpp b/src/mongo/db/s/config/configsvr_set_allow_migrations_command.cpp index 8d6ade7eeb4..870df0641da 100644 --- a/src/mongo/db/s/config/configsvr_set_allow_migrations_command.cpp +++ b/src/mongo/db/s/config/configsvr_set_allow_migrations_command.cpp @@ -108,6 +108,10 @@ public: AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { return AllowedOnSecondary::kNever; } + + bool supportsRetryableWrite() const final { + return true; + } } configsvrSetAllowMigrationsCmd; } // namespace diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp index de854d256ad..19806d4d8ec 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp @@ -47,6 +47,7 @@ #include "mongo/db/snapshot_window_options_gen.h" #include "mongo/db/transaction/transaction_api.h" #include "mongo/db/transaction/transaction_participant_gen.h" +#include "mongo/db/transaction/transaction_participant_resource_yielder.h" #include "mongo/db/vector_clock_mutable.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -2178,14 +2179,13 @@ void ShardingCatalogManager::setAllowMigrationsAndBumpOneChunk( !collectionUUID || collectionUUID == cm.getUUID()); cm.getAllShardIds(&cmShardIds); - withTransaction( - opCtx, - CollectionType::ConfigNS, - [this, allowMigrations, &nss, &collectionUUID](OperationContext* opCtx, - TxnNumber txnNumber) { - // Update the 'allowMigrations' field. An unset 'allowMigrations' field implies - // 'true'. To ease backwards compatibility we omit 'allowMigrations' instead of - // setting it explicitly to 'true'. + + auto updateCollectionAndChunkFn = [allowMigrations, &nss, &collectionUUID]( + const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) { + write_ops::UpdateCommandRequest updateCollOp(CollectionType::ConfigNS); + updateCollOp.setUpdates([&] { + write_ops::UpdateOpEntry entry; const auto update = allowMigrations ? BSON("$unset" << BSON(CollectionType::kAllowMigrationsFieldName << "")) : BSON("$set" << BSON(CollectionType::kAllowMigrationsFieldName << false)); @@ -2195,25 +2195,81 @@ void ShardingCatalogManager::setAllowMigrationsAndBumpOneChunk( query = query.addFields(BSON(CollectionType::kUuidFieldName << *collectionUUID)); } + entry.setQ(query); + entry.setU(update); + entry.setMulti(false); + return std::vector<write_ops::UpdateOpEntry>{entry}; + }()); + + auto updateCollResponse = txnClient.runCRUDOpSync(updateCollOp, {0}); + uassertStatusOK(updateCollResponse.toStatus()); + uassert(ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Expected to match one doc but matched " + << updateCollResponse.getNModified(), + updateCollResponse.getNModified() == 1); + + FindCommandRequest collQuery{CollectionType::ConfigNS}; + collQuery.setFilter(BSON(CollectionType::kNssFieldName << nss.ns())); + collQuery.setLimit(1); - const auto res = writeToConfigDocumentInTxn( - opCtx, - CollectionType::ConfigNS, - BatchedCommandRequest::buildUpdateOp(CollectionType::ConfigNS, - query, - update /* update */, - false /* upsert */, - false /* multi */), - txnNumber); - const auto numDocsModified = UpdateOp::parseResponse(res).getN(); - uassert(ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Expected to match one doc for query " << query - << " but matched " << numDocsModified, - numDocsModified == 1); - - bumpCollectionMinorVersion(opCtx, _localConfigShard.get(), nss, txnNumber); - }); + const auto findCollResponse = txnClient.exhaustiveFindSync(collQuery); + uassert(ErrorCodes::NamespaceNotFound, + "Collection does not exist", + findCollResponse.size() == 1); + const CollectionType coll(findCollResponse[0]); + + // Find the newest chunk + FindCommandRequest chunkQuery{ChunkType::ConfigNS}; + chunkQuery.setFilter(BSON(ChunkType::collectionUUID << coll.getUuid())); + chunkQuery.setSort(BSON(ChunkType::lastmod << -1)); + chunkQuery.setLimit(1); + const auto findChunkResponse = txnClient.exhaustiveFindSync(chunkQuery); + + uassert(ErrorCodes::IncompatibleShardingMetadata, + str::stream() << "Tried to find max chunk version for collection " << nss.ns() + << ", but found no chunks", + findChunkResponse.size() == 1); + + const auto newestChunk = uassertStatusOK(ChunkType::parseFromConfigBSON( + findChunkResponse[0], coll.getEpoch(), coll.getTimestamp())); + const auto targetVersion = [&]() { + ChunkVersion version = newestChunk.getVersion(); + version.incMinor(); + return version; + }(); + + write_ops::UpdateCommandRequest updateChunkOp(ChunkType::ConfigNS); + BSONObjBuilder updateBuilder; + BSONObjBuilder updateVersionClause(updateBuilder.subobjStart("$set")); + updateVersionClause.appendTimestamp(ChunkType::lastmod(), targetVersion.toLong()); + updateVersionClause.doneFast(); + const auto update = updateBuilder.obj(); + updateChunkOp.setUpdates([&] { + write_ops::UpdateOpEntry entry; + entry.setQ(BSON(ChunkType::name << newestChunk.getName())); + entry.setU(update); + entry.setMulti(false); + entry.setUpsert(false); + return std::vector<write_ops::UpdateOpEntry>{entry}; + }()); + auto updateChunkResponse = txnClient.runCRUDOpSync(updateChunkOp, {1}); + uassertStatusOK(updateChunkResponse.toStatus()); + LOGV2_DEBUG( + 7353900, 1, "Finished all transaction operations in setAllowMigrations command"); + + return SemiFuture<void>::makeReady(); + }; + auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); + auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); + auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor); + + txn_api::SyncTransactionWithRetries txn( + opCtx, + sleepInlineExecutor, + TransactionParticipantResourceYielder::make("setAllowMigrationsAndBumpOneChunk"), + inlineExecutor); + txn.run(opCtx, updateCollectionAndChunkFn); // From now on migrations are not allowed anymore, so it is not possible that new shards // will own chunks for this collection. } diff --git a/src/mongo/db/s/drop_collection_coordinator.cpp b/src/mongo/db/s/drop_collection_coordinator.cpp index e8bc06a8a44..a073a6660c3 100644 --- a/src/mongo/db/s/drop_collection_coordinator.cpp +++ b/src/mongo/db/s/drop_collection_coordinator.cpp @@ -238,7 +238,10 @@ void DropCollectionCoordinator::_freezeMigrations( opCtx, "dropCollection.start", nss().ns(), logChangeDetail.obj()); if (_doc.getCollInfo()) { - sharding_ddl_util::stopMigrations(opCtx, nss(), _doc.getCollInfo()->getUuid()); + _updateSession(opCtx); + + sharding_ddl_util::stopMigrations( + opCtx, nss(), _doc.getCollInfo()->getUuid(), getCurrentSession()); } } diff --git a/src/mongo/db/s/drop_database_coordinator.cpp b/src/mongo/db/s/drop_database_coordinator.cpp index 8947f7fd488..ef5ab472d4c 100644 --- a/src/mongo/db/s/drop_database_coordinator.cpp +++ b/src/mongo/db/s/drop_database_coordinator.cpp @@ -365,7 +365,9 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl( const auto& nss = coll.getNss(); LOGV2_DEBUG(5494505, 2, "Dropping collection", logAttrs(nss)); - sharding_ddl_util::stopMigrations(opCtx, nss, coll.getUuid()); + _updateSession(opCtx); + sharding_ddl_util::stopMigrations( + opCtx, nss, coll.getUuid(), getCurrentSession()); auto newStateDoc = _doc; newStateDoc.setCollInfo(coll); diff --git a/src/mongo/db/s/rename_collection_coordinator.cpp b/src/mongo/db/s/rename_collection_coordinator.cpp index 3f0ef0d988c..29ed595b6b4 100644 --- a/src/mongo/db/s/rename_collection_coordinator.cpp +++ b/src/mongo/db/s/rename_collection_coordinator.cpp @@ -333,11 +333,15 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( // Block migrations on involved sharded collections if (_doc.getOptShardedCollInfo()) { - sharding_ddl_util::stopMigrations(opCtx, fromNss, _doc.getSourceUUID()); + _updateSession(opCtx); + sharding_ddl_util::stopMigrations( + opCtx, fromNss, _doc.getSourceUUID(), getCurrentSession()); } if (_doc.getTargetIsSharded()) { - sharding_ddl_util::stopMigrations(opCtx, toNss, _doc.getTargetUUID()); + _updateSession(opCtx); + sharding_ddl_util::stopMigrations( + opCtx, toNss, _doc.getTargetUUID(), getCurrentSession()); } })) .then(_buildPhaseHandler( diff --git a/src/mongo/db/s/sharding_ddl_util.cpp b/src/mongo/db/s/sharding_ddl_util.cpp index 37ee6e4cd02..80768da5afb 100644 --- a/src/mongo/db/s/sharding_ddl_util.cpp +++ b/src/mongo/db/s/sharding_ddl_util.cpp @@ -269,6 +269,7 @@ write_ops::UpdateCommandRequest buildNoopWriteRequestCommand() { void setAllowMigrations(OperationContext* opCtx, const NamespaceString& nss, const boost::optional<UUID>& expectedCollectionUUID, + const boost::optional<OperationSessionInfo>& osi, bool allowMigrations) { ConfigsvrSetAllowMigrations configsvrSetAllowMigrationsCmd(nss, allowMigrations); configsvrSetAllowMigrationsCmd.setCollectionUUID(expectedCollectionUUID); @@ -278,7 +279,8 @@ void setAllowMigrations(OperationContext* opCtx, opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, DatabaseName::kAdmin.toString(), - CommandHelpers::appendMajorityWriteConcern(configsvrSetAllowMigrationsCmd.toBSON({})), + CommandHelpers::appendMajorityWriteConcern( + configsvrSetAllowMigrationsCmd.toBSON(osi ? osi->toBSON() : BSONObj())), Shard::RetryPolicy::kIdempotent // Although ConfigsvrSetAllowMigrations is not really // idempotent (because it will cause the collection // version to be bumped), it is safe to be retried. @@ -768,14 +770,16 @@ boost::optional<CreateCollectionResponse> checkIfCollectionAlreadySharded( void stopMigrations(OperationContext* opCtx, const NamespaceString& nss, - const boost::optional<UUID>& expectedCollectionUUID) { - setAllowMigrations(opCtx, nss, expectedCollectionUUID, false); + const boost::optional<UUID>& expectedCollectionUUID, + const boost::optional<OperationSessionInfo>& osi) { + setAllowMigrations(opCtx, nss, expectedCollectionUUID, osi, false); } void resumeMigrations(OperationContext* opCtx, const NamespaceString& nss, - const boost::optional<UUID>& expectedCollectionUUID) { - setAllowMigrations(opCtx, nss, expectedCollectionUUID, true); + const boost::optional<UUID>& expectedCollectionUUID, + const boost::optional<OperationSessionInfo>& osi) { + setAllowMigrations(opCtx, nss, expectedCollectionUUID, osi, true); } bool checkAllowMigrations(OperationContext* opCtx, const NamespaceString& nss) { diff --git a/src/mongo/db/s/sharding_ddl_util.h b/src/mongo/db/s/sharding_ddl_util.h index f0dd0a62f30..47de8ca794a 100644 --- a/src/mongo/db/s/sharding_ddl_util.h +++ b/src/mongo/db/s/sharding_ddl_util.h @@ -183,7 +183,8 @@ boost::optional<CreateCollectionResponse> checkIfCollectionAlreadySharded( */ void stopMigrations(OperationContext* opCtx, const NamespaceString& nss, - const boost::optional<UUID>& expectedCollectionUUID); + const boost::optional<UUID>& expectedCollectionUUID, + const boost::optional<OperationSessionInfo>& osi = boost::none); /** * Resume migrations and balancing rounds for the given nss. @@ -192,7 +193,8 @@ void stopMigrations(OperationContext* opCtx, */ void resumeMigrations(OperationContext* opCtx, const NamespaceString& nss, - const boost::optional<UUID>& expectedCollectionUUID); + const boost::optional<UUID>& expectedCollectionUUID, + const boost::optional<OperationSessionInfo>& osi = boost::none); /** * Calls to the config server primary to get the collection document for the given nss. |