diff options
Diffstat (limited to 'src/mongo/db/pipeline')
11 files changed, 104 insertions, 33 deletions
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index a84d1773a1f..5f6214400df 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -48,14 +48,8 @@ namespace mongo { namespace { -void assertIsValidCollectionState(const boost::intrusive_ptr<ExpressionContext>& expCtx) { - if (expCtx->mongoProcessInterface->isSharded(expCtx->opCtx, expCtx->ns)) { - const bool foreignShardedAllowed = - getTestCommandsEnabled() && internalQueryAllowShardedLookup.load(); - if (!foreignShardedAllowed) { - uasserted(31428, "Cannot run $graphLookup with sharded foreign collection"); - } - } +bool foreignShardedLookupAllowed() { + return getTestCommandsEnabled() && internalQueryAllowShardedLookup.load(); } } // namespace @@ -187,8 +181,12 @@ void DocumentSourceGraphLookUp::doDispose() { void DocumentSourceGraphLookUp::doBreadthFirstSearch() { long long depth = 0; bool shouldPerformAnotherQuery; - assertIsValidCollectionState(_fromExpCtx); do { + if (!foreignShardedLookupAllowed()) { + // Enforce that the foreign collection must be unsharded for $graphLookup. + _fromExpCtx->mongoProcessInterface->setExpectedShardVersion( + _fromExpCtx->opCtx, _fromExpCtx->ns, ChunkVersion::UNSHARDED()); + } shouldPerformAnotherQuery = false; // Check whether each key in the frontier exists in the cache or needs to be queried. @@ -359,7 +357,19 @@ void DocumentSourceGraphLookUp::performSearch() { _frontierUsageBytes += startingValue.getApproximateSize(); } - doBreadthFirstSearch(); + try { + doBreadthFirstSearch(); + } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>& ex) { + // If lookup on a sharded collection is disallowed and the foreign collection is sharded, + // throw a custom exception. + if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) { + uassert(31428, + "Cannot run $graphLookup with sharded foreign collection", + foreignShardedLookupAllowed() || !staleInfo->getVersionWanted() || + staleInfo->getVersionWanted() == ChunkVersion::UNSHARDED()); + } + throw; + } } DocumentSource::GetModPathsReturn DocumentSourceGraphLookUp::getModifiedPaths() const { diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index af45df02f2f..95c33761454 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -234,16 +234,6 @@ BSONObj buildEqualityOrQuery(const std::string& fieldName, const BSONArray& valu return orBuilder.obj(); } -void assertIsValidCollectionState(const boost::intrusive_ptr<ExpressionContext>& expCtx) { - if (expCtx->mongoProcessInterface->isSharded(expCtx->opCtx, expCtx->ns)) { - const bool foreignShardedAllowed = - getTestCommandsEnabled() && internalQueryAllowShardedLookup.load(); - if (!foreignShardedAllowed) { - uasserted(51069, "Cannot run $lookup with sharded foreign collection"); - } - } -} - void lookupPipeValidator(const Pipeline& pipeline) { const auto& sources = pipeline.getSources(); std::for_each(sources.begin(), sources.end(), [](auto& src) { @@ -253,6 +243,10 @@ void lookupPipeValidator(const Pipeline& pipeline) { src->constraints().isAllowedInLookupPipeline()); }); } + +bool foreignShardedLookupAllowed() { + return getTestCommandsEnabled() && internalQueryAllowShardedLookup.load(); +} } // namespace DocumentSource::GetNextResult DocumentSourceLookUp::doGetNext() { @@ -278,7 +272,20 @@ DocumentSource::GetNextResult DocumentSourceLookUp::doGetNext() { _resolvedPipeline.back() = matchStage; } - auto pipeline = buildPipeline(inputDoc); + std::unique_ptr<Pipeline, PipelineDeleter> pipeline; + try { + pipeline = buildPipeline(inputDoc); + } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>& ex) { + // If lookup on a sharded collection is disallowed and the foreign collection is sharded, + // throw a custom exception. + if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) { + uassert(51069, + "Cannot run $lookup with sharded foreign collection", + foreignShardedLookupAllowed() || !staleInfo->getVersionWanted() || + staleInfo->getVersionWanted() == ChunkVersion::UNSHARDED()); + } + throw; + } std::vector<Value> results; long long objsize = 0; @@ -308,11 +315,15 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline( // Copy all 'let' variables into the foreign pipeline's expression context. _variables.copyToExpCtx(_variablesParseState, _fromExpCtx.get()); - assertIsValidCollectionState(_fromExpCtx); - // Resolve the 'let' variables to values per the given input document. resolveLetVariables(inputDoc, &_fromExpCtx->variables); + if (!foreignShardedLookupAllowed()) { + // Enforce that the foreign collection must be unsharded for lookup. + _fromExpCtx->mongoProcessInterface->setExpectedShardVersion( + _fromExpCtx->opCtx, _fromExpCtx->ns, ChunkVersion::UNSHARDED()); + } + // If we don't have a cache, build and return the pipeline immediately. if (!_cache || _cache->isAbandoned()) { MakePipelineOptions pipelineOpts; diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp index 256f23f01a2..2dd399d6fce 100644 --- a/src/mongo/db/pipeline/document_source_merge.cpp +++ b/src/mongo/db/pipeline/document_source_merge.cpp @@ -477,7 +477,8 @@ StageConstraints DocumentSourceMerge::constraints(Pipeline::SplitState pipeState // either choice will work correctly, we are simply applying a heuristic optimization. return {StreamType::kStreaming, PositionRequirement::kLast, - pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs) + pExpCtx->inMongos && + pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs) ? HostTypeRequirement::kAnyShard : HostTypeRequirement::kPrimaryShard, DiskUseRequirement::kWritesPersistentData, diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index d99123a1404..43915889af4 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -2622,6 +2622,7 @@ class MergeWithShardedCollection : public ShardMergerBase { }; auto expCtx = ShardMergerBase::createExpressionContext(request); + expCtx->inMongos = true; expCtx->mongoProcessInterface = std::make_shared<ProcessInterface>(); return expCtx; } diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h index f7d8f8e0a2e..440648566a6 100644 --- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h @@ -418,6 +418,16 @@ public: const NamespaceString& nss, ChunkVersion targetCollectionVersion) const = 0; + /** + * Sets the expected shard version for the given namespace. Invariants if the caller attempts to + * change an existing shard version, or if the shard version for this namespace has already been + * checked by the commands infrastructure. Used by $lookup and $graphLookup to enforce the + * constraint that the foreign collection must be unsharded. + */ + virtual void setExpectedShardVersion(OperationContext* opCtx, + const NamespaceString& nss, + boost::optional<ChunkVersion> chunkVersion) = 0; + virtual std::unique_ptr<ResourceYielder> getResourceYielder() const = 0; /** diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp index cce89f5177d..72868642cf5 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp @@ -278,8 +278,9 @@ std::vector<GenericCursor> MongosProcessInterface::getIdleCursors( } bool MongosProcessInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { - auto routingInfo = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss); - return routingInfo.isOK() && routingInfo.getValue().cm(); + auto routingInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + return static_cast<bool>(routingInfo.cm()); } bool MongosProcessInterface::fieldsHaveSupportingUniqueIndex( diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h index 25f5dfa2f9c..8ce9c181fde 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h @@ -211,6 +211,12 @@ public: MONGO_UNREACHABLE; } + void setExpectedShardVersion(OperationContext* opCtx, + const NamespaceString& nss, + boost::optional<ChunkVersion> chunkVersion) override { + MONGO_UNREACHABLE; + } + std::unique_ptr<ResourceYielder> getResourceYielder() const override { return nullptr; } diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h index 0e8422d9e8e..fcb015c25b4 100644 --- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h @@ -111,6 +111,12 @@ public: const NamespaceString& ns, const std::vector<BSONObj>& indexSpecs) override; + void setExpectedShardVersion(OperationContext* opCtx, + const NamespaceString& nss, + boost::optional<ChunkVersion> chunkVersion) override { + // Do nothing on a non-shardsvr mongoD. + } + protected: // This constructor is marked as protected in order to prevent instantiation since this // interface is designed to have a concrete process interface for each possible diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index f97b06e49b3..1e8cfcd38fc 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -42,6 +42,7 @@ #include "mongo/db/pipeline/sharded_agg_helpers.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/cluster_commands_helpers.h" @@ -53,11 +54,9 @@ namespace mongo { using namespace fmt::literals; bool ShardServerProcessInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { - Lock::DBLock dbLock(opCtx, nss.db(), MODE_IS); - Lock::CollectionLock collLock(opCtx, nss, MODE_IS); - return CollectionShardingState::get(opCtx, nss) - ->getCollectionDescription_DEPRECATED() - .isSharded(); + auto routingInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + return static_cast<bool>(routingInfo.cm()); } void ShardServerProcessInterface::checkRoutingInfoEpochOrThrow( @@ -350,4 +349,17 @@ ShardServerProcessInterface::attachCursorSourceToPipeline(Pipeline* ownedPipelin return sharded_agg_helpers::attachCursorToPipeline(ownedPipeline, allowTargetingShards); } +void ShardServerProcessInterface::setExpectedShardVersion( + OperationContext* opCtx, + const NamespaceString& nss, + boost::optional<ChunkVersion> chunkVersion) { + auto& oss = OperationShardingState::get(opCtx); + if (oss.hasShardVersion(nss)) { + invariant(oss.getShardVersion(nss) == chunkVersion); + } else { + OperationShardingState::get(opCtx).initializeClientRoutingVersions( + nss, chunkVersion, boost::none); + } +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h index 76cf5887fbb..6b0ecc26bea 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h @@ -43,10 +43,13 @@ public: using CommonMongodProcessInterface::CommonMongodProcessInterface; /** - * Note: Information returned can be stale. Caller should always attach shardVersion when - * sending request against nss based on this information. + * Note: Cannot be called while holding a lock. Refreshes from the config servers if the + * metadata for the given namespace does not exist. Otherwise, will not automatically refresh, + * so the answer may be stale or become stale after calling. Caller should always attach + * shardVersion when sending request against nss based on this information. */ bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final; + void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, ChunkVersion targetCollectionVersion) const final; @@ -119,6 +122,10 @@ public: */ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( Pipeline* pipeline, bool allowTargetingShards) final; + + void setExpectedShardVersion(OperationContext* opCtx, + const NamespaceString& nss, + boost::optional<ChunkVersion> chunkVersion) final; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h index 7ebd3beba13..9648cce4bdc 100644 --- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h @@ -259,5 +259,11 @@ public: return {*fieldPaths, targetCollectionVersion}; } + + void setExpectedShardVersion(OperationContext* opCtx, + const NamespaceString& nss, + boost::optional<ChunkVersion> chunkVersion) override { + // Do nothing. + } }; } // namespace mongo |