diff options
author | auto-revert-processor <dev-prod-dag@mongodb.com> | 2023-05-12 23:41:11 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-05-13 01:34:46 +0000 |
commit | 68145fa6580490c6d6d1e3829359f3260be8f9bc (patch) | |
tree | d3a6f26d3772b61b7adc28c59ee044061cedb7e1 | |
parent | cc09debe1f3eacece6986f8bb5fd3199600e54c6 (diff) | |
download | mongo-68145fa6580490c6d6d1e3829359f3260be8f9bc.tar.gz |
Revert "SERVER-76050 Enhance session handling in each coordinator"
This reverts commit 5a8c52c0de24d8ddeb4a7d3b90ef5942c53cbd14.
-rw-r--r-- | src/mongo/db/s/collmod_coordinator.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/s/create_collection_coordinator.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/s/drop_collection_coordinator.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/s/drop_database_coordinator.cpp | 30 | ||||
-rw-r--r-- | src/mongo/db/s/rename_collection_coordinator.cpp | 36 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator.h | 77 |
6 files changed, 130 insertions, 88 deletions
diff --git a/src/mongo/db/s/collmod_coordinator.cpp b/src/mongo/db/s/collmod_coordinator.cpp index be095f23f0a..12b443a6bd2 100644 --- a/src/mongo/db/s/collmod_coordinator.cpp +++ b/src/mongo/db/s/collmod_coordinator.cpp @@ -134,8 +134,9 @@ void CollModCoordinator::_performNoopRetryableWriteOnParticipants( return participants; }(); + _updateSession(opCtx); sharding_ddl_util::performNoopRetryableWriteOnShards( - opCtx, shardsAndConfigsvr, getNewSession(opCtx), executor); + opCtx, shardsAndConfigsvr, getCurrentSession(), executor); } void CollModCoordinator::_saveCollectionInfoOnCoordinatorIfNecessary(OperationContext* opCtx) { @@ -215,10 +216,11 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( if (_collInfo->isSharded) { _doc.setCollUUID( sharding_ddl_util::getCollectionUUID(opCtx, _collInfo->nsForTargeting)); + _updateSession(opCtx); sharding_ddl_util::stopMigrations(opCtx, _collInfo->nsForTargeting, _doc.getCollUUID(), - getNewSession(opCtx)); + getCurrentSession()); } })(); }) @@ -229,6 +231,8 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( auto* opCtx = opCtxHolder.get(); getForwardableOpMetadata().setOn(opCtx); + _updateSession(opCtx); + _saveCollectionInfoOnCoordinatorIfNecessary(opCtx); if (_isPre61Compatible() && _collInfo->isSharded) { @@ -239,10 +243,11 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( if (!migrationsAlreadyBlockedForBucketNss) { _doc.setCollUUID(sharding_ddl_util::getCollectionUUID( opCtx, _collInfo->nsForTargeting, true /* allowViews */)); + _updateSession(opCtx); sharding_ddl_util::stopMigrations(opCtx, _collInfo->nsForTargeting, _doc.getCollUUID(), - getNewSession(opCtx)); + getCurrentSession()); } } @@ -255,6 +260,7 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( _updateStateDocument(opCtx, std::move(newDoc)); } + _updateSession(opCtx); ShardsvrParticipantBlock blockCRUDOperationsRequest(_collInfo->nsForTargeting); blockCRUDOperationsRequest.setBlockType( CriticalSectionBlockTypeEnum::kReadsAndWrites); @@ -263,7 +269,7 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( blockCRUDOperationsRequest.toBSON({}), _shardingInfo->shardsOwningChunks, **executor, - getNewSession(opCtx)); + getCurrentSession()); } })) .then(_buildPhaseHandler( @@ -275,6 +281,8 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( auto* opCtx = opCtxHolder.get(); getForwardableOpMetadata().setOn(opCtx); + _updateSession(opCtx); + _saveCollectionInfoOnCoordinatorIfNecessary(opCtx); _saveShardingInfoOnCoordinatorIfNecessary(opCtx); @@ -289,7 +297,7 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( configShard->runCommand(opCtx, ReadPreferenceSetting(ReadPreference::PrimaryOnly), nss().db().toString(), - cmdObj.addFields(getNewSession(opCtx).toBSON()), + cmdObj, Shard::RetryPolicy::kIdempotent))); } })) @@ -299,6 +307,8 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( auto* opCtx = opCtxHolder.get(); getForwardableOpMetadata().setOn(opCtx); + _updateSession(opCtx); + _saveCollectionInfoOnCoordinatorIfNecessary(opCtx); _saveShardingInfoOnCoordinatorIfNecessary(opCtx); @@ -357,6 +367,7 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( // A view definition will only be present on the primary shard. So we pass // an addition 'performViewChange' flag only to the primary shard. if (primaryShardOwningChunk != shardsOwningChunks.end()) { + _updateSession(opCtx); request.setPerformViewChange(true); const auto& primaryResponse = sendAuthenticatedCommandWithOsiToShards( opCtx, @@ -364,12 +375,13 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( request.toBSON({}), {_shardingInfo->primaryShard}, **executor, - getNewSession(opCtx)); + getCurrentSession()); responses.insert( responses.end(), primaryResponse.begin(), primaryResponse.end()); shardsOwningChunks.erase(primaryShardOwningChunk); } + _updateSession(opCtx); request.setPerformViewChange(false); const auto& secondaryResponses = sendAuthenticatedCommandWithOsiToShards(opCtx, @@ -377,7 +389,7 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( request.toBSON({}), shardsOwningChunks, **executor, - getNewSession(opCtx)); + getCurrentSession()); responses.insert( responses.end(), secondaryResponses.begin(), secondaryResponses.end()); @@ -389,16 +401,18 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( CommandHelpers::appendSimpleCommandStatus(builder, ok, errmsg); } _result = builder.obj(); + _updateSession(opCtx); sharding_ddl_util::resumeMigrations(opCtx, _collInfo->nsForTargeting, _doc.getCollUUID(), - getNewSession(opCtx)); + getCurrentSession()); } catch (DBException& ex) { if (!_isRetriableErrorForDDLCoordinator(ex.toStatus())) { + _updateSession(opCtx); sharding_ddl_util::resumeMigrations(opCtx, _collInfo->nsForTargeting, _doc.getCollUUID(), - getNewSession(opCtx)); + getCurrentSession()); } throw; } diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp index 987e7ee1df2..78af6bd3fc4 100644 --- a/src/mongo/db/s/create_collection_coordinator.cpp +++ b/src/mongo/db/s/create_collection_coordinator.cpp @@ -474,8 +474,9 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( // Additionally we want to perform a majority write on the CSRS to ensure that // all the subsequent reads will see all the writes performed from a previous // execution of this coordinator. + _updateSession(opCtx); _performNoopRetryableWriteOnAllShardsAndConfigsvr( - opCtx, getNewSession(opCtx), **executor); + opCtx, getCurrentSession(), **executor); if (_timeseriesNssResolvedByCommandHandler() || _doc.getTranslatedRequestParams()) { @@ -533,10 +534,12 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( "Removing partial changes from previous run", logAttrs(nss())); + _updateSession(opCtx); cleanupPartialChunksFromPreviousAttempt( - opCtx, *uuid, getNewSession(opCtx)); + opCtx, *uuid, getCurrentSession()); - broadcastDropCollection(opCtx, nss(), **executor, getNewSession(opCtx)); + _updateSession(opCtx); + broadcastDropCollection(opCtx, nss(), **executor, getCurrentSession()); } } } @@ -593,7 +596,8 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( // shard _promoteCriticalSectionsToBlockReads(opCtx); - _createCollectionOnNonPrimaryShards(opCtx, getNewSession(opCtx)); + _updateSession(opCtx); + _createCollectionOnNonPrimaryShards(opCtx, getCurrentSession()); _commit(opCtx, **executor); } @@ -1196,7 +1200,8 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx, } // Upsert Chunks. - insertChunks(opCtx, _initialChunks->chunks, getNewSession(opCtx)); + _updateSession(opCtx); + insertChunks(opCtx, _initialChunks->chunks, getCurrentSession()); // The coll and shardsHoldingData objects will be used by both this function and // insertCollectionAndPlacementEntries(), which accesses their content from a separate thread @@ -1235,7 +1240,7 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx, coll->setUnique(*_request.getUnique()); } - const auto& osi = getNewSession(opCtx); + _updateSession(opCtx); try { notifyChangeStreamsOnShardCollection(opCtx, nss(), @@ -1245,7 +1250,7 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx, *shardsHoldingData); insertCollectionAndPlacementEntries( - opCtx, executor, coll, placementVersion, shardsHoldingData, osi); + opCtx, executor, coll, placementVersion, shardsHoldingData, getCurrentSession()); notifyChangeStreamsOnShardCollection( opCtx, nss(), *_collectionUUID, _request.toBSON(), CommitPhase::kSuccessful); diff --git a/src/mongo/db/s/drop_collection_coordinator.cpp b/src/mongo/db/s/drop_collection_coordinator.cpp index 2452701358b..333657bc8ee 100644 --- a/src/mongo/db/s/drop_collection_coordinator.cpp +++ b/src/mongo/db/s/drop_collection_coordinator.cpp @@ -251,8 +251,10 @@ void DropCollectionCoordinator::_freezeMigrations( opCtx, "dropCollection.start", nss().ns(), logChangeDetail.obj()); if (_doc.getCollInfo()) { + _updateSession(opCtx); + sharding_ddl_util::stopMigrations( - opCtx, nss(), _doc.getCollInfo()->getUuid(), getNewSession(opCtx)); + opCtx, nss(), _doc.getCollInfo()->getUuid(), getCurrentSession()); } } @@ -264,6 +266,7 @@ void DropCollectionCoordinator::_enterCriticalSection( auto* opCtx = opCtxHolder.get(); getForwardableOpMetadata().setOn(opCtx); + _updateSession(opCtx); ShardsvrParticipantBlock blockCRUDOperationsRequest(nss()); blockCRUDOperationsRequest.setBlockType(mongo::CriticalSectionBlockTypeEnum::kReadsAndWrites); blockCRUDOperationsRequest.setReason(_critSecReason); @@ -274,7 +277,7 @@ void DropCollectionCoordinator::_enterCriticalSection( sharding_ddl_util::sendAuthenticatedCommandToShards( opCtx, nss().db(), - cmdObj.addFields(getNewSession(opCtx).toBSON()), + cmdObj.addFields(getCurrentSession().toBSON()), Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx), **executor); @@ -295,6 +298,7 @@ void DropCollectionCoordinator::_commitDropCollection( sharding_ddl_util::removeQueryAnalyzerMetadataFromConfig( opCtx, BSON(analyze_shard_key::QueryAnalyzerDocument::kNsFieldName << nss().toString())); + _updateSession(opCtx); if (collIsSharded) { invariant(_doc.getCollInfo()); const auto& coll = _doc.getCollInfo().value(); @@ -308,15 +312,18 @@ void DropCollectionCoordinator::_commitDropCollection( Grid::get(opCtx)->catalogClient(), coll, ShardingCatalogClient::kMajorityWriteConcern, - getNewSession(opCtx), + getCurrentSession(), useClusterTransaction, **executor); } // Remove tags even if the collection is not sharded or didn't exist - sharding_ddl_util::removeTagsMetadataFromConfig(opCtx, nss(), getNewSession(opCtx)); + _updateSession(opCtx); + sharding_ddl_util::removeTagsMetadataFromConfig(opCtx, nss(), getCurrentSession()); + + // get a Lsid and an incremented txnNumber. Ensures we are the primary + _updateSession(opCtx); - // Ensures we are the primary const auto primaryShardId = ShardingState::get(opCtx)->shardId(); // We need to send the drop to all the shards because both movePrimary and @@ -327,13 +334,13 @@ void DropCollectionCoordinator::_commitDropCollection( participants.end()); sharding_ddl_util::sendDropCollectionParticipantCommandToShards( - opCtx, nss(), participants, **executor, getNewSession(opCtx), true /*fromMigrate*/); + opCtx, nss(), participants, **executor, getCurrentSession(), true /*fromMigrate*/); // The sharded collection must be dropped on the primary shard after it has been // dropped on all of the other shards to ensure it can only be re-created as // unsharded with a higher optime than all of the drops. sharding_ddl_util::sendDropCollectionParticipantCommandToShards( - opCtx, nss(), {primaryShardId}, **executor, getNewSession(opCtx), false /*fromMigrate*/); + opCtx, nss(), {primaryShardId}, **executor, getCurrentSession(), false /*fromMigrate*/); ShardingLogging::get(opCtx)->logChange(opCtx, "dropCollection", nss().ns()); LOGV2(5390503, "Collection dropped", logAttrs(nss())); @@ -347,6 +354,7 @@ void DropCollectionCoordinator::_exitCriticalSection( auto* opCtx = opCtxHolder.get(); getForwardableOpMetadata().setOn(opCtx); + _updateSession(opCtx); ShardsvrParticipantBlock unblockCRUDOperationsRequest(nss()); unblockCRUDOperationsRequest.setBlockType(CriticalSectionBlockTypeEnum::kUnblock); unblockCRUDOperationsRequest.setReason(_critSecReason); @@ -357,7 +365,7 @@ void DropCollectionCoordinator::_exitCriticalSection( sharding_ddl_util::sendAuthenticatedCommandToShards( opCtx, nss().db(), - cmdObj.addFields(getNewSession(opCtx).toBSON()), + cmdObj.addFields(getCurrentSession().toBSON()), Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx), **executor); diff --git a/src/mongo/db/s/drop_database_coordinator.cpp b/src/mongo/db/s/drop_database_coordinator.cpp index 1d80d487a90..ef5ab472d4c 100644 --- a/src/mongo/db/s/drop_database_coordinator.cpp +++ b/src/mongo/db/s/drop_database_coordinator.cpp @@ -208,6 +208,7 @@ void DropDatabaseCoordinator::_dropShardedCollection( opCtx, nss.ns(), coorName, DDLLockManager::kDefaultLockTimeout); if (!_isPre70Compatible()) { + _updateSession(opCtx); ShardsvrParticipantBlock blockCRUDOperationsRequest(nss); blockCRUDOperationsRequest.setBlockType( mongo::CriticalSectionBlockTypeEnum::kReadsAndWrites); @@ -218,11 +219,13 @@ void DropDatabaseCoordinator::_dropShardedCollection( sharding_ddl_util::sendAuthenticatedCommandToShards( opCtx, nss.db(), - cmdObj.addFields(getNewSession(opCtx).toBSON()), + cmdObj.addFields(getCurrentSession().toBSON()), Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx), **executor); } + _updateSession(opCtx); + // This always runs in the shard role so should use a cluster transaction to guarantee // targeting the config server. bool useClusterTransaction = true; @@ -232,13 +235,15 @@ void DropDatabaseCoordinator::_dropShardedCollection( Grid::get(opCtx)->catalogClient(), coll, ShardingCatalogClient::kMajorityWriteConcern, - getNewSession(opCtx), + getCurrentSession(), useClusterTransaction, **executor); - sharding_ddl_util::removeTagsMetadataFromConfig(opCtx, nss, getNewSession(opCtx)); + _updateSession(opCtx); + sharding_ddl_util::removeTagsMetadataFromConfig(opCtx, nss, getCurrentSession()); const auto primaryShardId = ShardingState::get(opCtx)->shardId(); + _updateSession(opCtx); // We need to send the drop to all the shards because both movePrimary and // moveChunk leave garbage behind for sharded collections. @@ -247,15 +252,16 @@ void DropDatabaseCoordinator::_dropShardedCollection( participants.erase(std::remove(participants.begin(), participants.end(), primaryShardId), participants.end()); sharding_ddl_util::sendDropCollectionParticipantCommandToShards( - opCtx, nss, participants, **executor, getNewSession(opCtx), true /* fromMigrate */); + opCtx, nss, participants, **executor, getCurrentSession(), true /* fromMigrate */); // The sharded collection must be dropped on the primary shard after it has been dropped on all // of the other shards to ensure it can only be re-created as unsharded with a higher optime // than all of the drops. sharding_ddl_util::sendDropCollectionParticipantCommandToShards( - opCtx, nss, {primaryShardId}, **executor, getNewSession(opCtx), false /* fromMigrate */); + opCtx, nss, {primaryShardId}, **executor, getCurrentSession(), false /* fromMigrate */); if (!_isPre70Compatible()) { + _updateSession(opCtx); ShardsvrParticipantBlock unblockCRUDOperationsRequest(nss); unblockCRUDOperationsRequest.setBlockType(CriticalSectionBlockTypeEnum::kUnblock); unblockCRUDOperationsRequest.setReason(getReasonForDropCollection(nss)); @@ -266,7 +272,7 @@ void DropDatabaseCoordinator::_dropShardedCollection( sharding_ddl_util::sendAuthenticatedCommandToShards( opCtx, nss.db(), - cmdObj.addFields(getNewSession(opCtx).toBSON()), + cmdObj.addFields(getCurrentSession().toBSON()), Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx), **executor); } @@ -313,8 +319,9 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl( // Perform a noop write on the participants in order to advance the txnNumber // for this coordinator's lsid so that requests with older txnNumbers can no // longer execute. + _updateSession(opCtx); _performNoopRetryableWriteOnAllShardsAndConfigsvr( - opCtx, getNewSession(opCtx), **executor); + opCtx, getCurrentSession(), **executor); } ShardingLogging::get(opCtx)->logChange(opCtx, "dropDatabase.start", _dbName); @@ -358,8 +365,9 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl( const auto& nss = coll.getNss(); LOGV2_DEBUG(5494505, 2, "Dropping collection", logAttrs(nss)); + _updateSession(opCtx); sharding_ddl_util::stopMigrations( - opCtx, nss, coll.getUuid(), getNewSession(opCtx)); + opCtx, nss, coll.getUuid(), getCurrentSession()); auto newStateDoc = _doc; newStateDoc.setCollInfo(coll); @@ -376,8 +384,9 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl( const auto& nssWithZones = catalogClient->getAllNssThatHaveZonesForDatabase(opCtx, _dbName); for (const auto& nss : nssWithZones) { + _updateSession(opCtx); sharding_ddl_util::removeTagsMetadataFromConfig( - opCtx, nss, getNewSession(opCtx)); + opCtx, nss, getCurrentSession()); } // Remove the query sampling configuration documents for all collections in this @@ -461,12 +470,13 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl( _clearDatabaseInfoOnPrimary(opCtx); _clearDatabaseInfoOnSecondaries(opCtx); + _updateSession(opCtx); removeDatabaseFromConfigAndUpdatePlacementHistory( opCtx, **executor, _dbName, *metadata().getDatabaseVersion(), - getNewSession(opCtx)); + getCurrentSession()); VectorClockMutable::get(opCtx)->waitForDurableConfigTime().get(opCtx); } diff --git a/src/mongo/db/s/rename_collection_coordinator.cpp b/src/mongo/db/s/rename_collection_coordinator.cpp index ec14848dfdd..8dd593b44e7 100644 --- a/src/mongo/db/s/rename_collection_coordinator.cpp +++ b/src/mongo/db/s/rename_collection_coordinator.cpp @@ -654,13 +654,15 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( // Block migrations on involved sharded collections if (_doc.getOptShardedCollInfo()) { + _updateSession(opCtx); sharding_ddl_util::stopMigrations( - opCtx, fromNss, _doc.getSourceUUID(), getNewSession(opCtx)); + opCtx, fromNss, _doc.getSourceUUID(), getCurrentSession()); } if (_doc.getTargetIsSharded()) { + _updateSession(opCtx); sharding_ddl_util::stopMigrations( - opCtx, toNss, _doc.getTargetUUID(), getNewSession(opCtx)); + opCtx, toNss, _doc.getTargetUUID(), getCurrentSession()); } })) .then(_buildPhaseHandler( @@ -671,12 +673,16 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( getForwardableOpMetadata().setOn(opCtx); if (!_firstExecution) { + _updateSession(opCtx); _performNoopRetryableWriteOnAllShardsAndConfigsvr( - opCtx, getNewSession(opCtx), **executor); + opCtx, getCurrentSession(), **executor); } const auto& fromNss = nss(); + _updateSession(opCtx); + const OperationSessionInfo osi = getCurrentSession(); + // On participant shards: // - Block CRUD on source and target collection in case at least one of such // collections is currently sharded @@ -689,7 +695,7 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( renameCollParticipantRequest.setRenameCollectionRequest(_request); const auto cmdObj = CommandHelpers::appendMajorityWriteConcern( renameCollParticipantRequest.toBSON({})) - .addFields(getNewSession(opCtx).toBSON()); + .addFields(osi.toBSON()); // We need to send the command to all the shards because both movePrimary and // moveChunk leave garbage behind for sharded collections. At the same time, the @@ -726,23 +732,27 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( // For an unsharded collection the CSRS server can not verify the targetUUID. // Use the session ID + txnNumber to ensure no stale requests get through. + _updateSession(opCtx); + if (!_firstExecution) { _performNoopRetryableWriteOnAllShardsAndConfigsvr( - opCtx, getNewSession(opCtx), **executor); + opCtx, getCurrentSession(), **executor); } if ((_doc.getTargetIsSharded() || _doc.getOptShardedCollInfo())) { renameIndexMetadataInShards( - opCtx, nss(), _request, getNewSession(opCtx), **executor, &_doc); + opCtx, nss(), _request, getCurrentSession(), **executor, &_doc); } + _updateSession(opCtx); + renameCollectionMetadataInTransaction(opCtx, _doc.getOptShardedCollInfo(), _request.getTo(), _doc.getTargetUUID(), ShardingCatalogClient::kMajorityWriteConcern, **executor, - getNewSession(opCtx)); + getCurrentSession()); })) .then(_buildPhaseHandler( Phase::kUnblockCRUD, @@ -752,8 +762,9 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( getForwardableOpMetadata().setOn(opCtx); if (!_firstExecution) { + _updateSession(opCtx); _performNoopRetryableWriteOnAllShardsAndConfigsvr( - opCtx, getNewSession(opCtx), **executor); + opCtx, getCurrentSession(), **executor); } const auto& fromNss = nss(); @@ -767,12 +778,11 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( unblockParticipantRequest.toBSON({})); auto participants = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx); + _updateSession(opCtx); + const OperationSessionInfo osi = getCurrentSession(); + sharding_ddl_util::sendAuthenticatedCommandToShards( - opCtx, - fromNss.db(), - cmdObj.addFields(getNewSession(opCtx).toBSON()), - participants, - **executor); + opCtx, fromNss.db(), cmdObj.addFields(osi.toBSON()), participants, **executor); // Delete chunks belonging to the previous incarnation of the target collection. // This is performed after releasing the critical section in order to reduce stalls diff --git a/src/mongo/db/s/sharding_ddl_coordinator.h b/src/mongo/db/s/sharding_ddl_coordinator.h index 267b3161b04..db0827d1142 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator.h +++ b/src/mongo/db/s/sharding_ddl_coordinator.h @@ -372,47 +372,6 @@ protected: } } - /** - * Advances and persists the `txnNumber` to ensure causality between requests, then returns the - * updated operation session information (OSI). - */ - OperationSessionInfo getNewSession(OperationContext* opCtx) { - _updateSession(opCtx); - return getCurrentSession(); - } - - virtual boost::optional<Status> getAbortReason() const override { - const auto& status = _doc.getAbortReason(); - invariant(!status || !status->isOK(), "when persisted, status must be an error"); - return status; - } - - /** - * Persists the abort reason and throws it as an exception. This causes the coordinator to fail, - * and triggers the cleanup future chain since there is a the persisted reason. - */ - void triggerCleanup(OperationContext* opCtx, const Status& status) { - LOGV2_INFO(7418502, - "Coordinator failed, persisting abort reason", - "coordinatorId"_attr = _doc.getId(), - "phase"_attr = serializePhase(_doc.getPhase()), - "reason"_attr = redact(status)); - - auto newDoc = [&] { - stdx::lock_guard lk{_docMutex}; - return _doc; - }(); - - auto coordinatorMetadata = newDoc.getShardingDDLCoordinatorMetadata(); - coordinatorMetadata.setAbortReason(status); - newDoc.setShardingDDLCoordinatorMetadata(std::move(coordinatorMetadata)); - - _updateStateDocument(opCtx, std::move(newDoc)); - - uassertStatusOK(status); - } - -private: // lazily acquire Logical Session ID and a txn number void _updateSession(OperationContext* opCtx) { auto newDoc = [&] { @@ -449,6 +408,42 @@ private: osi.setTxnNumber(optSession->getTxnNumber()); return osi; } + + OperationSessionInfo getNewSession(OperationContext* opCtx) { + _updateSession(opCtx); + return getCurrentSession(); + } + + virtual boost::optional<Status> getAbortReason() const override { + const auto& status = _doc.getAbortReason(); + invariant(!status || !status->isOK(), "when persisted, status must be an error"); + return status; + } + + /** + * Persists the abort reason and throws it as an exception. This causes the coordinator to fail, + * and triggers the cleanup future chain since there is a the persisted reason. + */ + void triggerCleanup(OperationContext* opCtx, const Status& status) { + LOGV2_INFO(7418502, + "Coordinator failed, persisting abort reason", + "coordinatorId"_attr = _doc.getId(), + "phase"_attr = serializePhase(_doc.getPhase()), + "reason"_attr = redact(status)); + + auto newDoc = [&] { + stdx::lock_guard lk{_docMutex}; + return _doc; + }(); + + auto coordinatorMetadata = newDoc.getShardingDDLCoordinatorMetadata(); + coordinatorMetadata.setAbortReason(status); + newDoc.setShardingDDLCoordinatorMetadata(std::move(coordinatorMetadata)); + + _updateStateDocument(opCtx, std::move(newDoc)); + + uassertStatusOK(status); + } }; #undef MONGO_LOGV2_DEFAULT_COMPONENT |