diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2021-05-24 08:48:47 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-07-15 22:28:42 +0000 |
commit | 23ecc48f89f4ec03d7b42e637c5969802efdb261 (patch) | |
tree | a329aa3d69786ad33a83fe3c49faa07ad87d082c /src/mongo | |
parent | 44c0877706adbb32ba0d2c0ac7e2bbfe0e6abf0d (diff) | |
download | mongo-23ecc48f89f4ec03d7b42e637c5969802efdb261.tar.gz |
SERVER-56763 Move $merge target collection epoch validation outside of
parsing
This allows us to consult and refresh the catalog cache without holding locks
Diffstat (limited to 'src/mongo')
3 files changed, 17 insertions, 10 deletions
diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h index 54e9620793f..a3623fca678 100644 --- a/src/mongo/db/pipeline/document_source_merge.h +++ b/src/mongo/db/pipeline/document_source_merge.h @@ -140,6 +140,18 @@ public: return _pipeline; } + void initialize() override { + // This implies that the stage will soon start to write, so it's safe to verify the target + // collection version. This is done here instead of parse time since it requires that locks + // are not held. + if (!pExpCtx->inMongos && _targetCollectionVersion) { + // If mongos has sent us a target shard version, we need to be sure we are prepared to + // act as a router which is at least as recent as that mongos. + pExpCtx->mongoProcessInterface->checkRoutingInfoEpochOrThrow( + pExpCtx, getOutputNs(), *_targetCollectionVersion); + } + } + private: /** * Builds a new $merge stage which will merge all documents into 'outputNs'. If diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp index 9ca6c92a5c5..c684ea98e33 100644 --- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp @@ -546,12 +546,9 @@ CommonMongodProcessInterface::ensureFieldsUniqueOrResolveDocumentKey( boost::optional<std::set<FieldPath>> fieldPaths, boost::optional<ChunkVersion> targetCollectionVersion, const NamespaceString& outputNs) const { - if (targetCollectionVersion) { - uassert(51123, "Unexpected target chunk version specified", expCtx->fromMongos); - // If mongos has sent us a target shard version, we need to be sure we are prepared to - // act as a router which is at least as recent as that mongos. - checkRoutingInfoEpochOrThrow(expCtx, outputNs, *targetCollectionVersion); - } + uassert(51123, + "Unexpected target chunk version specified", + !targetCollectionVersion || expCtx->fromMongos); if (!fieldPaths) { uassert(51124, "Expected fields to be provided from mongos", !expCtx->fromMongos); diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index 8eccb087f88..1fc0dc6b80d 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -73,6 +73,7 @@ void ShardServerProcessInterface::checkRoutingInfoEpochOrThrow( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, ChunkVersion targetCollectionVersion) const { + auto const shardId = ShardingState::get(expCtx->opCtx)->shardId(); auto* catalogCache = Grid::get(expCtx->opCtx)->catalogCache(); @@ -81,11 +82,8 @@ void ShardServerProcessInterface::checkRoutingInfoEpochOrThrow( catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection( nss, targetCollectionVersion, shardId); - // This will throw a 'ShardCannotRefreshDueToLocksHeldInfo' exception if the cache entry is - // staler than 'targetCollectionVersion' and 'checkRoutingInfoEpochOrThrow' is called under a DB - // lock. const auto routingInfo = - uassertStatusOK(catalogCache->getCollectionRoutingInfo(expCtx->opCtx, nss, true)); + uassertStatusOK(catalogCache->getCollectionRoutingInfo(expCtx->opCtx, nss)); auto foundVersion = routingInfo.getVersion(shardId); uassert(StaleEpochInfo(nss), |