summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorHaley Connelly <haley.connelly@mongodb.com>2020-06-16 16:14:07 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-06-29 19:18:43 +0000
commit639610456840a2c0d5c140b82fbad0d796734a92 (patch)
tree30a3800caaf3ec204104f2b9d97fbced74c83fde /src/mongo/db/pipeline
parent29fe8825ba79dc289694787169a3793fa6556bce (diff)
downloadmongo-639610456840a2c0d5c140b82fbad0d796734a92.tar.gz
SERVER-47532 ShardServerProcessInterface: Convert usage of getCollectionDescription_DEPRECATED to getCollectionDescription
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.cpp30
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp37
-rw-r--r--src/mongo/db/pipeline/document_source_merge.cpp3
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp1
-rw-r--r--src/mongo/db/pipeline/process_interface/mongo_process_interface.h10
-rw-r--r--src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp5
-rw-r--r--src/mongo/db/pipeline/process_interface/mongos_process_interface.h6
-rw-r--r--src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h6
-rw-r--r--src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp22
-rw-r--r--src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h11
-rw-r--r--src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h6
11 files changed, 104 insertions, 33 deletions
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index a84d1773a1f..5f6214400df 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -48,14 +48,8 @@
namespace mongo {
namespace {
-void assertIsValidCollectionState(const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- if (expCtx->mongoProcessInterface->isSharded(expCtx->opCtx, expCtx->ns)) {
- const bool foreignShardedAllowed =
- getTestCommandsEnabled() && internalQueryAllowShardedLookup.load();
- if (!foreignShardedAllowed) {
- uasserted(31428, "Cannot run $graphLookup with sharded foreign collection");
- }
- }
+bool foreignShardedLookupAllowed() {
+ return getTestCommandsEnabled() && internalQueryAllowShardedLookup.load();
}
} // namespace
@@ -187,8 +181,12 @@ void DocumentSourceGraphLookUp::doDispose() {
void DocumentSourceGraphLookUp::doBreadthFirstSearch() {
long long depth = 0;
bool shouldPerformAnotherQuery;
- assertIsValidCollectionState(_fromExpCtx);
do {
+ if (!foreignShardedLookupAllowed()) {
+ // Enforce that the foreign collection must be unsharded for $graphLookup.
+ _fromExpCtx->mongoProcessInterface->setExpectedShardVersion(
+ _fromExpCtx->opCtx, _fromExpCtx->ns, ChunkVersion::UNSHARDED());
+ }
shouldPerformAnotherQuery = false;
// Check whether each key in the frontier exists in the cache or needs to be queried.
@@ -359,7 +357,19 @@ void DocumentSourceGraphLookUp::performSearch() {
_frontierUsageBytes += startingValue.getApproximateSize();
}
- doBreadthFirstSearch();
+ try {
+ doBreadthFirstSearch();
+ } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>& ex) {
+ // If lookup on a sharded collection is disallowed and the foreign collection is sharded,
+ // throw a custom exception.
+ if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) {
+ uassert(31428,
+ "Cannot run $graphLookup with sharded foreign collection",
+ foreignShardedLookupAllowed() || !staleInfo->getVersionWanted() ||
+ staleInfo->getVersionWanted() == ChunkVersion::UNSHARDED());
+ }
+ throw;
+ }
}
DocumentSource::GetModPathsReturn DocumentSourceGraphLookUp::getModifiedPaths() const {
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index af45df02f2f..95c33761454 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -234,16 +234,6 @@ BSONObj buildEqualityOrQuery(const std::string& fieldName, const BSONArray& valu
return orBuilder.obj();
}
-void assertIsValidCollectionState(const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- if (expCtx->mongoProcessInterface->isSharded(expCtx->opCtx, expCtx->ns)) {
- const bool foreignShardedAllowed =
- getTestCommandsEnabled() && internalQueryAllowShardedLookup.load();
- if (!foreignShardedAllowed) {
- uasserted(51069, "Cannot run $lookup with sharded foreign collection");
- }
- }
-}
-
void lookupPipeValidator(const Pipeline& pipeline) {
const auto& sources = pipeline.getSources();
std::for_each(sources.begin(), sources.end(), [](auto& src) {
@@ -253,6 +243,10 @@ void lookupPipeValidator(const Pipeline& pipeline) {
src->constraints().isAllowedInLookupPipeline());
});
}
+
+bool foreignShardedLookupAllowed() {
+ return getTestCommandsEnabled() && internalQueryAllowShardedLookup.load();
+}
} // namespace
DocumentSource::GetNextResult DocumentSourceLookUp::doGetNext() {
@@ -278,7 +272,20 @@ DocumentSource::GetNextResult DocumentSourceLookUp::doGetNext() {
_resolvedPipeline.back() = matchStage;
}
- auto pipeline = buildPipeline(inputDoc);
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline;
+ try {
+ pipeline = buildPipeline(inputDoc);
+ } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>& ex) {
+ // If lookup on a sharded collection is disallowed and the foreign collection is sharded,
+ // throw a custom exception.
+ if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) {
+ uassert(51069,
+ "Cannot run $lookup with sharded foreign collection",
+ foreignShardedLookupAllowed() || !staleInfo->getVersionWanted() ||
+ staleInfo->getVersionWanted() == ChunkVersion::UNSHARDED());
+ }
+ throw;
+ }
std::vector<Value> results;
long long objsize = 0;
@@ -308,11 +315,15 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
// Copy all 'let' variables into the foreign pipeline's expression context.
_variables.copyToExpCtx(_variablesParseState, _fromExpCtx.get());
- assertIsValidCollectionState(_fromExpCtx);
-
// Resolve the 'let' variables to values per the given input document.
resolveLetVariables(inputDoc, &_fromExpCtx->variables);
+ if (!foreignShardedLookupAllowed()) {
+ // Enforce that the foreign collection must be unsharded for lookup.
+ _fromExpCtx->mongoProcessInterface->setExpectedShardVersion(
+ _fromExpCtx->opCtx, _fromExpCtx->ns, ChunkVersion::UNSHARDED());
+ }
+
// If we don't have a cache, build and return the pipeline immediately.
if (!_cache || _cache->isAbandoned()) {
MakePipelineOptions pipelineOpts;
diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp
index 256f23f01a2..2dd399d6fce 100644
--- a/src/mongo/db/pipeline/document_source_merge.cpp
+++ b/src/mongo/db/pipeline/document_source_merge.cpp
@@ -477,7 +477,8 @@ StageConstraints DocumentSourceMerge::constraints(Pipeline::SplitState pipeState
// either choice will work correctly, we are simply applying a heuristic optimization.
return {StreamType::kStreaming,
PositionRequirement::kLast,
- pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs)
+ pExpCtx->inMongos &&
+ pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs)
? HostTypeRequirement::kAnyShard
: HostTypeRequirement::kPrimaryShard,
DiskUseRequirement::kWritesPersistentData,
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index d99123a1404..43915889af4 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -2622,6 +2622,7 @@ class MergeWithShardedCollection : public ShardMergerBase {
};
auto expCtx = ShardMergerBase::createExpressionContext(request);
+ expCtx->inMongos = true;
expCtx->mongoProcessInterface = std::make_shared<ProcessInterface>();
return expCtx;
}
diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
index f7d8f8e0a2e..440648566a6 100644
--- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
@@ -418,6 +418,16 @@ public:
const NamespaceString& nss,
ChunkVersion targetCollectionVersion) const = 0;
+ /**
+ * Sets the expected shard version for the given namespace. Invariants if the caller attempts to
+ * change an existing shard version, or if the shard version for this namespace has already been
+ * checked by the commands infrastructure. Used by $lookup and $graphLookup to enforce the
+ * constraint that the foreign collection must be unsharded.
+ */
+ virtual void setExpectedShardVersion(OperationContext* opCtx,
+ const NamespaceString& nss,
+ boost::optional<ChunkVersion> chunkVersion) = 0;
+
virtual std::unique_ptr<ResourceYielder> getResourceYielder() const = 0;
/**
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
index cce89f5177d..72868642cf5 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
@@ -278,8 +278,9 @@ std::vector<GenericCursor> MongosProcessInterface::getIdleCursors(
}
bool MongosProcessInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) {
- auto routingInfo = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss);
- return routingInfo.isOK() && routingInfo.getValue().cm();
+ auto routingInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
+ return static_cast<bool>(routingInfo.cm());
}
bool MongosProcessInterface::fieldsHaveSupportingUniqueIndex(
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
index 25f5dfa2f9c..8ce9c181fde 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
@@ -211,6 +211,12 @@ public:
MONGO_UNREACHABLE;
}
+ void setExpectedShardVersion(OperationContext* opCtx,
+ const NamespaceString& nss,
+ boost::optional<ChunkVersion> chunkVersion) override {
+ MONGO_UNREACHABLE;
+ }
+
std::unique_ptr<ResourceYielder> getResourceYielder() const override {
return nullptr;
}
diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
index 0e8422d9e8e..fcb015c25b4 100644
--- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
@@ -111,6 +111,12 @@ public:
const NamespaceString& ns,
const std::vector<BSONObj>& indexSpecs) override;
+ void setExpectedShardVersion(OperationContext* opCtx,
+ const NamespaceString& nss,
+ boost::optional<ChunkVersion> chunkVersion) override {
+ // Do nothing on a non-shardsvr mongoD.
+ }
+
protected:
// This constructor is marked as protected in order to prevent instantiation since this
// interface is designed to have a concrete process interface for each possible
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
index f97b06e49b3..1e8cfcd38fc 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/pipeline/sharded_agg_helpers.h"
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/cluster_commands_helpers.h"
@@ -53,11 +54,9 @@ namespace mongo {
using namespace fmt::literals;
bool ShardServerProcessInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) {
- Lock::DBLock dbLock(opCtx, nss.db(), MODE_IS);
- Lock::CollectionLock collLock(opCtx, nss, MODE_IS);
- return CollectionShardingState::get(opCtx, nss)
- ->getCollectionDescription_DEPRECATED()
- .isSharded();
+ auto routingInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
+ return static_cast<bool>(routingInfo.cm());
}
void ShardServerProcessInterface::checkRoutingInfoEpochOrThrow(
@@ -350,4 +349,17 @@ ShardServerProcessInterface::attachCursorSourceToPipeline(Pipeline* ownedPipelin
return sharded_agg_helpers::attachCursorToPipeline(ownedPipeline, allowTargetingShards);
}
+void ShardServerProcessInterface::setExpectedShardVersion(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ boost::optional<ChunkVersion> chunkVersion) {
+ auto& oss = OperationShardingState::get(opCtx);
+ if (oss.hasShardVersion(nss)) {
+ invariant(oss.getShardVersion(nss) == chunkVersion);
+ } else {
+ OperationShardingState::get(opCtx).initializeClientRoutingVersions(
+ nss, chunkVersion, boost::none);
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
index 76cf5887fbb..6b0ecc26bea 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
@@ -43,10 +43,13 @@ public:
using CommonMongodProcessInterface::CommonMongodProcessInterface;
/**
- * Note: Information returned can be stale. Caller should always attach shardVersion when
- * sending request against nss based on this information.
+ * Note: Cannot be called while holding a lock. Refreshes from the config servers if the
+ * metadata for the given namespace does not exist. Otherwise, will not automatically refresh,
+ * so the answer may be stale or become stale after calling. Caller should always attach
+ * shardVersion when sending request against nss based on this information.
*/
bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final;
+
void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
ChunkVersion targetCollectionVersion) const final;
@@ -119,6 +122,10 @@ public:
*/
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
Pipeline* pipeline, bool allowTargetingShards) final;
+
+ void setExpectedShardVersion(OperationContext* opCtx,
+ const NamespaceString& nss,
+ boost::optional<ChunkVersion> chunkVersion) final;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
index 7ebd3beba13..9648cce4bdc 100644
--- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
@@ -259,5 +259,11 @@ public:
return {*fieldPaths, targetCollectionVersion};
}
+
+ void setExpectedShardVersion(OperationContext* opCtx,
+ const NamespaceString& nss,
+ boost::optional<ChunkVersion> chunkVersion) override {
+ // Do nothing.
+ }
};
} // namespace mongo