author     auto-revert-processor <dev-prod-dag@mongodb.com>    2023-05-12 23:41:11 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2023-05-13 01:34:46 +0000
commit     68145fa6580490c6d6d1e3829359f3260be8f9bc (patch)
tree       d3a6f26d3772b61b7adc28c59ee044061cedb7e1
parent     cc09debe1f3eacece6986f8bb5fd3199600e54c6 (diff)
Revert "SERVER-76050 Enhance session handling in each coordinator"
This reverts commit 5a8c52c0de24d8ddeb4a7d3b90ef5942c53cbd14.
-rw-r--r--  src/mongo/db/s/collmod_coordinator.cpp             32
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.cpp   19
-rw-r--r--  src/mongo/db/s/drop_collection_coordinator.cpp     24
-rw-r--r--  src/mongo/db/s/drop_database_coordinator.cpp       30
-rw-r--r--  src/mongo/db/s/rename_collection_coordinator.cpp   36
-rw-r--r--  src/mongo/db/s/sharding_ddl_coordinator.h          77
6 files changed, 130 insertions, 88 deletions
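
Note for readers of the diff below: SERVER-76050 had collapsed the explicit pair _updateSession(opCtx) followed by getCurrentSession() into a single getNewSession(opCtx) helper that advances and persists the txnNumber before returning the operation session info (OSI). This revert restores the two-step form at every call site. What follows is a minimal, self-contained C++ sketch of the two idioms, using simplified stand-in types rather than MongoDB's actual classes:

// Minimal sketch of the two session-handling idioms involved in this revert.
// The types and names below are simplified stand-ins, not MongoDB's implementation.
#include <cstdint>
#include <iostream>

struct OperationSessionInfoSketch {
    std::uint64_t lsid = 42;      // stand-in for the logical session id
    std::int64_t txnNumber = -1;  // transaction number bound to that session
};

class CoordinatorSketch {
public:
    // Idiom removed by this revert: one call advances the txnNumber and returns the session.
    OperationSessionInfoSketch getNewSession() {
        _updateSession();
        return getCurrentSession();
    }

    // Idiom restored by this revert: callers bump the session explicitly, then read it back.
    void _updateSession() { ++_session.txnNumber; }  // the real code also persists the new number
    OperationSessionInfoSketch getCurrentSession() const { return _session; }

private:
    OperationSessionInfoSketch _session;
};

int main() {
    CoordinatorSketch coordinator;

    // Restored style: the caller pairs the two calls at each site.
    coordinator._updateSession();
    std::cout << "txnNumber after explicit update: " << coordinator.getCurrentSession().txnNumber << "\n";  // 0

    // Style being reverted away from: a single helper does both steps.
    std::cout << "txnNumber from getNewSession: " << coordinator.getNewSession().txnNumber << "\n";  // 1
    return 0;
}

Either way, each participant request is tagged with the same lsid but a strictly increasing txnNumber, which is what the coordinators rely on for causality between steps.
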
diff --git a/src/mongo/db/s/collmod_coordinator.cpp b/src/mongo/db/s/collmod_coordinator.cpp
index be095f23f0a..12b443a6bd2 100644
--- a/src/mongo/db/s/collmod_coordinator.cpp
+++ b/src/mongo/db/s/collmod_coordinator.cpp
@@ -134,8 +134,9 @@ void CollModCoordinator::_performNoopRetryableWriteOnParticipants(
return participants;
}();
+ _updateSession(opCtx);
sharding_ddl_util::performNoopRetryableWriteOnShards(
- opCtx, shardsAndConfigsvr, getNewSession(opCtx), executor);
+ opCtx, shardsAndConfigsvr, getCurrentSession(), executor);
}
void CollModCoordinator::_saveCollectionInfoOnCoordinatorIfNecessary(OperationContext* opCtx) {
@@ -215,10 +216,11 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
if (_collInfo->isSharded) {
_doc.setCollUUID(
sharding_ddl_util::getCollectionUUID(opCtx, _collInfo->nsForTargeting));
+ _updateSession(opCtx);
sharding_ddl_util::stopMigrations(opCtx,
_collInfo->nsForTargeting,
_doc.getCollUUID(),
- getNewSession(opCtx));
+ getCurrentSession());
}
})();
})
@@ -229,6 +231,8 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
+ _updateSession(opCtx);
+
_saveCollectionInfoOnCoordinatorIfNecessary(opCtx);
if (_isPre61Compatible() && _collInfo->isSharded) {
@@ -239,10 +243,11 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
if (!migrationsAlreadyBlockedForBucketNss) {
_doc.setCollUUID(sharding_ddl_util::getCollectionUUID(
opCtx, _collInfo->nsForTargeting, true /* allowViews */));
+ _updateSession(opCtx);
sharding_ddl_util::stopMigrations(opCtx,
_collInfo->nsForTargeting,
_doc.getCollUUID(),
- getNewSession(opCtx));
+ getCurrentSession());
}
}
@@ -255,6 +260,7 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
_updateStateDocument(opCtx, std::move(newDoc));
}
+ _updateSession(opCtx);
ShardsvrParticipantBlock blockCRUDOperationsRequest(_collInfo->nsForTargeting);
blockCRUDOperationsRequest.setBlockType(
CriticalSectionBlockTypeEnum::kReadsAndWrites);
@@ -263,7 +269,7 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
blockCRUDOperationsRequest.toBSON({}),
_shardingInfo->shardsOwningChunks,
**executor,
- getNewSession(opCtx));
+ getCurrentSession());
}
}))
.then(_buildPhaseHandler(
@@ -275,6 +281,8 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
+ _updateSession(opCtx);
+
_saveCollectionInfoOnCoordinatorIfNecessary(opCtx);
_saveShardingInfoOnCoordinatorIfNecessary(opCtx);
@@ -289,7 +297,7 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
configShard->runCommand(opCtx,
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
nss().db().toString(),
- cmdObj.addFields(getNewSession(opCtx).toBSON()),
+ cmdObj,
Shard::RetryPolicy::kIdempotent)));
}
}))
@@ -299,6 +307,8 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
+ _updateSession(opCtx);
+
_saveCollectionInfoOnCoordinatorIfNecessary(opCtx);
_saveShardingInfoOnCoordinatorIfNecessary(opCtx);
@@ -357,6 +367,7 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
// A view definition will only be present on the primary shard. So we pass
// an additional 'performViewChange' flag only to the primary shard.
if (primaryShardOwningChunk != shardsOwningChunks.end()) {
+ _updateSession(opCtx);
request.setPerformViewChange(true);
const auto& primaryResponse = sendAuthenticatedCommandWithOsiToShards(
opCtx,
@@ -364,12 +375,13 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
request.toBSON({}),
{_shardingInfo->primaryShard},
**executor,
- getNewSession(opCtx));
+ getCurrentSession());
responses.insert(
responses.end(), primaryResponse.begin(), primaryResponse.end());
shardsOwningChunks.erase(primaryShardOwningChunk);
}
+ _updateSession(opCtx);
request.setPerformViewChange(false);
const auto& secondaryResponses =
sendAuthenticatedCommandWithOsiToShards(opCtx,
@@ -377,7 +389,7 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
request.toBSON({}),
shardsOwningChunks,
**executor,
- getNewSession(opCtx));
+ getCurrentSession());
responses.insert(
responses.end(), secondaryResponses.begin(), secondaryResponses.end());
@@ -389,16 +401,18 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
CommandHelpers::appendSimpleCommandStatus(builder, ok, errmsg);
}
_result = builder.obj();
+ _updateSession(opCtx);
sharding_ddl_util::resumeMigrations(opCtx,
_collInfo->nsForTargeting,
_doc.getCollUUID(),
- getNewSession(opCtx));
+ getCurrentSession());
} catch (DBException& ex) {
if (!_isRetriableErrorForDDLCoordinator(ex.toStatus())) {
+ _updateSession(opCtx);
sharding_ddl_util::resumeMigrations(opCtx,
_collInfo->nsForTargeting,
_doc.getCollUUID(),
- getNewSession(opCtx));
+ getCurrentSession());
}
throw;
}
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp
index 987e7ee1df2..78af6bd3fc4 100644
--- a/src/mongo/db/s/create_collection_coordinator.cpp
+++ b/src/mongo/db/s/create_collection_coordinator.cpp
@@ -474,8 +474,9 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
// Additionally we want to perform a majority write on the CSRS to ensure that
// all the subsequent reads will see all the writes performed from a previous
// execution of this coordinator.
+ _updateSession(opCtx);
_performNoopRetryableWriteOnAllShardsAndConfigsvr(
- opCtx, getNewSession(opCtx), **executor);
+ opCtx, getCurrentSession(), **executor);
if (_timeseriesNssResolvedByCommandHandler() ||
_doc.getTranslatedRequestParams()) {
@@ -533,10 +534,12 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
"Removing partial changes from previous run",
logAttrs(nss()));
+ _updateSession(opCtx);
cleanupPartialChunksFromPreviousAttempt(
- opCtx, *uuid, getNewSession(opCtx));
+ opCtx, *uuid, getCurrentSession());
- broadcastDropCollection(opCtx, nss(), **executor, getNewSession(opCtx));
+ _updateSession(opCtx);
+ broadcastDropCollection(opCtx, nss(), **executor, getCurrentSession());
}
}
}
@@ -593,7 +596,8 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
// shard
_promoteCriticalSectionsToBlockReads(opCtx);
- _createCollectionOnNonPrimaryShards(opCtx, getNewSession(opCtx));
+ _updateSession(opCtx);
+ _createCollectionOnNonPrimaryShards(opCtx, getCurrentSession());
_commit(opCtx, **executor);
}
@@ -1196,7 +1200,8 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx,
}
// Upsert Chunks.
- insertChunks(opCtx, _initialChunks->chunks, getNewSession(opCtx));
+ _updateSession(opCtx);
+ insertChunks(opCtx, _initialChunks->chunks, getCurrentSession());
// The coll and shardsHoldingData objects will be used by both this function and
// insertCollectionAndPlacementEntries(), which accesses their content from a separate thread
@@ -1235,7 +1240,7 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx,
coll->setUnique(*_request.getUnique());
}
- const auto& osi = getNewSession(opCtx);
+ _updateSession(opCtx);
try {
notifyChangeStreamsOnShardCollection(opCtx,
nss(),
@@ -1245,7 +1250,7 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx,
*shardsHoldingData);
insertCollectionAndPlacementEntries(
- opCtx, executor, coll, placementVersion, shardsHoldingData, osi);
+ opCtx, executor, coll, placementVersion, shardsHoldingData, getCurrentSession());
notifyChangeStreamsOnShardCollection(
opCtx, nss(), *_collectionUUID, _request.toBSON(), CommitPhase::kSuccessful);
diff --git a/src/mongo/db/s/drop_collection_coordinator.cpp b/src/mongo/db/s/drop_collection_coordinator.cpp
index 2452701358b..333657bc8ee 100644
--- a/src/mongo/db/s/drop_collection_coordinator.cpp
+++ b/src/mongo/db/s/drop_collection_coordinator.cpp
@@ -251,8 +251,10 @@ void DropCollectionCoordinator::_freezeMigrations(
opCtx, "dropCollection.start", nss().ns(), logChangeDetail.obj());
if (_doc.getCollInfo()) {
+ _updateSession(opCtx);
+
sharding_ddl_util::stopMigrations(
- opCtx, nss(), _doc.getCollInfo()->getUuid(), getNewSession(opCtx));
+ opCtx, nss(), _doc.getCollInfo()->getUuid(), getCurrentSession());
}
}
@@ -264,6 +266,7 @@ void DropCollectionCoordinator::_enterCriticalSection(
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
+ _updateSession(opCtx);
ShardsvrParticipantBlock blockCRUDOperationsRequest(nss());
blockCRUDOperationsRequest.setBlockType(mongo::CriticalSectionBlockTypeEnum::kReadsAndWrites);
blockCRUDOperationsRequest.setReason(_critSecReason);
@@ -274,7 +277,7 @@ void DropCollectionCoordinator::_enterCriticalSection(
sharding_ddl_util::sendAuthenticatedCommandToShards(
opCtx,
nss().db(),
- cmdObj.addFields(getNewSession(opCtx).toBSON()),
+ cmdObj.addFields(getCurrentSession().toBSON()),
Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx),
**executor);
@@ -295,6 +298,7 @@ void DropCollectionCoordinator::_commitDropCollection(
sharding_ddl_util::removeQueryAnalyzerMetadataFromConfig(
opCtx, BSON(analyze_shard_key::QueryAnalyzerDocument::kNsFieldName << nss().toString()));
+ _updateSession(opCtx);
if (collIsSharded) {
invariant(_doc.getCollInfo());
const auto& coll = _doc.getCollInfo().value();
@@ -308,15 +312,18 @@ void DropCollectionCoordinator::_commitDropCollection(
Grid::get(opCtx)->catalogClient(),
coll,
ShardingCatalogClient::kMajorityWriteConcern,
- getNewSession(opCtx),
+ getCurrentSession(),
useClusterTransaction,
**executor);
}
// Remove tags even if the collection is not sharded or didn't exist
- sharding_ddl_util::removeTagsMetadataFromConfig(opCtx, nss(), getNewSession(opCtx));
+ _updateSession(opCtx);
+ sharding_ddl_util::removeTagsMetadataFromConfig(opCtx, nss(), getCurrentSession());
+
+ // Get an lsid and an incremented txnNumber. Ensures we are the primary
+ _updateSession(opCtx);
- // Ensures we are the primary
const auto primaryShardId = ShardingState::get(opCtx)->shardId();
// We need to send the drop to all the shards because both movePrimary and
@@ -327,13 +334,13 @@ void DropCollectionCoordinator::_commitDropCollection(
participants.end());
sharding_ddl_util::sendDropCollectionParticipantCommandToShards(
- opCtx, nss(), participants, **executor, getNewSession(opCtx), true /*fromMigrate*/);
+ opCtx, nss(), participants, **executor, getCurrentSession(), true /*fromMigrate*/);
// The sharded collection must be dropped on the primary shard after it has been
// dropped on all of the other shards to ensure it can only be re-created as
// unsharded with a higher optime than all of the drops.
sharding_ddl_util::sendDropCollectionParticipantCommandToShards(
- opCtx, nss(), {primaryShardId}, **executor, getNewSession(opCtx), false /*fromMigrate*/);
+ opCtx, nss(), {primaryShardId}, **executor, getCurrentSession(), false /*fromMigrate*/);
ShardingLogging::get(opCtx)->logChange(opCtx, "dropCollection", nss().ns());
LOGV2(5390503, "Collection dropped", logAttrs(nss()));
@@ -347,6 +354,7 @@ void DropCollectionCoordinator::_exitCriticalSection(
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
+ _updateSession(opCtx);
ShardsvrParticipantBlock unblockCRUDOperationsRequest(nss());
unblockCRUDOperationsRequest.setBlockType(CriticalSectionBlockTypeEnum::kUnblock);
unblockCRUDOperationsRequest.setReason(_critSecReason);
@@ -357,7 +365,7 @@ void DropCollectionCoordinator::_exitCriticalSection(
sharding_ddl_util::sendAuthenticatedCommandToShards(
opCtx,
nss().db(),
- cmdObj.addFields(getNewSession(opCtx).toBSON()),
+ cmdObj.addFields(getCurrentSession().toBSON()),
Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx),
**executor);
diff --git a/src/mongo/db/s/drop_database_coordinator.cpp b/src/mongo/db/s/drop_database_coordinator.cpp
index 1d80d487a90..ef5ab472d4c 100644
--- a/src/mongo/db/s/drop_database_coordinator.cpp
+++ b/src/mongo/db/s/drop_database_coordinator.cpp
@@ -208,6 +208,7 @@ void DropDatabaseCoordinator::_dropShardedCollection(
opCtx, nss.ns(), coorName, DDLLockManager::kDefaultLockTimeout);
if (!_isPre70Compatible()) {
+ _updateSession(opCtx);
ShardsvrParticipantBlock blockCRUDOperationsRequest(nss);
blockCRUDOperationsRequest.setBlockType(
mongo::CriticalSectionBlockTypeEnum::kReadsAndWrites);
@@ -218,11 +219,13 @@ void DropDatabaseCoordinator::_dropShardedCollection(
sharding_ddl_util::sendAuthenticatedCommandToShards(
opCtx,
nss.db(),
- cmdObj.addFields(getNewSession(opCtx).toBSON()),
+ cmdObj.addFields(getCurrentSession().toBSON()),
Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx),
**executor);
}
+ _updateSession(opCtx);
+
// This always runs in the shard role so should use a cluster transaction to guarantee
// targeting the config server.
bool useClusterTransaction = true;
@@ -232,13 +235,15 @@ void DropDatabaseCoordinator::_dropShardedCollection(
Grid::get(opCtx)->catalogClient(),
coll,
ShardingCatalogClient::kMajorityWriteConcern,
- getNewSession(opCtx),
+ getCurrentSession(),
useClusterTransaction,
**executor);
- sharding_ddl_util::removeTagsMetadataFromConfig(opCtx, nss, getNewSession(opCtx));
+ _updateSession(opCtx);
+ sharding_ddl_util::removeTagsMetadataFromConfig(opCtx, nss, getCurrentSession());
const auto primaryShardId = ShardingState::get(opCtx)->shardId();
+ _updateSession(opCtx);
// We need to send the drop to all the shards because both movePrimary and
// moveChunk leave garbage behind for sharded collections.
@@ -247,15 +252,16 @@ void DropDatabaseCoordinator::_dropShardedCollection(
participants.erase(std::remove(participants.begin(), participants.end(), primaryShardId),
participants.end());
sharding_ddl_util::sendDropCollectionParticipantCommandToShards(
- opCtx, nss, participants, **executor, getNewSession(opCtx), true /* fromMigrate */);
+ opCtx, nss, participants, **executor, getCurrentSession(), true /* fromMigrate */);
// The sharded collection must be dropped on the primary shard after it has been dropped on all
// of the other shards to ensure it can only be re-created as unsharded with a higher optime
// than all of the drops.
sharding_ddl_util::sendDropCollectionParticipantCommandToShards(
- opCtx, nss, {primaryShardId}, **executor, getNewSession(opCtx), false /* fromMigrate */);
+ opCtx, nss, {primaryShardId}, **executor, getCurrentSession(), false /* fromMigrate */);
if (!_isPre70Compatible()) {
+ _updateSession(opCtx);
ShardsvrParticipantBlock unblockCRUDOperationsRequest(nss);
unblockCRUDOperationsRequest.setBlockType(CriticalSectionBlockTypeEnum::kUnblock);
unblockCRUDOperationsRequest.setReason(getReasonForDropCollection(nss));
@@ -266,7 +272,7 @@ void DropDatabaseCoordinator::_dropShardedCollection(
sharding_ddl_util::sendAuthenticatedCommandToShards(
opCtx,
nss.db(),
- cmdObj.addFields(getNewSession(opCtx).toBSON()),
+ cmdObj.addFields(getCurrentSession().toBSON()),
Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx),
**executor);
}
@@ -313,8 +319,9 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl(
// Perform a noop write on the participants in order to advance the txnNumber
// for this coordinator's lsid so that requests with older txnNumbers can no
// longer execute.
+ _updateSession(opCtx);
_performNoopRetryableWriteOnAllShardsAndConfigsvr(
- opCtx, getNewSession(opCtx), **executor);
+ opCtx, getCurrentSession(), **executor);
}
ShardingLogging::get(opCtx)->logChange(opCtx, "dropDatabase.start", _dbName);
@@ -358,8 +365,9 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl(
const auto& nss = coll.getNss();
LOGV2_DEBUG(5494505, 2, "Dropping collection", logAttrs(nss));
+ _updateSession(opCtx);
sharding_ddl_util::stopMigrations(
- opCtx, nss, coll.getUuid(), getNewSession(opCtx));
+ opCtx, nss, coll.getUuid(), getCurrentSession());
auto newStateDoc = _doc;
newStateDoc.setCollInfo(coll);
@@ -376,8 +384,9 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl(
const auto& nssWithZones =
catalogClient->getAllNssThatHaveZonesForDatabase(opCtx, _dbName);
for (const auto& nss : nssWithZones) {
+ _updateSession(opCtx);
sharding_ddl_util::removeTagsMetadataFromConfig(
- opCtx, nss, getNewSession(opCtx));
+ opCtx, nss, getCurrentSession());
}
// Remove the query sampling configuration documents for all collections in this
@@ -461,12 +470,13 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl(
_clearDatabaseInfoOnPrimary(opCtx);
_clearDatabaseInfoOnSecondaries(opCtx);
+ _updateSession(opCtx);
removeDatabaseFromConfigAndUpdatePlacementHistory(
opCtx,
**executor,
_dbName,
*metadata().getDatabaseVersion(),
- getNewSession(opCtx));
+ getCurrentSession());
VectorClockMutable::get(opCtx)->waitForDurableConfigTime().get(opCtx);
}
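
The comment earlier in this file's diff ("Perform a noop write on the participants in order to advance the txnNumber for this coordinator's lsid so that requests with older txnNumbers can no longer execute") describes the fencing property that all of these session bumps provide. Below is a conceptual sketch of that property, using a hypothetical in-memory participant rather than MongoDB's session catalog or retryable-write machinery:

// Conceptual sketch of txnNumber fencing on a participant. This is a hypothetical
// in-memory model, not MongoDB's session catalog or retryable-write implementation.
#include <cstdint>
#include <iostream>
#include <stdexcept>
#include <unordered_map>

class ParticipantSketch {
public:
    // A request tagged with (lsid, txnNumber) is accepted only if its txnNumber is at least
    // as large as the highest txnNumber already seen for that lsid.
    void onRequest(std::uint64_t lsid, std::int64_t txnNumber) {
        auto it = _highestSeen.find(lsid);
        if (it != _highestSeen.end() && txnNumber < it->second) {
            throw std::runtime_error("stale txnNumber rejected");
        }
        _highestSeen[lsid] = txnNumber;
    }

private:
    std::unordered_map<std::uint64_t, std::int64_t> _highestSeen;
};

int main() {
    ParticipantSketch shard;
    const std::uint64_t lsid = 7;

    shard.onRequest(lsid, 3);  // request sent by an earlier step of the coordinator
    shard.onRequest(lsid, 4);  // the noop retryable write advances the txnNumber for this lsid

    try {
        shard.onRequest(lsid, 3);  // a late retry from the earlier step can no longer execute
    } catch (const std::exception& ex) {
        std::cout << ex.what() << "\n";
    }
    return 0;
}

This is why, on re-execution after a failover (the !_firstExecution branches in these coordinators), a noop retryable write is performed before the session is reused for further commands.
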
diff --git a/src/mongo/db/s/rename_collection_coordinator.cpp b/src/mongo/db/s/rename_collection_coordinator.cpp
index ec14848dfdd..8dd593b44e7 100644
--- a/src/mongo/db/s/rename_collection_coordinator.cpp
+++ b/src/mongo/db/s/rename_collection_coordinator.cpp
@@ -654,13 +654,15 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
// Block migrations on involved sharded collections
if (_doc.getOptShardedCollInfo()) {
+ _updateSession(opCtx);
sharding_ddl_util::stopMigrations(
- opCtx, fromNss, _doc.getSourceUUID(), getNewSession(opCtx));
+ opCtx, fromNss, _doc.getSourceUUID(), getCurrentSession());
}
if (_doc.getTargetIsSharded()) {
+ _updateSession(opCtx);
sharding_ddl_util::stopMigrations(
- opCtx, toNss, _doc.getTargetUUID(), getNewSession(opCtx));
+ opCtx, toNss, _doc.getTargetUUID(), getCurrentSession());
}
}))
.then(_buildPhaseHandler(
@@ -671,12 +673,16 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
getForwardableOpMetadata().setOn(opCtx);
if (!_firstExecution) {
+ _updateSession(opCtx);
_performNoopRetryableWriteOnAllShardsAndConfigsvr(
- opCtx, getNewSession(opCtx), **executor);
+ opCtx, getCurrentSession(), **executor);
}
const auto& fromNss = nss();
+ _updateSession(opCtx);
+ const OperationSessionInfo osi = getCurrentSession();
+
// On participant shards:
// - Block CRUD on source and target collection in case at least one of such
// collections is currently sharded
@@ -689,7 +695,7 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
renameCollParticipantRequest.setRenameCollectionRequest(_request);
const auto cmdObj = CommandHelpers::appendMajorityWriteConcern(
renameCollParticipantRequest.toBSON({}))
- .addFields(getNewSession(opCtx).toBSON());
+ .addFields(osi.toBSON());
// We need to send the command to all the shards because both movePrimary and
// moveChunk leave garbage behind for sharded collections. At the same time, the
@@ -726,23 +732,27 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
// For an unsharded collection the CSRS server can not verify the targetUUID.
// Use the session ID + txnNumber to ensure no stale requests get through.
+ _updateSession(opCtx);
+
if (!_firstExecution) {
_performNoopRetryableWriteOnAllShardsAndConfigsvr(
- opCtx, getNewSession(opCtx), **executor);
+ opCtx, getCurrentSession(), **executor);
}
if ((_doc.getTargetIsSharded() || _doc.getOptShardedCollInfo())) {
renameIndexMetadataInShards(
- opCtx, nss(), _request, getNewSession(opCtx), **executor, &_doc);
+ opCtx, nss(), _request, getCurrentSession(), **executor, &_doc);
}
+ _updateSession(opCtx);
+
renameCollectionMetadataInTransaction(opCtx,
_doc.getOptShardedCollInfo(),
_request.getTo(),
_doc.getTargetUUID(),
ShardingCatalogClient::kMajorityWriteConcern,
**executor,
- getNewSession(opCtx));
+ getCurrentSession());
}))
.then(_buildPhaseHandler(
Phase::kUnblockCRUD,
@@ -752,8 +762,9 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
getForwardableOpMetadata().setOn(opCtx);
if (!_firstExecution) {
+ _updateSession(opCtx);
_performNoopRetryableWriteOnAllShardsAndConfigsvr(
- opCtx, getNewSession(opCtx), **executor);
+ opCtx, getCurrentSession(), **executor);
}
const auto& fromNss = nss();
@@ -767,12 +778,11 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
unblockParticipantRequest.toBSON({}));
auto participants = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx);
+ _updateSession(opCtx);
+ const OperationSessionInfo osi = getCurrentSession();
+
sharding_ddl_util::sendAuthenticatedCommandToShards(
- opCtx,
- fromNss.db(),
- cmdObj.addFields(getNewSession(opCtx).toBSON()),
- participants,
- **executor);
+ opCtx, fromNss.db(), cmdObj.addFields(osi.toBSON()), participants, **executor);
// Delete chunks belonging to the previous incarnation of the target collection.
// This is performed after releasing the critical section in order to reduce stalls
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.h b/src/mongo/db/s/sharding_ddl_coordinator.h
index 267b3161b04..db0827d1142 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.h
+++ b/src/mongo/db/s/sharding_ddl_coordinator.h
@@ -372,47 +372,6 @@ protected:
}
}
- /**
- * Advances and persists the `txnNumber` to ensure causality between requests, then returns the
- * updated operation session information (OSI).
- */
- OperationSessionInfo getNewSession(OperationContext* opCtx) {
- _updateSession(opCtx);
- return getCurrentSession();
- }
-
- virtual boost::optional<Status> getAbortReason() const override {
- const auto& status = _doc.getAbortReason();
- invariant(!status || !status->isOK(), "when persisted, status must be an error");
- return status;
- }
-
- /**
- * Persists the abort reason and throws it as an exception. This causes the coordinator to fail,
- * and triggers the cleanup future chain since there is a persisted reason.
- */
- void triggerCleanup(OperationContext* opCtx, const Status& status) {
- LOGV2_INFO(7418502,
- "Coordinator failed, persisting abort reason",
- "coordinatorId"_attr = _doc.getId(),
- "phase"_attr = serializePhase(_doc.getPhase()),
- "reason"_attr = redact(status));
-
- auto newDoc = [&] {
- stdx::lock_guard lk{_docMutex};
- return _doc;
- }();
-
- auto coordinatorMetadata = newDoc.getShardingDDLCoordinatorMetadata();
- coordinatorMetadata.setAbortReason(status);
- newDoc.setShardingDDLCoordinatorMetadata(std::move(coordinatorMetadata));
-
- _updateStateDocument(opCtx, std::move(newDoc));
-
- uassertStatusOK(status);
- }
-
-private:
// lazily acquire Logical Session ID and a txn number
void _updateSession(OperationContext* opCtx) {
auto newDoc = [&] {
@@ -449,6 +408,42 @@ private:
osi.setTxnNumber(optSession->getTxnNumber());
return osi;
}
+
+ OperationSessionInfo getNewSession(OperationContext* opCtx) {
+ _updateSession(opCtx);
+ return getCurrentSession();
+ }
+
+ virtual boost::optional<Status> getAbortReason() const override {
+ const auto& status = _doc.getAbortReason();
+ invariant(!status || !status->isOK(), "when persisted, status must be an error");
+ return status;
+ }
+
+ /**
+ * Persists the abort reason and throws it as an exception. This causes the coordinator to fail,
+ * and triggers the cleanup future chain since there is a persisted reason.
+ */
+ void triggerCleanup(OperationContext* opCtx, const Status& status) {
+ LOGV2_INFO(7418502,
+ "Coordinator failed, persisting abort reason",
+ "coordinatorId"_attr = _doc.getId(),
+ "phase"_attr = serializePhase(_doc.getPhase()),
+ "reason"_attr = redact(status));
+
+ auto newDoc = [&] {
+ stdx::lock_guard lk{_docMutex};
+ return _doc;
+ }();
+
+ auto coordinatorMetadata = newDoc.getShardingDDLCoordinatorMetadata();
+ coordinatorMetadata.setAbortReason(status);
+ newDoc.setShardingDDLCoordinatorMetadata(std::move(coordinatorMetadata));
+
+ _updateStateDocument(opCtx, std::move(newDoc));
+
+ uassertStatusOK(status);
+ }
};
#undef MONGO_LOGV2_DEFAULT_COMPONENT