summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2017-10-26 00:42:14 +0100
committerBernard Gorman <bernard.gorman@gmail.com>2017-10-26 04:28:23 +0100
commit81c5724d9d11a1ff2b16937dc01e6439d9e0f266 (patch)
tree4935dcc0cd22ec06a810d3e28a48b6d2cdf8a53d
parentb5c44441b27e5e4cda03d1ab5ca1f0af07b00835 (diff)
downloadmongo-81c5724d9d11a1ff2b16937dc01e6439d9e0f266.tar.gz
SERVER-31597 Refactor $changeStream post-update lookup
-rw-r--r--src/mongo/db/pipeline/document_source.h9
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp38
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp26
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp23
-rw-r--r--src/mongo/db/pipeline/stub_mongo_process_interface.h5
-rw-r--r--src/mongo/s/commands/SConscript1
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp212
-rw-r--r--src/mongo/s/commands/pipeline_s.cpp260
-rw-r--r--src/mongo/s/commands/pipeline_s.h50
9 files changed, 386 insertions, 238 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 6b3754f5b8e..8e4278f040b 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -839,6 +839,15 @@ public:
*/
virtual std::vector<FieldPath> collectDocumentKeyFields(UUID) const = 0;
+ /**
+ * Returns zero or one documents matching the input filter, or throws if more than one match
+ * was found. The passed ExpressionContext may use a different namespace than the
+ * ExpressionContext used to construct the MongoProcessInterface. Returns boost::none if no
+ * matching documents were found, including cases where the given namespace does not exist.
+ */
+ virtual boost::optional<Document> lookupSingleDocument(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, const Document& filter) = 0;
+
// Add new methods as needed.
};
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
index 9535ad95bb2..058a959f588 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
@@ -68,6 +68,9 @@ DocumentSource::GetNextResult DocumentSourceLookupChangePostImage::getNext() {
return input;
}
+ // Temporarily remove any deadline from this operation to avoid timeout during lookup.
+ OperationContext::DeadlineStash deadlineStash(pExpCtx->opCtx);
+
MutableDocument output(input.releaseDocument());
output[kFullDocumentFieldName] = lookupPostImage(output.peek());
return output.freeze();
@@ -93,9 +96,10 @@ Value DocumentSourceLookupChangePostImage::lookupPostImage(const Document& updat
// Make sure we have a well-formed input.
auto nss = assertNamespaceMatches(updateOp);
- auto documentKey = assertFieldHasType(
- updateOp, DocumentSourceChangeStream::kDocumentKeyField, BSONType::Object);
- auto matchSpec = BSON("$match" << documentKey);
+ auto documentKey = assertFieldHasType(updateOp,
+ DocumentSourceChangeStream::kDocumentKeyField,
+ BSONType::Object)
+ .getDocument();
// Extract the UUID from resume token and do change stream lookups by UUID.
auto resumeToken =
@@ -104,29 +108,11 @@ Value DocumentSourceLookupChangePostImage::lookupPostImage(const Document& updat
// TODO SERVER-29134 we need to extract the namespace from the document and set them on the new
// ExpressionContext if we're getting notifications from an entire database.
auto foreignExpCtx = pExpCtx->copyWith(nss, resumeToken.getData().uuid);
- auto pipelineStatus = _mongoProcessInterface->makePipeline({matchSpec}, foreignExpCtx);
- if (pipelineStatus.getStatus() == ErrorCodes::NamespaceNotFound) {
- // We couldn't find the collection with UUID, it may have been dropped.
- return Value(BSONNULL);
- }
- auto pipeline = uassertStatusOK(std::move(pipelineStatus));
-
- if (auto first = pipeline->getNext()) {
- auto lookedUpDocument = Value(*first);
- if (auto next = pipeline->getNext()) {
- uasserted(40580,
- str::stream() << "found more than document with documentKey "
- << documentKey.toString()
- << " while looking up post image after change: ["
- << lookedUpDocument.toString()
- << ", "
- << next->toString()
- << "]");
- }
- return lookedUpDocument;
- }
- // We couldn't find it with the documentKey, it may have been deleted.
- return Value(BSONNULL);
+ auto lookedUpDoc = _mongoProcessInterface->lookupSingleDocument(foreignExpCtx, documentKey);
+
+ // Check whether the lookup returned any documents. Even if the lookup itself succeeded, it may
+ // not have returned any results if the document was deleted in the time since the update op.
+ return (lookedUpDoc ? Value(*lookedUpDoc) : Value(BSONNULL));
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
index d9db1db38ce..05b92297359 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
@@ -90,7 +90,7 @@ public:
StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions opts) final {
+ const MakePipelineOptions opts = MakePipelineOptions{}) final {
auto pipeline = Pipeline::parse(rawPipeline, expCtx);
if (!pipeline.isOK()) {
return pipeline.getStatus();
@@ -113,6 +113,27 @@ public:
return Status::OK();
}
+ boost::optional<Document> lookupSingleDocument(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, const Document& filter) {
+ auto swPipeline = makePipeline({BSON("$match" << filter)}, expCtx);
+ if (swPipeline == ErrorCodes::NamespaceNotFound) {
+ return boost::none;
+ }
+ auto pipeline = uassertStatusOK(std::move(swPipeline));
+
+ auto lookedUpDocument = pipeline->getNext();
+ if (auto next = pipeline->getNext()) {
+ uasserted(ErrorCodes::TooManyMatchingDocuments,
+ str::stream() << "found more than one document matching " << filter.toString()
+ << " ["
+ << (*lookedUpDocument).toString()
+ << ", "
+ << (*next).toString()
+ << "]");
+ }
+ return lookedUpDocument;
+ }
+
private:
deque<DocumentSource::GetNextResult> _mockResults;
};
@@ -248,7 +269,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDocumentKeyIsNotUni
lookupChangeStage->injectMongoProcessInterface(
std::make_shared<MockMongoProcessInterface>(std::move(foreignCollection)));
- ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40580);
+ ASSERT_THROWS_CODE(
+ lookupChangeStage->getNext(), AssertionException, ErrorCodes::TooManyMatchingDocuments);
}
TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPropagatePauses) {
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index d36b047af68..932b3e4f3e4 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -200,7 +200,7 @@ public:
StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions opts) final {
+ const MakePipelineOptions opts = MakePipelineOptions{}) final {
// 'expCtx' may represent the settings for an aggregation pipeline on a different namespace
// than the DocumentSource this MongodProcessInterface is injected into, but both
// ExpressionContext instances should still have the same OperationContext.
@@ -381,6 +381,27 @@ public:
return result;
}
+ boost::optional<Document> lookupSingleDocument(const intrusive_ptr<ExpressionContext>& expCtx,
+ const Document& filter) final {
+ auto swPipeline = makePipeline({BSON("$match" << filter)}, expCtx);
+ if (swPipeline == ErrorCodes::NamespaceNotFound) {
+ return boost::none;
+ }
+ auto pipeline = uassertStatusOK(std::move(swPipeline));
+
+ auto lookedUpDocument = pipeline->getNext();
+ if (auto next = pipeline->getNext()) {
+ uasserted(ErrorCodes::TooManyMatchingDocuments,
+ str::stream() << "found more than one document matching " << filter.toString()
+ << " ["
+ << lookedUpDocument->toString()
+ << ", "
+ << next->toString()
+ << "]");
+ }
+ return lookedUpDocument;
+ }
+
private:
intrusive_ptr<ExpressionContext> _ctx;
DBDirectClient _client;
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index 1a1a72f1224..06e22b24bdd 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -117,5 +117,10 @@ public:
std::vector<FieldPath> collectDocumentKeyFields(UUID) const override {
MONGO_UNREACHABLE;
}
+
+ boost::optional<Document> lookupSingleDocument(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, const Document& filter) {
+ MONGO_UNREACHABLE;
+ }
};
} // namespace mongo
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript
index 68660c0b42f..23fe7a7a7d6 100644
--- a/src/mongo/s/commands/SConscript
+++ b/src/mongo/s/commands/SConscript
@@ -79,6 +79,7 @@ env.Library(
'cluster_write_cmd.cpp',
'commands_public.cpp',
'kill_sessions_remote.cpp',
+ 'pipeline_s.cpp',
'strategy.cpp',
env.Idlc('cluster_multicast.idl')[0],
],
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 602e3dac6b8..d4aac818133 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -54,16 +54,13 @@
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_commands_helpers.h"
+#include "mongo/s/commands/pipeline_s.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_client_cursor_impl.h"
#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/cluster_query_knobs.h"
-#include "mongo/s/query/document_source_router_adapter.h"
#include "mongo/s/query/establish_cursors.h"
-#include "mongo/s/query/router_exec_stage.h"
-#include "mongo/s/query/router_stage_merge.h"
-#include "mongo/s/query/router_stage_pipeline.h"
#include "mongo/s/query/store_possible_cursor.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/log.h"
@@ -71,104 +68,6 @@
namespace mongo {
namespace {
-
-/**
- * Class to provide access to mongos-specific implementations of methods required by some document
- * sources.
- */
-class MongosProcessInterface final
- : public DocumentSourceNeedsMongoProcessInterface::MongoProcessInterface {
-public:
- MongosProcessInterface(OperationContext* opCtx) : _opCtx(opCtx) {}
-
- virtual ~MongosProcessInterface() = default;
-
- void setOperationContext(OperationContext* opCtx) override {
- _opCtx = opCtx;
- }
-
- DBClientBase* directClient() override {
- MONGO_UNREACHABLE;
- }
-
- bool isSharded(const NamespaceString& ns) override {
- MONGO_UNREACHABLE;
- }
-
- BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) override {
- MONGO_UNREACHABLE;
- }
-
- CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
- const NamespaceString& ns) override {
- MONGO_UNREACHABLE;
- }
-
- void appendLatencyStats(const NamespaceString& nss,
- bool includeHistograms,
- BSONObjBuilder* builder) const override {
- MONGO_UNREACHABLE;
- }
-
- Status appendStorageStats(const NamespaceString& nss,
- const BSONObj& param,
- BSONObjBuilder* builder) const override {
- MONGO_UNREACHABLE;
- }
-
- Status appendRecordCount(const NamespaceString& nss, BSONObjBuilder* builder) const override {
- MONGO_UNREACHABLE;
- }
-
- BSONObj getCollectionOptions(const NamespaceString& nss) override {
- MONGO_UNREACHABLE;
- }
-
- Status renameIfOptionsAndIndexesHaveNotChanged(
- const BSONObj& renameCommandObj,
- const NamespaceString& targetNs,
- const BSONObj& originalCollectionOptions,
- const std::list<BSONObj>& originalIndexes) override {
- MONGO_UNREACHABLE;
- }
-
- /**
- * Constructs an executable pipeline targeted to a remote shard. Returns
- * ErrorCodes::InternalError if 'rawPipeline' specifies a pipeline that does not target a single
- * shard.
- */
- StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions pipelineOptions = {}) override;
-
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) override {
- MONGO_UNREACHABLE;
- }
-
- std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode,
- CurrentOpUserMode userMode,
- CurrentOpTruncateMode truncateMode) const override {
- MONGO_UNREACHABLE;
- }
-
- std::string getShardName(OperationContext* opCtx) const override {
- MONGO_UNREACHABLE;
- }
-
- std::vector<FieldPath> collectDocumentKeyFields(UUID) const override {
- MONGO_UNREACHABLE;
- }
-
-private:
- StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipelineWithOneRemote(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx);
-
- OperationContext* _opCtx;
-};
-
// Given a document representing an aggregation command such as
//
// {aggregate: "myCollection", pipeline: [], ...},
@@ -594,15 +493,8 @@ BSONObj establishMergingMongosCursor(
std::vector<ClusterClientCursorParams::RemoteCursor> cursors) {
// Inject the MongosProcessInterface for sources which need it.
- const auto& sources = pipelineForMerging->getSources();
- for (auto&& source : sources) {
- DocumentSourceNeedsMongoProcessInterface* needsMongoProcessInterface =
- dynamic_cast<DocumentSourceNeedsMongoProcessInterface*>(source.get());
- if (needsMongoProcessInterface) {
- needsMongoProcessInterface->injectMongoProcessInterface(
- std::make_shared<MongosProcessInterface>(opCtx));
- }
- }
+ PipelineS::injectMongosInterface(pipelineForMerging.get());
+
ClusterClientCursorParams params(
requestedNss,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
@@ -683,104 +575,6 @@ BSONObj establishMergingMongosCursor(
return cursorResponse.obj();
}
-/**
- * This is a special type of RouterExecStage that is used to iterate remote cursors that were
- * created internally and do not represent a client cursor, such as those used in $changeStream's
- * updateLookup functionality.
- *
- * The purpose of this class is to provide ownership over a ClusterClientCursorParams struct without
- * creating a ClusterClientCursor, which would show up in the server stats for this mongos.
- */
-class RouterStageInternalCursor final : public RouterExecStage {
-public:
- RouterStageInternalCursor(OperationContext* opCtx,
- std::unique_ptr<ClusterClientCursorParams>&& params,
- std::unique_ptr<RouterExecStage> child)
- : RouterExecStage(opCtx, std::move(child)), _params(std::move(params)) {}
-
- StatusWith<ClusterQueryResult> next(ExecContext execContext) {
- return getChildStage()->next(execContext);
- }
-
-private:
- std::unique_ptr<ClusterClientCursorParams> _params;
-};
-
-StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> MongosProcessInterface::makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions pipelineOptions) {
- // For the time being we don't expect any callers with options other than these.
- invariant(pipelineOptions.optimize);
- invariant(pipelineOptions.attachCursorSource);
- invariant(!pipelineOptions.forceInjectMongoProcessInterface);
-
- // 'expCtx' may represent the settings for an aggregation pipeline on a different namespace
- // than the DocumentSource this MongodImplementation is injected into, but both
- // ExpressionContext instances should still have the same OperationContext.
- invariant(_opCtx == expCtx->opCtx);
-
- // Explain is not supported for auxiliary lookups.
- invariant(!expCtx->explain);
- return makePipelineWithOneRemote(rawPipeline, expCtx);
-}
-
-StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>>
-MongosProcessInterface::makePipelineWithOneRemote(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx) {
-
- // Temporarily remove any deadline from this operation, we don't want to timeout while doing
- // this lookup.
- OperationContext::DeadlineStash deadlineStash(expCtx->opCtx);
-
- // Generate the command object for the targeted shards.
- AggregationRequest aggRequest(expCtx->ns, rawPipeline);
- LiteParsedPipeline liteParsedPipeline(aggRequest);
- auto parsedPipeline = Pipeline::parse(rawPipeline, expCtx);
- if (!parsedPipeline.isOK()) {
- return parsedPipeline.getStatus();
- }
- parsedPipeline.getValue()->optimizePipeline();
-
- auto targetStatus = establishShardCursors(expCtx,
- expCtx->ns,
- aggRequest.serializeToCommandObj().toBson(),
- aggRequest,
- liteParsedPipeline,
- std::move(parsedPipeline.getValue()));
-
- if (!targetStatus.isOK()) {
- return targetStatus.getStatus();
- }
- auto targetingResults = std::move(targetStatus.getValue());
- if (targetingResults.remoteCursors.size() != 1) {
- return {ErrorCodes::InternalError,
- str::stream() << "Unable to target pipeline to single shard: "
- << Value(rawPipeline).toString()};
- }
- invariant(!targetingResults.pipelineForMerging);
-
- auto params = stdx::make_unique<ClusterClientCursorParams>(
- expCtx->ns,
- AuthorizationSession::get(expCtx->opCtx->getClient())->getAuthenticatedUserNames(),
- ReadPreferenceSetting::get(expCtx->opCtx));
- params->remotes = std::move(targetingResults.remoteCursors);
-
- // We will transfer ownership of the params to the RouterStageInternalCursor, but need a
- // reference to them to construct the RouterStageMerge.
- auto* unownedParams = params.get();
- auto mergeStage = stdx::make_unique<RouterStageMerge>(
- expCtx->opCtx,
- Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
- unownedParams);
- auto routerExecutionTree = stdx::make_unique<RouterStageInternalCursor>(
- expCtx->opCtx, std::move(params), std::move(mergeStage));
-
- return Pipeline::create(
- {DocumentSourceRouterAdapter::create(expCtx, std::move(routerExecutionTree))}, expCtx);
-}
-
} // namespace
Status ClusterAggregate::runAggregate(OperationContext* opCtx,
diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/s/commands/pipeline_s.cpp
new file mode 100644
index 00000000000..67068ed12ff
--- /dev/null
+++ b/src/mongo/s/commands/pipeline_s.cpp
@@ -0,0 +1,260 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/commands/pipeline_s.h"
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/commands/cluster_commands_helpers.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/query/establish_cursors.h"
+
+namespace mongo {
+
+using boost::intrusive_ptr;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+
+namespace {
+/**
+ * Determines the single shard to which the given query will be targeted, and its associated
+ * shardVersion. Throws if the query targets more than one shard.
+ */
+std::pair<ShardId, ChunkVersion> getSingleTargetedShardForQuery(
+ OperationContext* opCtx, const CachedCollectionRoutingInfo& routingInfo, BSONObj query) {
+ if (auto chunkMgr = routingInfo.cm()) {
+ std::set<ShardId> shardIds;
+ chunkMgr->getShardIdsForQuery(opCtx, query, BSONObj(), &shardIds);
+ uassert(ErrorCodes::InternalError,
+ str::stream() << "Unable to target lookup query to a single shard: "
+ << query.toString(),
+ shardIds.size() == 1u);
+ return {*shardIds.begin(), chunkMgr->getVersion(*shardIds.begin())};
+ }
+
+ return {routingInfo.primaryId(), ChunkVersion::UNSHARDED()};
+}
+
+/**
+ * Returns the routing information for the namespace set on the passed ExpressionContext. Also
+ * verifies that the ExpressionContext's UUID, if present, matches that of the routing table entry.
+ */
+StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo(
+ const intrusive_ptr<ExpressionContext>& expCtx) {
+ auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache();
+ auto swRoutingInfo = catalogCache->getCollectionRoutingInfo(expCtx->opCtx, expCtx->ns);
+ // Additionally check that the ExpressionContext's UUID matches the collection routing info.
+ if (swRoutingInfo.isOK() && expCtx->uuid && swRoutingInfo.getValue().cm()) {
+ if (!swRoutingInfo.getValue().cm()->uuidMatches(*expCtx->uuid)) {
+ return {ErrorCodes::NamespaceNotFound,
+ str::stream() << "The UUID of collection " << expCtx->ns.ns()
+ << " changed; it may have been dropped and re-created."};
+ }
+ }
+ return swRoutingInfo;
+}
+
+/**
+ * Class to provide access to mongos-specific implementations of methods required by some document
+ * sources.
+ */
+class MongosProcessInterface final
+ : public DocumentSourceNeedsMongoProcessInterface::MongoProcessInterface {
+public:
+ MongosProcessInterface(const intrusive_ptr<ExpressionContext>& expCtx)
+ : _expCtx(expCtx), _opCtx(_expCtx->opCtx) {}
+
+ virtual ~MongosProcessInterface() = default;
+
+ void setOperationContext(OperationContext* opCtx) final {
+ invariant(_expCtx->opCtx == opCtx);
+ _opCtx = opCtx;
+ }
+
+ DBClientBase* directClient() final {
+ MONGO_UNREACHABLE;
+ }
+
+ bool isSharded(const NamespaceString& ns) final {
+ MONGO_UNREACHABLE;
+ }
+
+ BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) final {
+ MONGO_UNREACHABLE;
+ }
+
+ CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
+ const NamespaceString& ns) final {
+ MONGO_UNREACHABLE;
+ }
+
+ void appendLatencyStats(const NamespaceString& nss,
+ bool includeHistograms,
+ BSONObjBuilder* builder) const final {
+ MONGO_UNREACHABLE;
+ }
+
+ Status appendStorageStats(const NamespaceString& nss,
+ const BSONObj& param,
+ BSONObjBuilder* builder) const final {
+ MONGO_UNREACHABLE;
+ }
+
+ Status appendRecordCount(const NamespaceString& nss, BSONObjBuilder* builder) const final {
+ MONGO_UNREACHABLE;
+ }
+
+ BSONObj getCollectionOptions(const NamespaceString& nss) final {
+ MONGO_UNREACHABLE;
+ }
+
+ Status renameIfOptionsAndIndexesHaveNotChanged(
+ const BSONObj& renameCommandObj,
+ const NamespaceString& targetNs,
+ const BSONObj& originalCollectionOptions,
+ const std::list<BSONObj>& originalIndexes) final {
+ MONGO_UNREACHABLE;
+ }
+
+ StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions opts) final {
+ MONGO_UNREACHABLE;
+ }
+
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) final {
+ MONGO_UNREACHABLE;
+ }
+
+ std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode,
+ CurrentOpUserMode userMode,
+ CurrentOpTruncateMode truncateMode) const final {
+ MONGO_UNREACHABLE;
+ }
+
+ std::string getShardName(OperationContext* opCtx) const final {
+ MONGO_UNREACHABLE;
+ }
+
+ std::vector<FieldPath> collectDocumentKeyFields(UUID) const final {
+ MONGO_UNREACHABLE;
+ }
+
+ boost::optional<Document> lookupSingleDocument(const intrusive_ptr<ExpressionContext>& expCtx,
+ const Document& filter) final {
+ // The passed ExpressionContext may use a different namespace than the _expCtx used to
+ // construct the MongosProcessInterface, but both should be using the same opCtx.
+ invariant(expCtx->opCtx == _expCtx->opCtx);
+
+ // Create a QueryRequest from the given filter. This will be used to generate the find
+ // command to be dispatched to the shard in order to return the post-change document.
+ auto filterObj = filter.toBson();
+ QueryRequest qr(expCtx->ns);
+ qr.setFilter(filterObj);
+
+ auto swShardResult = makeStatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>();
+ auto findCmd = qr.asFindCommand();
+ size_t numAttempts = 0;
+ do {
+ // Verify that the collection exists, with the UUID passed in the expCtx.
+ auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache();
+ auto swRoutingInfo = getCollectionRoutingInfo(expCtx);
+ if (swRoutingInfo == ErrorCodes::NamespaceNotFound) {
+ return boost::none;
+ }
+ auto routingInfo = uassertStatusOK(std::move(swRoutingInfo));
+
+ // Get the ID and version of the single shard to which this query will be sent.
+ auto shardInfo = getSingleTargetedShardForQuery(expCtx->opCtx, routingInfo, filterObj);
+
+ // Dispatch the request. This will only be sent to a single shard and only a single
+ // result will be returned. The 'establishCursors' method conveniently prepares the
+ // result into a cursor response for us.
+ swShardResult = establishCursors(
+ expCtx->opCtx,
+ Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ expCtx->ns,
+ ReadPreferenceSetting::get(expCtx->opCtx),
+ {{shardInfo.first, appendShardVersion(findCmd, shardInfo.second)}},
+ false,
+ nullptr);
+
+ // If we hit a stale shardVersion exception, invalidate the routing table cache.
+ if (ErrorCodes::isStaleShardingError(swShardResult.getStatus().code())) {
+ catalogCache->onStaleConfigError(std::move(routingInfo));
+ }
+ } while (!swShardResult.isOK() && ++numAttempts < kMaxNumStaleVersionRetries);
+
+ auto shardResult = uassertStatusOK(std::move(swShardResult));
+ invariant(shardResult.size() == 1u);
+
+ auto& cursor = shardResult.front().cursorResponse;
+ auto& batch = cursor.getBatch();
+
+ // We should have at most 1 result, and the cursor should be exhausted.
+ uassert(ErrorCodes::InternalError,
+ str::stream() << "Shard cursor was unexpectedly open after lookup: "
+ << shardResult.front().hostAndPort
+ << ", id: "
+ << cursor.getCursorId(),
+ cursor.getCursorId() == 0);
+ uassert(ErrorCodes::TooManyMatchingDocuments,
+ str::stream() << "found more than one document matching " << filter.toString()
+ << " ["
+ << batch.begin()->toString()
+ << ", "
+ << std::next(batch.begin())->toString()
+ << "]",
+ batch.size() <= 1u);
+
+ return boost::make_optional(!batch.empty(), Document(batch.front()));
+ }
+
+private:
+ intrusive_ptr<ExpressionContext> _expCtx;
+ OperationContext* _opCtx;
+};
+} // namespace
+
+void PipelineS::injectMongosInterface(Pipeline* pipeline) {
+ const auto& sources = pipeline->getSources();
+ for (auto&& source : sources) {
+ if (auto needsMongos =
+ dynamic_cast<DocumentSourceNeedsMongoProcessInterface*>(source.get())) {
+ needsMongos->injectMongoProcessInterface(
+ std::make_shared<MongosProcessInterface>(pipeline->getContext()));
+ }
+ }
+}
+} // namespace mongo
diff --git a/src/mongo/s/commands/pipeline_s.h b/src/mongo/s/commands/pipeline_s.h
new file mode 100644
index 00000000000..ca4bdc58270
--- /dev/null
+++ b/src/mongo/s/commands/pipeline_s.h
@@ -0,0 +1,50 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/pipeline.h"
+
+namespace mongo {
+/**
+ * PipelineS is an extension of the Pipeline class to provide additional utility functions on
+ * mongoS. For example, it can inject the pipeline with an implementation of MongoProcessInterface
+ * which provides mongos-specific versions of methods required by some document sources.
+ */
+class PipelineS {
+public:
+ /**
+ * Injects a MongosProcessInterface into stages which require access to its functionality.
+ */
+ static void injectMongosInterface(Pipeline* pipeline);
+
+private:
+ PipelineS() = delete; // Should never be instantiated.
+};
+
+} // namespace mongo