diff options
author | Silvia Surroca <silvia.surroca@mongodb.com> | 2023-05-12 14:48:20 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-05-12 16:16:19 +0000 |
commit | 5a8c52c0de24d8ddeb4a7d3b90ef5942c53cbd14 (patch) | |
tree | c136bac4e319737bd638077c7a2a2a09bbcd9096 /src/mongo/db | |
parent | 1c5afb84c8b935fc1246fb3a689a01f22ed0fafd (diff) | |
download | mongo-5a8c52c0de24d8ddeb4a7d3b90ef5942c53cbd14.tar.gz |
SERVER-76050 Enhance session handling in each coordinator
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/s/collmod_coordinator.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/s/create_collection_coordinator.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/s/drop_collection_coordinator.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/s/drop_database_coordinator.cpp | 30 | ||||
-rw-r--r-- | src/mongo/db/s/rename_collection_coordinator.cpp | 36 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator.h | 77 |
6 files changed, 88 insertions, 130 deletions
diff --git a/src/mongo/db/s/collmod_coordinator.cpp b/src/mongo/db/s/collmod_coordinator.cpp index 12b443a6bd2..be095f23f0a 100644 --- a/src/mongo/db/s/collmod_coordinator.cpp +++ b/src/mongo/db/s/collmod_coordinator.cpp @@ -134,9 +134,8 @@ void CollModCoordinator::_performNoopRetryableWriteOnParticipants( return participants; }(); - _updateSession(opCtx); sharding_ddl_util::performNoopRetryableWriteOnShards( - opCtx, shardsAndConfigsvr, getCurrentSession(), executor); + opCtx, shardsAndConfigsvr, getNewSession(opCtx), executor); } void CollModCoordinator::_saveCollectionInfoOnCoordinatorIfNecessary(OperationContext* opCtx) { @@ -216,11 +215,10 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( if (_collInfo->isSharded) { _doc.setCollUUID( sharding_ddl_util::getCollectionUUID(opCtx, _collInfo->nsForTargeting)); - _updateSession(opCtx); sharding_ddl_util::stopMigrations(opCtx, _collInfo->nsForTargeting, _doc.getCollUUID(), - getCurrentSession()); + getNewSession(opCtx)); } })(); }) @@ -231,8 +229,6 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( auto* opCtx = opCtxHolder.get(); getForwardableOpMetadata().setOn(opCtx); - _updateSession(opCtx); - _saveCollectionInfoOnCoordinatorIfNecessary(opCtx); if (_isPre61Compatible() && _collInfo->isSharded) { @@ -243,11 +239,10 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( if (!migrationsAlreadyBlockedForBucketNss) { _doc.setCollUUID(sharding_ddl_util::getCollectionUUID( opCtx, _collInfo->nsForTargeting, true /* allowViews */)); - _updateSession(opCtx); sharding_ddl_util::stopMigrations(opCtx, _collInfo->nsForTargeting, _doc.getCollUUID(), - getCurrentSession()); + getNewSession(opCtx)); } } @@ -260,7 +255,6 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( _updateStateDocument(opCtx, std::move(newDoc)); } - _updateSession(opCtx); ShardsvrParticipantBlock blockCRUDOperationsRequest(_collInfo->nsForTargeting); blockCRUDOperationsRequest.setBlockType( CriticalSectionBlockTypeEnum::kReadsAndWrites); @@ -269,7 +263,7 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( blockCRUDOperationsRequest.toBSON({}), _shardingInfo->shardsOwningChunks, **executor, - getCurrentSession()); + getNewSession(opCtx)); } })) .then(_buildPhaseHandler( @@ -281,8 +275,6 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( auto* opCtx = opCtxHolder.get(); getForwardableOpMetadata().setOn(opCtx); - _updateSession(opCtx); - _saveCollectionInfoOnCoordinatorIfNecessary(opCtx); _saveShardingInfoOnCoordinatorIfNecessary(opCtx); @@ -297,7 +289,7 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( configShard->runCommand(opCtx, ReadPreferenceSetting(ReadPreference::PrimaryOnly), nss().db().toString(), - cmdObj, + cmdObj.addFields(getNewSession(opCtx).toBSON()), Shard::RetryPolicy::kIdempotent))); } })) @@ -307,8 +299,6 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( auto* opCtx = opCtxHolder.get(); getForwardableOpMetadata().setOn(opCtx); - _updateSession(opCtx); - _saveCollectionInfoOnCoordinatorIfNecessary(opCtx); _saveShardingInfoOnCoordinatorIfNecessary(opCtx); @@ -367,7 +357,6 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( // A view definition will only be present on the primary shard. So we pass // an addition 'performViewChange' flag only to the primary shard. if (primaryShardOwningChunk != shardsOwningChunks.end()) { - _updateSession(opCtx); request.setPerformViewChange(true); const auto& primaryResponse = sendAuthenticatedCommandWithOsiToShards( opCtx, @@ -375,13 +364,12 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( request.toBSON({}), {_shardingInfo->primaryShard}, **executor, - getCurrentSession()); + getNewSession(opCtx)); responses.insert( responses.end(), primaryResponse.begin(), primaryResponse.end()); shardsOwningChunks.erase(primaryShardOwningChunk); } - _updateSession(opCtx); request.setPerformViewChange(false); const auto& secondaryResponses = sendAuthenticatedCommandWithOsiToShards(opCtx, @@ -389,7 +377,7 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( request.toBSON({}), shardsOwningChunks, **executor, - getCurrentSession()); + getNewSession(opCtx)); responses.insert( responses.end(), secondaryResponses.begin(), secondaryResponses.end()); @@ -401,18 +389,16 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( CommandHelpers::appendSimpleCommandStatus(builder, ok, errmsg); } _result = builder.obj(); - _updateSession(opCtx); sharding_ddl_util::resumeMigrations(opCtx, _collInfo->nsForTargeting, _doc.getCollUUID(), - getCurrentSession()); + getNewSession(opCtx)); } catch (DBException& ex) { if (!_isRetriableErrorForDDLCoordinator(ex.toStatus())) { - _updateSession(opCtx); sharding_ddl_util::resumeMigrations(opCtx, _collInfo->nsForTargeting, _doc.getCollUUID(), - getCurrentSession()); + getNewSession(opCtx)); } throw; } diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp index 78af6bd3fc4..987e7ee1df2 100644 --- a/src/mongo/db/s/create_collection_coordinator.cpp +++ b/src/mongo/db/s/create_collection_coordinator.cpp @@ -474,9 +474,8 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( // Additionally we want to perform a majority write on the CSRS to ensure that // all the subsequent reads will see all the writes performed from a previous // execution of this coordinator. - _updateSession(opCtx); _performNoopRetryableWriteOnAllShardsAndConfigsvr( - opCtx, getCurrentSession(), **executor); + opCtx, getNewSession(opCtx), **executor); if (_timeseriesNssResolvedByCommandHandler() || _doc.getTranslatedRequestParams()) { @@ -534,12 +533,10 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( "Removing partial changes from previous run", logAttrs(nss())); - _updateSession(opCtx); cleanupPartialChunksFromPreviousAttempt( - opCtx, *uuid, getCurrentSession()); + opCtx, *uuid, getNewSession(opCtx)); - _updateSession(opCtx); - broadcastDropCollection(opCtx, nss(), **executor, getCurrentSession()); + broadcastDropCollection(opCtx, nss(), **executor, getNewSession(opCtx)); } } } @@ -596,8 +593,7 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( // shard _promoteCriticalSectionsToBlockReads(opCtx); - _updateSession(opCtx); - _createCollectionOnNonPrimaryShards(opCtx, getCurrentSession()); + _createCollectionOnNonPrimaryShards(opCtx, getNewSession(opCtx)); _commit(opCtx, **executor); } @@ -1200,8 +1196,7 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx, } // Upsert Chunks. - _updateSession(opCtx); - insertChunks(opCtx, _initialChunks->chunks, getCurrentSession()); + insertChunks(opCtx, _initialChunks->chunks, getNewSession(opCtx)); // The coll and shardsHoldingData objects will be used by both this function and // insertCollectionAndPlacementEntries(), which accesses their content from a separate thread @@ -1240,7 +1235,7 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx, coll->setUnique(*_request.getUnique()); } - _updateSession(opCtx); + const auto& osi = getNewSession(opCtx); try { notifyChangeStreamsOnShardCollection(opCtx, nss(), @@ -1250,7 +1245,7 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx, *shardsHoldingData); insertCollectionAndPlacementEntries( - opCtx, executor, coll, placementVersion, shardsHoldingData, getCurrentSession()); + opCtx, executor, coll, placementVersion, shardsHoldingData, osi); notifyChangeStreamsOnShardCollection( opCtx, nss(), *_collectionUUID, _request.toBSON(), CommitPhase::kSuccessful); diff --git a/src/mongo/db/s/drop_collection_coordinator.cpp b/src/mongo/db/s/drop_collection_coordinator.cpp index 333657bc8ee..2452701358b 100644 --- a/src/mongo/db/s/drop_collection_coordinator.cpp +++ b/src/mongo/db/s/drop_collection_coordinator.cpp @@ -251,10 +251,8 @@ void DropCollectionCoordinator::_freezeMigrations( opCtx, "dropCollection.start", nss().ns(), logChangeDetail.obj()); if (_doc.getCollInfo()) { - _updateSession(opCtx); - sharding_ddl_util::stopMigrations( - opCtx, nss(), _doc.getCollInfo()->getUuid(), getCurrentSession()); + opCtx, nss(), _doc.getCollInfo()->getUuid(), getNewSession(opCtx)); } } @@ -266,7 +264,6 @@ void DropCollectionCoordinator::_enterCriticalSection( auto* opCtx = opCtxHolder.get(); getForwardableOpMetadata().setOn(opCtx); - _updateSession(opCtx); ShardsvrParticipantBlock blockCRUDOperationsRequest(nss()); blockCRUDOperationsRequest.setBlockType(mongo::CriticalSectionBlockTypeEnum::kReadsAndWrites); blockCRUDOperationsRequest.setReason(_critSecReason); @@ -277,7 +274,7 @@ void DropCollectionCoordinator::_enterCriticalSection( sharding_ddl_util::sendAuthenticatedCommandToShards( opCtx, nss().db(), - cmdObj.addFields(getCurrentSession().toBSON()), + cmdObj.addFields(getNewSession(opCtx).toBSON()), Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx), **executor); @@ -298,7 +295,6 @@ void DropCollectionCoordinator::_commitDropCollection( sharding_ddl_util::removeQueryAnalyzerMetadataFromConfig( opCtx, BSON(analyze_shard_key::QueryAnalyzerDocument::kNsFieldName << nss().toString())); - _updateSession(opCtx); if (collIsSharded) { invariant(_doc.getCollInfo()); const auto& coll = _doc.getCollInfo().value(); @@ -312,18 +308,15 @@ void DropCollectionCoordinator::_commitDropCollection( Grid::get(opCtx)->catalogClient(), coll, ShardingCatalogClient::kMajorityWriteConcern, - getCurrentSession(), + getNewSession(opCtx), useClusterTransaction, **executor); } // Remove tags even if the collection is not sharded or didn't exist - _updateSession(opCtx); - sharding_ddl_util::removeTagsMetadataFromConfig(opCtx, nss(), getCurrentSession()); - - // get a Lsid and an incremented txnNumber. Ensures we are the primary - _updateSession(opCtx); + sharding_ddl_util::removeTagsMetadataFromConfig(opCtx, nss(), getNewSession(opCtx)); + // Ensures we are the primary const auto primaryShardId = ShardingState::get(opCtx)->shardId(); // We need to send the drop to all the shards because both movePrimary and @@ -334,13 +327,13 @@ void DropCollectionCoordinator::_commitDropCollection( participants.end()); sharding_ddl_util::sendDropCollectionParticipantCommandToShards( - opCtx, nss(), participants, **executor, getCurrentSession(), true /*fromMigrate*/); + opCtx, nss(), participants, **executor, getNewSession(opCtx), true /*fromMigrate*/); // The sharded collection must be dropped on the primary shard after it has been // dropped on all of the other shards to ensure it can only be re-created as // unsharded with a higher optime than all of the drops. sharding_ddl_util::sendDropCollectionParticipantCommandToShards( - opCtx, nss(), {primaryShardId}, **executor, getCurrentSession(), false /*fromMigrate*/); + opCtx, nss(), {primaryShardId}, **executor, getNewSession(opCtx), false /*fromMigrate*/); ShardingLogging::get(opCtx)->logChange(opCtx, "dropCollection", nss().ns()); LOGV2(5390503, "Collection dropped", logAttrs(nss())); @@ -354,7 +347,6 @@ void DropCollectionCoordinator::_exitCriticalSection( auto* opCtx = opCtxHolder.get(); getForwardableOpMetadata().setOn(opCtx); - _updateSession(opCtx); ShardsvrParticipantBlock unblockCRUDOperationsRequest(nss()); unblockCRUDOperationsRequest.setBlockType(CriticalSectionBlockTypeEnum::kUnblock); unblockCRUDOperationsRequest.setReason(_critSecReason); @@ -365,7 +357,7 @@ void DropCollectionCoordinator::_exitCriticalSection( sharding_ddl_util::sendAuthenticatedCommandToShards( opCtx, nss().db(), - cmdObj.addFields(getCurrentSession().toBSON()), + cmdObj.addFields(getNewSession(opCtx).toBSON()), Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx), **executor); diff --git a/src/mongo/db/s/drop_database_coordinator.cpp b/src/mongo/db/s/drop_database_coordinator.cpp index ef5ab472d4c..1d80d487a90 100644 --- a/src/mongo/db/s/drop_database_coordinator.cpp +++ b/src/mongo/db/s/drop_database_coordinator.cpp @@ -208,7 +208,6 @@ void DropDatabaseCoordinator::_dropShardedCollection( opCtx, nss.ns(), coorName, DDLLockManager::kDefaultLockTimeout); if (!_isPre70Compatible()) { - _updateSession(opCtx); ShardsvrParticipantBlock blockCRUDOperationsRequest(nss); blockCRUDOperationsRequest.setBlockType( mongo::CriticalSectionBlockTypeEnum::kReadsAndWrites); @@ -219,13 +218,11 @@ void DropDatabaseCoordinator::_dropShardedCollection( sharding_ddl_util::sendAuthenticatedCommandToShards( opCtx, nss.db(), - cmdObj.addFields(getCurrentSession().toBSON()), + cmdObj.addFields(getNewSession(opCtx).toBSON()), Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx), **executor); } - _updateSession(opCtx); - // This always runs in the shard role so should use a cluster transaction to guarantee // targeting the config server. bool useClusterTransaction = true; @@ -235,15 +232,13 @@ void DropDatabaseCoordinator::_dropShardedCollection( Grid::get(opCtx)->catalogClient(), coll, ShardingCatalogClient::kMajorityWriteConcern, - getCurrentSession(), + getNewSession(opCtx), useClusterTransaction, **executor); - _updateSession(opCtx); - sharding_ddl_util::removeTagsMetadataFromConfig(opCtx, nss, getCurrentSession()); + sharding_ddl_util::removeTagsMetadataFromConfig(opCtx, nss, getNewSession(opCtx)); const auto primaryShardId = ShardingState::get(opCtx)->shardId(); - _updateSession(opCtx); // We need to send the drop to all the shards because both movePrimary and // moveChunk leave garbage behind for sharded collections. @@ -252,16 +247,15 @@ void DropDatabaseCoordinator::_dropShardedCollection( participants.erase(std::remove(participants.begin(), participants.end(), primaryShardId), participants.end()); sharding_ddl_util::sendDropCollectionParticipantCommandToShards( - opCtx, nss, participants, **executor, getCurrentSession(), true /* fromMigrate */); + opCtx, nss, participants, **executor, getNewSession(opCtx), true /* fromMigrate */); // The sharded collection must be dropped on the primary shard after it has been dropped on all // of the other shards to ensure it can only be re-created as unsharded with a higher optime // than all of the drops. sharding_ddl_util::sendDropCollectionParticipantCommandToShards( - opCtx, nss, {primaryShardId}, **executor, getCurrentSession(), false /* fromMigrate */); + opCtx, nss, {primaryShardId}, **executor, getNewSession(opCtx), false /* fromMigrate */); if (!_isPre70Compatible()) { - _updateSession(opCtx); ShardsvrParticipantBlock unblockCRUDOperationsRequest(nss); unblockCRUDOperationsRequest.setBlockType(CriticalSectionBlockTypeEnum::kUnblock); unblockCRUDOperationsRequest.setReason(getReasonForDropCollection(nss)); @@ -272,7 +266,7 @@ void DropDatabaseCoordinator::_dropShardedCollection( sharding_ddl_util::sendAuthenticatedCommandToShards( opCtx, nss.db(), - cmdObj.addFields(getCurrentSession().toBSON()), + cmdObj.addFields(getNewSession(opCtx).toBSON()), Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx), **executor); } @@ -319,9 +313,8 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl( // Perform a noop write on the participants in order to advance the txnNumber // for this coordinator's lsid so that requests with older txnNumbers can no // longer execute. - _updateSession(opCtx); _performNoopRetryableWriteOnAllShardsAndConfigsvr( - opCtx, getCurrentSession(), **executor); + opCtx, getNewSession(opCtx), **executor); } ShardingLogging::get(opCtx)->logChange(opCtx, "dropDatabase.start", _dbName); @@ -365,9 +358,8 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl( const auto& nss = coll.getNss(); LOGV2_DEBUG(5494505, 2, "Dropping collection", logAttrs(nss)); - _updateSession(opCtx); sharding_ddl_util::stopMigrations( - opCtx, nss, coll.getUuid(), getCurrentSession()); + opCtx, nss, coll.getUuid(), getNewSession(opCtx)); auto newStateDoc = _doc; newStateDoc.setCollInfo(coll); @@ -384,9 +376,8 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl( const auto& nssWithZones = catalogClient->getAllNssThatHaveZonesForDatabase(opCtx, _dbName); for (const auto& nss : nssWithZones) { - _updateSession(opCtx); sharding_ddl_util::removeTagsMetadataFromConfig( - opCtx, nss, getCurrentSession()); + opCtx, nss, getNewSession(opCtx)); } // Remove the query sampling configuration documents for all collections in this @@ -470,13 +461,12 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl( _clearDatabaseInfoOnPrimary(opCtx); _clearDatabaseInfoOnSecondaries(opCtx); - _updateSession(opCtx); removeDatabaseFromConfigAndUpdatePlacementHistory( opCtx, **executor, _dbName, *metadata().getDatabaseVersion(), - getCurrentSession()); + getNewSession(opCtx)); VectorClockMutable::get(opCtx)->waitForDurableConfigTime().get(opCtx); } diff --git a/src/mongo/db/s/rename_collection_coordinator.cpp b/src/mongo/db/s/rename_collection_coordinator.cpp index 8dd593b44e7..ec14848dfdd 100644 --- a/src/mongo/db/s/rename_collection_coordinator.cpp +++ b/src/mongo/db/s/rename_collection_coordinator.cpp @@ -654,15 +654,13 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( // Block migrations on involved sharded collections if (_doc.getOptShardedCollInfo()) { - _updateSession(opCtx); sharding_ddl_util::stopMigrations( - opCtx, fromNss, _doc.getSourceUUID(), getCurrentSession()); + opCtx, fromNss, _doc.getSourceUUID(), getNewSession(opCtx)); } if (_doc.getTargetIsSharded()) { - _updateSession(opCtx); sharding_ddl_util::stopMigrations( - opCtx, toNss, _doc.getTargetUUID(), getCurrentSession()); + opCtx, toNss, _doc.getTargetUUID(), getNewSession(opCtx)); } })) .then(_buildPhaseHandler( @@ -673,16 +671,12 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( getForwardableOpMetadata().setOn(opCtx); if (!_firstExecution) { - _updateSession(opCtx); _performNoopRetryableWriteOnAllShardsAndConfigsvr( - opCtx, getCurrentSession(), **executor); + opCtx, getNewSession(opCtx), **executor); } const auto& fromNss = nss(); - _updateSession(opCtx); - const OperationSessionInfo osi = getCurrentSession(); - // On participant shards: // - Block CRUD on source and target collection in case at least one of such // collections is currently sharded @@ -695,7 +689,7 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( renameCollParticipantRequest.setRenameCollectionRequest(_request); const auto cmdObj = CommandHelpers::appendMajorityWriteConcern( renameCollParticipantRequest.toBSON({})) - .addFields(osi.toBSON()); + .addFields(getNewSession(opCtx).toBSON()); // We need to send the command to all the shards because both movePrimary and // moveChunk leave garbage behind for sharded collections. At the same time, the @@ -732,27 +726,23 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( // For an unsharded collection the CSRS server can not verify the targetUUID. // Use the session ID + txnNumber to ensure no stale requests get through. - _updateSession(opCtx); - if (!_firstExecution) { _performNoopRetryableWriteOnAllShardsAndConfigsvr( - opCtx, getCurrentSession(), **executor); + opCtx, getNewSession(opCtx), **executor); } if ((_doc.getTargetIsSharded() || _doc.getOptShardedCollInfo())) { renameIndexMetadataInShards( - opCtx, nss(), _request, getCurrentSession(), **executor, &_doc); + opCtx, nss(), _request, getNewSession(opCtx), **executor, &_doc); } - _updateSession(opCtx); - renameCollectionMetadataInTransaction(opCtx, _doc.getOptShardedCollInfo(), _request.getTo(), _doc.getTargetUUID(), ShardingCatalogClient::kMajorityWriteConcern, **executor, - getCurrentSession()); + getNewSession(opCtx)); })) .then(_buildPhaseHandler( Phase::kUnblockCRUD, @@ -762,9 +752,8 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( getForwardableOpMetadata().setOn(opCtx); if (!_firstExecution) { - _updateSession(opCtx); _performNoopRetryableWriteOnAllShardsAndConfigsvr( - opCtx, getCurrentSession(), **executor); + opCtx, getNewSession(opCtx), **executor); } const auto& fromNss = nss(); @@ -778,11 +767,12 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( unblockParticipantRequest.toBSON({})); auto participants = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx); - _updateSession(opCtx); - const OperationSessionInfo osi = getCurrentSession(); - sharding_ddl_util::sendAuthenticatedCommandToShards( - opCtx, fromNss.db(), cmdObj.addFields(osi.toBSON()), participants, **executor); + opCtx, + fromNss.db(), + cmdObj.addFields(getNewSession(opCtx).toBSON()), + participants, + **executor); // Delete chunks belonging to the previous incarnation of the target collection. // This is performed after releasing the critical section in order to reduce stalls diff --git a/src/mongo/db/s/sharding_ddl_coordinator.h b/src/mongo/db/s/sharding_ddl_coordinator.h index db0827d1142..267b3161b04 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator.h +++ b/src/mongo/db/s/sharding_ddl_coordinator.h @@ -372,6 +372,47 @@ protected: } } + /** + * Advances and persists the `txnNumber` to ensure causality between requests, then returns the + * updated operation session information (OSI). + */ + OperationSessionInfo getNewSession(OperationContext* opCtx) { + _updateSession(opCtx); + return getCurrentSession(); + } + + virtual boost::optional<Status> getAbortReason() const override { + const auto& status = _doc.getAbortReason(); + invariant(!status || !status->isOK(), "when persisted, status must be an error"); + return status; + } + + /** + * Persists the abort reason and throws it as an exception. This causes the coordinator to fail, + * and triggers the cleanup future chain since there is a the persisted reason. + */ + void triggerCleanup(OperationContext* opCtx, const Status& status) { + LOGV2_INFO(7418502, + "Coordinator failed, persisting abort reason", + "coordinatorId"_attr = _doc.getId(), + "phase"_attr = serializePhase(_doc.getPhase()), + "reason"_attr = redact(status)); + + auto newDoc = [&] { + stdx::lock_guard lk{_docMutex}; + return _doc; + }(); + + auto coordinatorMetadata = newDoc.getShardingDDLCoordinatorMetadata(); + coordinatorMetadata.setAbortReason(status); + newDoc.setShardingDDLCoordinatorMetadata(std::move(coordinatorMetadata)); + + _updateStateDocument(opCtx, std::move(newDoc)); + + uassertStatusOK(status); + } + +private: // lazily acquire Logical Session ID and a txn number void _updateSession(OperationContext* opCtx) { auto newDoc = [&] { @@ -408,42 +449,6 @@ protected: osi.setTxnNumber(optSession->getTxnNumber()); return osi; } - - OperationSessionInfo getNewSession(OperationContext* opCtx) { - _updateSession(opCtx); - return getCurrentSession(); - } - - virtual boost::optional<Status> getAbortReason() const override { - const auto& status = _doc.getAbortReason(); - invariant(!status || !status->isOK(), "when persisted, status must be an error"); - return status; - } - - /** - * Persists the abort reason and throws it as an exception. This causes the coordinator to fail, - * and triggers the cleanup future chain since there is a the persisted reason. - */ - void triggerCleanup(OperationContext* opCtx, const Status& status) { - LOGV2_INFO(7418502, - "Coordinator failed, persisting abort reason", - "coordinatorId"_attr = _doc.getId(), - "phase"_attr = serializePhase(_doc.getPhase()), - "reason"_attr = redact(status)); - - auto newDoc = [&] { - stdx::lock_guard lk{_docMutex}; - return _doc; - }(); - - auto coordinatorMetadata = newDoc.getShardingDDLCoordinatorMetadata(); - coordinatorMetadata.setAbortReason(status); - newDoc.setShardingDDLCoordinatorMetadata(std::move(coordinatorMetadata)); - - _updateStateDocument(opCtx, std::move(newDoc)); - - uassertStatusOK(status); - } }; #undef MONGO_LOGV2_DEFAULT_COMPONENT |