author     Silvia Surroca <silvia.surroca@mongodb.com>       2023-05-12 14:48:20 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2023-05-12 16:16:19 +0000
commit     5a8c52c0de24d8ddeb4a7d3b90ef5942c53cbd14 (patch)
tree       c136bac4e319737bd638077c7a2a2a09bbcd9096 /src/mongo/db
parent     1c5afb84c8b935fc1246fb3a689a01f22ed0fafd (diff)
download   mongo-5a8c52c0de24d8ddeb4a7d3b90ef5942c53cbd14.tar.gz
SERVER-76050 Enhance session handling in each coordinator
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/s/collmod_coordinator.cpp             32
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.cpp   19
-rw-r--r--  src/mongo/db/s/drop_collection_coordinator.cpp     24
-rw-r--r--  src/mongo/db/s/drop_database_coordinator.cpp       30
-rw-r--r--  src/mongo/db/s/rename_collection_coordinator.cpp   36
-rw-r--r--  src/mongo/db/s/sharding_ddl_coordinator.h          77
6 files changed, 88 insertions, 130 deletions
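
This commit collapses the previous two-step call pattern (_updateSession() followed by getCurrentSession()) into a single getNewSession() helper on ShardingDDLCoordinator, so every remote request advances and persists the txnNumber before the session is handed out. A minimal before/after sketch of a typical call site, condensed from the collmod_coordinator.cpp hunks that follow (not a complete compilable unit):

    // Before: two steps per remote operation; skipping _updateSession() would reuse
    // the previous txnNumber.
    _updateSession(opCtx);
    sharding_ddl_util::stopMigrations(
        opCtx, _collInfo->nsForTargeting, _doc.getCollUUID(), getCurrentSession());

    // After: one helper that advances the txnNumber and returns the fresh
    // OperationSessionInfo.
    sharding_ddl_util::stopMigrations(
        opCtx, _collInfo->nsForTargeting, _doc.getCollUUID(), getNewSession(opCtx));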
diff --git a/src/mongo/db/s/collmod_coordinator.cpp b/src/mongo/db/s/collmod_coordinator.cpp
index 12b443a6bd2..be095f23f0a 100644
--- a/src/mongo/db/s/collmod_coordinator.cpp
+++ b/src/mongo/db/s/collmod_coordinator.cpp
@@ -134,9 +134,8 @@ void CollModCoordinator::_performNoopRetryableWriteOnParticipants(
return participants;
}();
- _updateSession(opCtx);
sharding_ddl_util::performNoopRetryableWriteOnShards(
- opCtx, shardsAndConfigsvr, getCurrentSession(), executor);
+ opCtx, shardsAndConfigsvr, getNewSession(opCtx), executor);
}
void CollModCoordinator::_saveCollectionInfoOnCoordinatorIfNecessary(OperationContext* opCtx) {
@@ -216,11 +215,10 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
if (_collInfo->isSharded) {
_doc.setCollUUID(
sharding_ddl_util::getCollectionUUID(opCtx, _collInfo->nsForTargeting));
- _updateSession(opCtx);
sharding_ddl_util::stopMigrations(opCtx,
_collInfo->nsForTargeting,
_doc.getCollUUID(),
- getCurrentSession());
+ getNewSession(opCtx));
}
})();
})
@@ -231,8 +229,6 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
- _updateSession(opCtx);
-
_saveCollectionInfoOnCoordinatorIfNecessary(opCtx);
if (_isPre61Compatible() && _collInfo->isSharded) {
@@ -243,11 +239,10 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
if (!migrationsAlreadyBlockedForBucketNss) {
_doc.setCollUUID(sharding_ddl_util::getCollectionUUID(
opCtx, _collInfo->nsForTargeting, true /* allowViews */));
- _updateSession(opCtx);
sharding_ddl_util::stopMigrations(opCtx,
_collInfo->nsForTargeting,
_doc.getCollUUID(),
- getCurrentSession());
+ getNewSession(opCtx));
}
}
@@ -260,7 +255,6 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
_updateStateDocument(opCtx, std::move(newDoc));
}
- _updateSession(opCtx);
ShardsvrParticipantBlock blockCRUDOperationsRequest(_collInfo->nsForTargeting);
blockCRUDOperationsRequest.setBlockType(
CriticalSectionBlockTypeEnum::kReadsAndWrites);
@@ -269,7 +263,7 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
blockCRUDOperationsRequest.toBSON({}),
_shardingInfo->shardsOwningChunks,
**executor,
- getCurrentSession());
+ getNewSession(opCtx));
}
}))
.then(_buildPhaseHandler(
@@ -281,8 +275,6 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
- _updateSession(opCtx);
-
_saveCollectionInfoOnCoordinatorIfNecessary(opCtx);
_saveShardingInfoOnCoordinatorIfNecessary(opCtx);
@@ -297,7 +289,7 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
configShard->runCommand(opCtx,
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
nss().db().toString(),
- cmdObj,
+ cmdObj.addFields(getNewSession(opCtx).toBSON()),
Shard::RetryPolicy::kIdempotent)));
}
}))
@@ -307,8 +299,6 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
- _updateSession(opCtx);
-
_saveCollectionInfoOnCoordinatorIfNecessary(opCtx);
_saveShardingInfoOnCoordinatorIfNecessary(opCtx);
@@ -367,7 +357,6 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
// A view definition will only be present on the primary shard. So we pass
// an additional 'performViewChange' flag only to the primary shard.
if (primaryShardOwningChunk != shardsOwningChunks.end()) {
- _updateSession(opCtx);
request.setPerformViewChange(true);
const auto& primaryResponse = sendAuthenticatedCommandWithOsiToShards(
opCtx,
@@ -375,13 +364,12 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
request.toBSON({}),
{_shardingInfo->primaryShard},
**executor,
- getCurrentSession());
+ getNewSession(opCtx));
responses.insert(
responses.end(), primaryResponse.begin(), primaryResponse.end());
shardsOwningChunks.erase(primaryShardOwningChunk);
}
- _updateSession(opCtx);
request.setPerformViewChange(false);
const auto& secondaryResponses =
sendAuthenticatedCommandWithOsiToShards(opCtx,
@@ -389,7 +377,7 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
request.toBSON({}),
shardsOwningChunks,
**executor,
- getCurrentSession());
+ getNewSession(opCtx));
responses.insert(
responses.end(), secondaryResponses.begin(), secondaryResponses.end());
@@ -401,18 +389,16 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
CommandHelpers::appendSimpleCommandStatus(builder, ok, errmsg);
}
_result = builder.obj();
- _updateSession(opCtx);
sharding_ddl_util::resumeMigrations(opCtx,
_collInfo->nsForTargeting,
_doc.getCollUUID(),
- getCurrentSession());
+ getNewSession(opCtx));
} catch (DBException& ex) {
if (!_isRetriableErrorForDDLCoordinator(ex.toStatus())) {
- _updateSession(opCtx);
sharding_ddl_util::resumeMigrations(opCtx,
_collInfo->nsForTargeting,
_doc.getCollUUID(),
- getCurrentSession());
+ getNewSession(opCtx));
}
throw;
}
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp
index 78af6bd3fc4..987e7ee1df2 100644
--- a/src/mongo/db/s/create_collection_coordinator.cpp
+++ b/src/mongo/db/s/create_collection_coordinator.cpp
@@ -474,9 +474,8 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
// Additionally we want to perform a majority write on the CSRS to ensure that
// all the subsequent reads will see all the writes performed from a previous
// execution of this coordinator.
- _updateSession(opCtx);
_performNoopRetryableWriteOnAllShardsAndConfigsvr(
- opCtx, getCurrentSession(), **executor);
+ opCtx, getNewSession(opCtx), **executor);
if (_timeseriesNssResolvedByCommandHandler() ||
_doc.getTranslatedRequestParams()) {
@@ -534,12 +533,10 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
"Removing partial changes from previous run",
logAttrs(nss()));
- _updateSession(opCtx);
cleanupPartialChunksFromPreviousAttempt(
- opCtx, *uuid, getCurrentSession());
+ opCtx, *uuid, getNewSession(opCtx));
- _updateSession(opCtx);
- broadcastDropCollection(opCtx, nss(), **executor, getCurrentSession());
+ broadcastDropCollection(opCtx, nss(), **executor, getNewSession(opCtx));
}
}
}
@@ -596,8 +593,7 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
// shard
_promoteCriticalSectionsToBlockReads(opCtx);
- _updateSession(opCtx);
- _createCollectionOnNonPrimaryShards(opCtx, getCurrentSession());
+ _createCollectionOnNonPrimaryShards(opCtx, getNewSession(opCtx));
_commit(opCtx, **executor);
}
@@ -1200,8 +1196,7 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx,
}
// Upsert Chunks.
- _updateSession(opCtx);
- insertChunks(opCtx, _initialChunks->chunks, getCurrentSession());
+ insertChunks(opCtx, _initialChunks->chunks, getNewSession(opCtx));
// The coll and shardsHoldingData objects will be used by both this function and
// insertCollectionAndPlacementEntries(), which accesses their content from a separate thread
@@ -1240,7 +1235,7 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx,
coll->setUnique(*_request.getUnique());
}
- _updateSession(opCtx);
+ const auto& osi = getNewSession(opCtx);
try {
notifyChangeStreamsOnShardCollection(opCtx,
nss(),
@@ -1250,7 +1245,7 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx,
*shardsHoldingData);
insertCollectionAndPlacementEntries(
- opCtx, executor, coll, placementVersion, shardsHoldingData, getCurrentSession());
+ opCtx, executor, coll, placementVersion, shardsHoldingData, osi);
notifyChangeStreamsOnShardCollection(
opCtx, nss(), *_collectionUUID, _request.toBSON(), CommitPhase::kSuccessful);
diff --git a/src/mongo/db/s/drop_collection_coordinator.cpp b/src/mongo/db/s/drop_collection_coordinator.cpp
index 333657bc8ee..2452701358b 100644
--- a/src/mongo/db/s/drop_collection_coordinator.cpp
+++ b/src/mongo/db/s/drop_collection_coordinator.cpp
@@ -251,10 +251,8 @@ void DropCollectionCoordinator::_freezeMigrations(
opCtx, "dropCollection.start", nss().ns(), logChangeDetail.obj());
if (_doc.getCollInfo()) {
- _updateSession(opCtx);
-
sharding_ddl_util::stopMigrations(
- opCtx, nss(), _doc.getCollInfo()->getUuid(), getCurrentSession());
+ opCtx, nss(), _doc.getCollInfo()->getUuid(), getNewSession(opCtx));
}
}
@@ -266,7 +264,6 @@ void DropCollectionCoordinator::_enterCriticalSection(
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
- _updateSession(opCtx);
ShardsvrParticipantBlock blockCRUDOperationsRequest(nss());
blockCRUDOperationsRequest.setBlockType(mongo::CriticalSectionBlockTypeEnum::kReadsAndWrites);
blockCRUDOperationsRequest.setReason(_critSecReason);
@@ -277,7 +274,7 @@ void DropCollectionCoordinator::_enterCriticalSection(
sharding_ddl_util::sendAuthenticatedCommandToShards(
opCtx,
nss().db(),
- cmdObj.addFields(getCurrentSession().toBSON()),
+ cmdObj.addFields(getNewSession(opCtx).toBSON()),
Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx),
**executor);
@@ -298,7 +295,6 @@ void DropCollectionCoordinator::_commitDropCollection(
sharding_ddl_util::removeQueryAnalyzerMetadataFromConfig(
opCtx, BSON(analyze_shard_key::QueryAnalyzerDocument::kNsFieldName << nss().toString()));
- _updateSession(opCtx);
if (collIsSharded) {
invariant(_doc.getCollInfo());
const auto& coll = _doc.getCollInfo().value();
@@ -312,18 +308,15 @@ void DropCollectionCoordinator::_commitDropCollection(
Grid::get(opCtx)->catalogClient(),
coll,
ShardingCatalogClient::kMajorityWriteConcern,
- getCurrentSession(),
+ getNewSession(opCtx),
useClusterTransaction,
**executor);
}
// Remove tags even if the collection is not sharded or didn't exist
- _updateSession(opCtx);
- sharding_ddl_util::removeTagsMetadataFromConfig(opCtx, nss(), getCurrentSession());
-
- // get a Lsid and an incremented txnNumber. Ensures we are the primary
- _updateSession(opCtx);
+ sharding_ddl_util::removeTagsMetadataFromConfig(opCtx, nss(), getNewSession(opCtx));
+ // Ensures we are the primary
const auto primaryShardId = ShardingState::get(opCtx)->shardId();
// We need to send the drop to all the shards because both movePrimary and
@@ -334,13 +327,13 @@ void DropCollectionCoordinator::_commitDropCollection(
participants.end());
sharding_ddl_util::sendDropCollectionParticipantCommandToShards(
- opCtx, nss(), participants, **executor, getCurrentSession(), true /*fromMigrate*/);
+ opCtx, nss(), participants, **executor, getNewSession(opCtx), true /*fromMigrate*/);
// The sharded collection must be dropped on the primary shard after it has been
// dropped on all of the other shards to ensure it can only be re-created as
// unsharded with a higher optime than all of the drops.
sharding_ddl_util::sendDropCollectionParticipantCommandToShards(
- opCtx, nss(), {primaryShardId}, **executor, getCurrentSession(), false /*fromMigrate*/);
+ opCtx, nss(), {primaryShardId}, **executor, getNewSession(opCtx), false /*fromMigrate*/);
ShardingLogging::get(opCtx)->logChange(opCtx, "dropCollection", nss().ns());
LOGV2(5390503, "Collection dropped", logAttrs(nss()));
@@ -354,7 +347,6 @@ void DropCollectionCoordinator::_exitCriticalSection(
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
- _updateSession(opCtx);
ShardsvrParticipantBlock unblockCRUDOperationsRequest(nss());
unblockCRUDOperationsRequest.setBlockType(CriticalSectionBlockTypeEnum::kUnblock);
unblockCRUDOperationsRequest.setReason(_critSecReason);
@@ -365,7 +357,7 @@ void DropCollectionCoordinator::_exitCriticalSection(
sharding_ddl_util::sendAuthenticatedCommandToShards(
opCtx,
nss().db(),
- cmdObj.addFields(getCurrentSession().toBSON()),
+ cmdObj.addFields(getNewSession(opCtx).toBSON()),
Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx),
**executor);
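
The drop sequence above also shows why each remote request gets its own session: the participant drop and the primary-shard drop are separate retryable operations, each tagged with a freshly advanced txnNumber. A condensed sketch of the resulting call order, using the calls shown in the hunk above:

    // Drop on all non-primary shards first (fromMigrate = true), each request carrying
    // a new OperationSessionInfo.
    sharding_ddl_util::sendDropCollectionParticipantCommandToShards(
        opCtx, nss(), participants, **executor, getNewSession(opCtx), true /* fromMigrate */);

    // Then drop on the primary shard, so the collection can only be re-created as
    // unsharded with a higher optime than all of the other drops.
    sharding_ddl_util::sendDropCollectionParticipantCommandToShards(
        opCtx, nss(), {primaryShardId}, **executor, getNewSession(opCtx), false /* fromMigrate */);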
diff --git a/src/mongo/db/s/drop_database_coordinator.cpp b/src/mongo/db/s/drop_database_coordinator.cpp
index ef5ab472d4c..1d80d487a90 100644
--- a/src/mongo/db/s/drop_database_coordinator.cpp
+++ b/src/mongo/db/s/drop_database_coordinator.cpp
@@ -208,7 +208,6 @@ void DropDatabaseCoordinator::_dropShardedCollection(
opCtx, nss.ns(), coorName, DDLLockManager::kDefaultLockTimeout);
if (!_isPre70Compatible()) {
- _updateSession(opCtx);
ShardsvrParticipantBlock blockCRUDOperationsRequest(nss);
blockCRUDOperationsRequest.setBlockType(
mongo::CriticalSectionBlockTypeEnum::kReadsAndWrites);
@@ -219,13 +218,11 @@ void DropDatabaseCoordinator::_dropShardedCollection(
sharding_ddl_util::sendAuthenticatedCommandToShards(
opCtx,
nss.db(),
- cmdObj.addFields(getCurrentSession().toBSON()),
+ cmdObj.addFields(getNewSession(opCtx).toBSON()),
Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx),
**executor);
}
- _updateSession(opCtx);
-
// This always runs in the shard role so should use a cluster transaction to guarantee
// targeting the config server.
bool useClusterTransaction = true;
@@ -235,15 +232,13 @@ void DropDatabaseCoordinator::_dropShardedCollection(
Grid::get(opCtx)->catalogClient(),
coll,
ShardingCatalogClient::kMajorityWriteConcern,
- getCurrentSession(),
+ getNewSession(opCtx),
useClusterTransaction,
**executor);
- _updateSession(opCtx);
- sharding_ddl_util::removeTagsMetadataFromConfig(opCtx, nss, getCurrentSession());
+ sharding_ddl_util::removeTagsMetadataFromConfig(opCtx, nss, getNewSession(opCtx));
const auto primaryShardId = ShardingState::get(opCtx)->shardId();
- _updateSession(opCtx);
// We need to send the drop to all the shards because both movePrimary and
// moveChunk leave garbage behind for sharded collections.
@@ -252,16 +247,15 @@ void DropDatabaseCoordinator::_dropShardedCollection(
participants.erase(std::remove(participants.begin(), participants.end(), primaryShardId),
participants.end());
sharding_ddl_util::sendDropCollectionParticipantCommandToShards(
- opCtx, nss, participants, **executor, getCurrentSession(), true /* fromMigrate */);
+ opCtx, nss, participants, **executor, getNewSession(opCtx), true /* fromMigrate */);
// The sharded collection must be dropped on the primary shard after it has been dropped on all
// of the other shards to ensure it can only be re-created as unsharded with a higher optime
// than all of the drops.
sharding_ddl_util::sendDropCollectionParticipantCommandToShards(
- opCtx, nss, {primaryShardId}, **executor, getCurrentSession(), false /* fromMigrate */);
+ opCtx, nss, {primaryShardId}, **executor, getNewSession(opCtx), false /* fromMigrate */);
if (!_isPre70Compatible()) {
- _updateSession(opCtx);
ShardsvrParticipantBlock unblockCRUDOperationsRequest(nss);
unblockCRUDOperationsRequest.setBlockType(CriticalSectionBlockTypeEnum::kUnblock);
unblockCRUDOperationsRequest.setReason(getReasonForDropCollection(nss));
@@ -272,7 +266,7 @@ void DropDatabaseCoordinator::_dropShardedCollection(
sharding_ddl_util::sendAuthenticatedCommandToShards(
opCtx,
nss.db(),
- cmdObj.addFields(getCurrentSession().toBSON()),
+ cmdObj.addFields(getNewSession(opCtx).toBSON()),
Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx),
**executor);
}
@@ -319,9 +313,8 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl(
// Perform a noop write on the participants in order to advance the txnNumber
// for this coordinator's lsid so that requests with older txnNumbers can no
// longer execute.
- _updateSession(opCtx);
_performNoopRetryableWriteOnAllShardsAndConfigsvr(
- opCtx, getCurrentSession(), **executor);
+ opCtx, getNewSession(opCtx), **executor);
}
ShardingLogging::get(opCtx)->logChange(opCtx, "dropDatabase.start", _dbName);
@@ -365,9 +358,8 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl(
const auto& nss = coll.getNss();
LOGV2_DEBUG(5494505, 2, "Dropping collection", logAttrs(nss));
- _updateSession(opCtx);
sharding_ddl_util::stopMigrations(
- opCtx, nss, coll.getUuid(), getCurrentSession());
+ opCtx, nss, coll.getUuid(), getNewSession(opCtx));
auto newStateDoc = _doc;
newStateDoc.setCollInfo(coll);
@@ -384,9 +376,8 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl(
const auto& nssWithZones =
catalogClient->getAllNssThatHaveZonesForDatabase(opCtx, _dbName);
for (const auto& nss : nssWithZones) {
- _updateSession(opCtx);
sharding_ddl_util::removeTagsMetadataFromConfig(
- opCtx, nss, getCurrentSession());
+ opCtx, nss, getNewSession(opCtx));
}
// Remove the query sampling configuration documents for all collections in this
@@ -470,13 +461,12 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl(
_clearDatabaseInfoOnPrimary(opCtx);
_clearDatabaseInfoOnSecondaries(opCtx);
- _updateSession(opCtx);
removeDatabaseFromConfigAndUpdatePlacementHistory(
opCtx,
**executor,
_dbName,
*metadata().getDatabaseVersion(),
- getCurrentSession());
+ getNewSession(opCtx));
VectorClockMutable::get(opCtx)->waitForDurableConfigTime().get(opCtx);
}
diff --git a/src/mongo/db/s/rename_collection_coordinator.cpp b/src/mongo/db/s/rename_collection_coordinator.cpp
index 8dd593b44e7..ec14848dfdd 100644
--- a/src/mongo/db/s/rename_collection_coordinator.cpp
+++ b/src/mongo/db/s/rename_collection_coordinator.cpp
@@ -654,15 +654,13 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
// Block migrations on involved sharded collections
if (_doc.getOptShardedCollInfo()) {
- _updateSession(opCtx);
sharding_ddl_util::stopMigrations(
- opCtx, fromNss, _doc.getSourceUUID(), getCurrentSession());
+ opCtx, fromNss, _doc.getSourceUUID(), getNewSession(opCtx));
}
if (_doc.getTargetIsSharded()) {
- _updateSession(opCtx);
sharding_ddl_util::stopMigrations(
- opCtx, toNss, _doc.getTargetUUID(), getCurrentSession());
+ opCtx, toNss, _doc.getTargetUUID(), getNewSession(opCtx));
}
}))
.then(_buildPhaseHandler(
@@ -673,16 +671,12 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
getForwardableOpMetadata().setOn(opCtx);
if (!_firstExecution) {
- _updateSession(opCtx);
_performNoopRetryableWriteOnAllShardsAndConfigsvr(
- opCtx, getCurrentSession(), **executor);
+ opCtx, getNewSession(opCtx), **executor);
}
const auto& fromNss = nss();
- _updateSession(opCtx);
- const OperationSessionInfo osi = getCurrentSession();
-
// On participant shards:
// - Block CRUD on source and target collection in case at least one of such
// collections is currently sharded
@@ -695,7 +689,7 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
renameCollParticipantRequest.setRenameCollectionRequest(_request);
const auto cmdObj = CommandHelpers::appendMajorityWriteConcern(
renameCollParticipantRequest.toBSON({}))
- .addFields(osi.toBSON());
+ .addFields(getNewSession(opCtx).toBSON());
// We need to send the command to all the shards because both movePrimary and
// moveChunk leave garbage behind for sharded collections. At the same time, the
@@ -732,27 +726,23 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
// For an unsharded collection the CSRS server can not verify the targetUUID.
// Use the session ID + txnNumber to ensure no stale requests get through.
- _updateSession(opCtx);
-
if (!_firstExecution) {
_performNoopRetryableWriteOnAllShardsAndConfigsvr(
- opCtx, getCurrentSession(), **executor);
+ opCtx, getNewSession(opCtx), **executor);
}
if ((_doc.getTargetIsSharded() || _doc.getOptShardedCollInfo())) {
renameIndexMetadataInShards(
- opCtx, nss(), _request, getCurrentSession(), **executor, &_doc);
+ opCtx, nss(), _request, getNewSession(opCtx), **executor, &_doc);
}
- _updateSession(opCtx);
-
renameCollectionMetadataInTransaction(opCtx,
_doc.getOptShardedCollInfo(),
_request.getTo(),
_doc.getTargetUUID(),
ShardingCatalogClient::kMajorityWriteConcern,
**executor,
- getCurrentSession());
+ getNewSession(opCtx));
}))
.then(_buildPhaseHandler(
Phase::kUnblockCRUD,
@@ -762,9 +752,8 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
getForwardableOpMetadata().setOn(opCtx);
if (!_firstExecution) {
- _updateSession(opCtx);
_performNoopRetryableWriteOnAllShardsAndConfigsvr(
- opCtx, getCurrentSession(), **executor);
+ opCtx, getNewSession(opCtx), **executor);
}
const auto& fromNss = nss();
@@ -778,11 +767,12 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
unblockParticipantRequest.toBSON({}));
auto participants = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx);
- _updateSession(opCtx);
- const OperationSessionInfo osi = getCurrentSession();
-
sharding_ddl_util::sendAuthenticatedCommandToShards(
- opCtx, fromNss.db(), cmdObj.addFields(osi.toBSON()), participants, **executor);
+ opCtx,
+ fromNss.db(),
+ cmdObj.addFields(getNewSession(opCtx).toBSON()),
+ participants,
+ **executor);
// Delete chunks belonging to the previous incarnation of the target collection.
// This is performed after releasing the critical section in order to reduce stalls
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.h b/src/mongo/db/s/sharding_ddl_coordinator.h
index db0827d1142..267b3161b04 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.h
+++ b/src/mongo/db/s/sharding_ddl_coordinator.h
@@ -372,6 +372,47 @@ protected:
}
}
+ /**
+ * Advances and persists the `txnNumber` to ensure causality between requests, then returns the
+ * updated operation session information (OSI).
+ */
+ OperationSessionInfo getNewSession(OperationContext* opCtx) {
+ _updateSession(opCtx);
+ return getCurrentSession();
+ }
+
+ virtual boost::optional<Status> getAbortReason() const override {
+ const auto& status = _doc.getAbortReason();
+ invariant(!status || !status->isOK(), "when persisted, status must be an error");
+ return status;
+ }
+
+ /**
+ * Persists the abort reason and throws it as an exception. This causes the coordinator to fail,
+ * and triggers the cleanup future chain since there is a persisted reason.
+ */
+ void triggerCleanup(OperationContext* opCtx, const Status& status) {
+ LOGV2_INFO(7418502,
+ "Coordinator failed, persisting abort reason",
+ "coordinatorId"_attr = _doc.getId(),
+ "phase"_attr = serializePhase(_doc.getPhase()),
+ "reason"_attr = redact(status));
+
+ auto newDoc = [&] {
+ stdx::lock_guard lk{_docMutex};
+ return _doc;
+ }();
+
+ auto coordinatorMetadata = newDoc.getShardingDDLCoordinatorMetadata();
+ coordinatorMetadata.setAbortReason(status);
+ newDoc.setShardingDDLCoordinatorMetadata(std::move(coordinatorMetadata));
+
+ _updateStateDocument(opCtx, std::move(newDoc));
+
+ uassertStatusOK(status);
+ }
+
+private:
// lazily acquire Logical Session ID and a txn number
void _updateSession(OperationContext* opCtx) {
auto newDoc = [&] {
@@ -408,42 +449,6 @@ protected:
osi.setTxnNumber(optSession->getTxnNumber());
return osi;
}
-
- OperationSessionInfo getNewSession(OperationContext* opCtx) {
- _updateSession(opCtx);
- return getCurrentSession();
- }
-
- virtual boost::optional<Status> getAbortReason() const override {
- const auto& status = _doc.getAbortReason();
- invariant(!status || !status->isOK(), "when persisted, status must be an error");
- return status;
- }
-
- /**
- * Persists the abort reason and throws it as an exception. This causes the coordinator to fail,
- * and triggers the cleanup future chain since there is a persisted reason.
- */
- void triggerCleanup(OperationContext* opCtx, const Status& status) {
- LOGV2_INFO(7418502,
- "Coordinator failed, persisting abort reason",
- "coordinatorId"_attr = _doc.getId(),
- "phase"_attr = serializePhase(_doc.getPhase()),
- "reason"_attr = redact(status));
-
- auto newDoc = [&] {
- stdx::lock_guard lk{_docMutex};
- return _doc;
- }();
-
- auto coordinatorMetadata = newDoc.getShardingDDLCoordinatorMetadata();
- coordinatorMetadata.setAbortReason(status);
- newDoc.setShardingDDLCoordinatorMetadata(std::move(coordinatorMetadata));
-
- _updateStateDocument(opCtx, std::move(newDoc));
-
- uassertStatusOK(status);
- }
};
#undef MONGO_LOGV2_DEFAULT_COMPONENT
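
For reference, a hedged sketch of how a coordinator phase attaches the freshly advanced session to an outbound shard command after this change; the names and call shape are taken from the rename and drop hunks above, and this is not a complete compilable unit:

    // getNewSession() bumps the persisted txnNumber, so requests carrying an older
    // txnNumber for this coordinator's lsid can no longer execute on the shards.
    const auto cmdObj =
        CommandHelpers::appendMajorityWriteConcern(unblockParticipantRequest.toBSON({}));
    sharding_ddl_util::sendAuthenticatedCommandToShards(
        opCtx,
        nss().db(),
        cmdObj.addFields(getNewSession(opCtx).toBSON()),
        Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx),
        **executor);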