summaryrefslogtreecommitdiff
path: root/src/mongo/s/query
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2017-08-28 15:10:42 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2017-09-01 15:36:35 -0400
commitbc3e230523e4677e2f3fed64ea89c369182a9272 (patch)
treebb35904e784f224e6d5ab87b508c69c72f447dd3 /src/mongo/s/query
parent4e01e3582541fc00ec2e83c97cac89b59fbfeb34 (diff)
downloadmongo-bc3e230523e4677e2f3fed64ea89c369182a9272.tar.gz
SERVER-30704 Use ARM to merge agg cursors on mongos.
Diffstat (limited to 'src/mongo/s/query')
-rw-r--r--src/mongo/s/query/SConscript6
-rw-r--r--src/mongo/s/query/async_results_merger.cpp11
-rw-r--r--src/mongo/s/query/async_results_merger.h8
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp99
-rw-r--r--src/mongo/s/query/router_exec_stage.h27
-rw-r--r--src/mongo/s/query/router_stage_aggregation_merge.cpp79
-rw-r--r--src/mongo/s/query/router_stage_limit.cpp12
-rw-r--r--src/mongo/s/query/router_stage_limit.h6
-rw-r--r--src/mongo/s/query/router_stage_merge.cpp2
-rw-r--r--src/mongo/s/query/router_stage_merge.h3
-rw-r--r--src/mongo/s/query/router_stage_mock.cpp2
-rw-r--r--src/mongo/s/query/router_stage_mock.h5
-rw-r--r--src/mongo/s/query/router_stage_pipeline.cpp135
-rw-r--r--src/mongo/s/query/router_stage_pipeline.h (renamed from src/mongo/s/query/router_stage_aggregation_merge.h)13
-rw-r--r--src/mongo/s/query/router_stage_remove_metadata_fields.cpp89
-rw-r--r--src/mongo/s/query/router_stage_remove_metadata_fields.h54
-rw-r--r--src/mongo/s/query/router_stage_remove_metadata_fields_test.cpp (renamed from src/mongo/s/query/router_stage_remove_sortkey_test.cpp)93
-rw-r--r--src/mongo/s/query/router_stage_remove_sortkey.cpp75
-rw-r--r--src/mongo/s/query/router_stage_remove_sortkey.h53
-rw-r--r--src/mongo/s/query/router_stage_skip.cpp12
-rw-r--r--src/mongo/s/query/router_stage_skip.h6
-rw-r--r--src/mongo/s/query/store_possible_cursor.cpp9
22 files changed, 485 insertions, 314 deletions
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index 460dcf7912e..2ae87217bc8 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -32,11 +32,11 @@ env.Library(
env.Library(
target="router_exec_stage",
source=[
- "router_stage_aggregation_merge.cpp",
"router_stage_limit.cpp",
"router_stage_merge.cpp",
"router_stage_mock.cpp",
- "router_stage_remove_sortkey.cpp",
+ "router_stage_pipeline.cpp",
+ "router_stage_remove_metadata_fields.cpp",
"router_stage_skip.cpp",
],
LIBDEPS=[
@@ -48,7 +48,7 @@ env.CppUnitTest(
target="router_exec_stage_test",
source=[
"router_stage_limit_test.cpp",
- "router_stage_remove_sortkey_test.cpp",
+ "router_stage_remove_metadata_fields_test.cpp",
"router_stage_skip_test.cpp",
],
LIBDEPS=[
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 19281785be0..50886944bb2 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -61,7 +61,9 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
_mergeQueue(MergingComparator(_remotes, _params->sort)) {
size_t remoteIndex = 0;
for (const auto& remote : _params->remotes) {
- _remotes.emplace_back(remote.hostAndPort, remote.cursorResponse.getCursorId());
+ _remotes.emplace_back(remote.hostAndPort,
+ remote.cursorResponse.getNSS(),
+ remote.cursorResponse.getCursorId());
// We don't check the return value of addBatchToBuffer here; if there was an error,
// it will be stored in the remote and the first call to ready() will return true.
@@ -269,7 +271,7 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
adjustedBatchSize = *_params->batchSize - remote.fetchedCount;
}
- BSONObj cmdObj = GetMoreRequest(_params->nsString,
+ BSONObj cmdObj = GetMoreRequest(remote.cursorNss,
remote.cursorId,
adjustedBatchSize,
_awaitDataTimeout,
@@ -582,8 +584,11 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* o
//
AsyncResultsMerger::RemoteCursorData::RemoteCursorData(HostAndPort hostAndPort,
+ NamespaceString cursorNss,
CursorId establishedCursorId)
- : cursorId(establishedCursorId), shardHostAndPort(std::move(hostAndPort)) {}
+ : cursorId(establishedCursorId),
+ cursorNss(std::move(cursorNss)),
+ shardHostAndPort(std::move(hostAndPort)) {}
const HostAndPort& AsyncResultsMerger::RemoteCursorData::getTargetHost() const {
return shardHostAndPort;
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 04262309a99..c6ec2a26052 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -202,7 +202,9 @@ private:
* reported from the remote.
*/
struct RemoteCursorData {
- RemoteCursorData(HostAndPort hostAndPort, CursorId establishedCursorId);
+ RemoteCursorData(HostAndPort hostAndPort,
+ NamespaceString cursorNss,
+ CursorId establishedCursorId);
/**
* Returns the resolved host and port on which the remote cursor resides.
@@ -230,6 +232,10 @@ private:
// member will be set to zero.
CursorId cursorId;
+ // The namespace this cursor belongs to - note this may be different than the namespace of
+ // the operation if there is a view.
+ NamespaceString cursorNss;
+
// The exact host in the shard on which the cursor resides.
HostAndPort shardHostAndPort;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index e7716355c0f..f286cee408e 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -26,17 +26,18 @@
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
-
#include "mongo/platform/basic.h"
#include "mongo/s/query/cluster_client_cursor_impl.h"
-#include "mongo/s/query/router_stage_aggregation_merge.h"
+#include "mongo/db/pipeline/document_source_limit.h"
+#include "mongo/db/pipeline/document_source_skip.h"
+#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/s/query/router_stage_limit.h"
#include "mongo/s/query/router_stage_merge.h"
#include "mongo/s/query/router_stage_mock.h"
-#include "mongo/s/query/router_stage_remove_sortkey.h"
+#include "mongo/s/query/router_stage_pipeline.h"
+#include "mongo/s/query/router_stage_remove_metadata_fields.h"
#include "mongo/s/query/router_stage_skip.h"
#include "mongo/stdx/memory.h"
@@ -140,18 +141,87 @@ boost::optional<LogicalSessionId> ClusterClientCursorImpl::getLsid() const {
return _lsid;
}
+namespace {
+
+/**
+ * Rips off an initial $sort stage that will be handled by mongos execution machinery. Returns the
+ * sort key pattern of such a $sort stage if there was one, and boost::none otherwise.
+ */
+boost::optional<BSONObj> extractLeadingSort(Pipeline* mergePipeline) {
+ if (auto frontSort = mergePipeline->popFrontStageWithName(DocumentSourceSort::kStageName)) {
+ auto sortStage = static_cast<DocumentSourceSort*>(frontSort.get());
+ if (auto sortLimit = sortStage->getLimitSrc()) {
+ // There was a limit stage absorbed into the sort stage, so we need to preserve that.
+ mergePipeline->addInitialSource(sortLimit);
+ }
+ return sortStage
+ ->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging)
+ .toBson();
+ }
+ return boost::none;
+}
+
+bool isSkipOrLimit(const boost::intrusive_ptr<DocumentSource>& stage) {
+ return (dynamic_cast<DocumentSourceLimit*>(stage.get()) ||
+ dynamic_cast<DocumentSourceSkip*>(stage.get()));
+}
+
+bool isAllLimitsAndSkips(Pipeline* pipeline) {
+ const auto stages = pipeline->getSources();
+ return std::all_of(
+ stages.begin(), stages.end(), [&](const auto& stage) { return isSkipOrLimit(stage); });
+}
+
+std::unique_ptr<RouterExecStage> buildPipelinePlan(executor::TaskExecutor* executor,
+ ClusterClientCursorParams* params) {
+ invariant(params->mergePipeline);
+ invariant(!params->skip);
+ invariant(!params->limit);
+ auto* pipeline = params->mergePipeline.get();
+ auto* opCtx = pipeline->getContext()->opCtx;
+
+ std::unique_ptr<RouterExecStage> root =
+ stdx::make_unique<RouterStageMerge>(opCtx, executor, params);
+ if (!isAllLimitsAndSkips(pipeline)) {
+ return stdx::make_unique<RouterStagePipeline>(std::move(root),
+ std::move(params->mergePipeline));
+ }
+
+ // After extracting an optional leading $sort, the pipeline consisted entirely of $skip and
+ // $limit stages. Avoid creating a RouterStagePipeline (which will go through an expensive
+ // conversion from BSONObj -> Document for each result), and create a RouterExecStage tree
+ // instead.
+ while (!pipeline->getSources().empty()) {
+ invariant(isSkipOrLimit(pipeline->getSources().front()));
+ if (auto skip = pipeline->popFrontStageWithName(DocumentSourceSkip::kStageName)) {
+ root = stdx::make_unique<RouterStageSkip>(
+ opCtx, std::move(root), static_cast<DocumentSourceSkip*>(skip.get())->getSkip());
+ } else if (auto limit = pipeline->popFrontStageWithName(DocumentSourceLimit::kStageName)) {
+ root = stdx::make_unique<RouterStageLimit>(
+ opCtx, std::move(root), static_cast<DocumentSourceLimit*>(limit.get())->getLimit());
+ }
+ }
+ if (!params->sort.isEmpty()) {
+ // We are executing the pipeline without using a Pipeline, so we need to strip out any
+ // Document metadata ourselves. Note we only need this stage if there was a sort, since
+ // otherwise there would be no way for this half of the pipeline to require any metadata
+ // fields.
+ root = stdx::make_unique<RouterStageRemoveMetadataFields>(
+ opCtx, std::move(root), Document::allMetadataFieldNames);
+ }
+ return root;
+}
+} // namespace
+
std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params) {
const auto skip = params->skip;
const auto limit = params->limit;
- const bool hasSort = !params->sort.isEmpty();
-
- // The first stage always merges from the remotes. If 'mergePipeline' has been specified in
- // ClusterClientCursorParams, then RouterStageAggregationMerge should be the root and only node.
- // Otherwise, construct a RouterStage pipeline from the remotes, skip, limit, and sort fields in
- // 'params'.
if (params->mergePipeline) {
- return stdx::make_unique<RouterStageAggregationMerge>(std::move(params->mergePipeline));
+ if (auto sort = extractLeadingSort(params->mergePipeline.get())) {
+ params->sort = *sort;
+ }
+ return buildPipelinePlan(executor, params);
}
std::unique_ptr<RouterExecStage> root =
@@ -165,8 +235,13 @@ std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
root = stdx::make_unique<RouterStageLimit>(opCtx, std::move(root), *limit);
}
+ const bool hasSort = !params->sort.isEmpty();
if (hasSort) {
- root = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(root));
+ // Strip out the sort key after sorting.
+ root = stdx::make_unique<RouterStageRemoveMetadataFields>(
+ opCtx,
+ std::move(root),
+ std::vector<StringData>{ClusterClientCursorParams::kSortKeyField});
}
return root;
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index ac074d92b62..e7d8ebb65b9 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -77,12 +77,18 @@ public:
* currently attached. This is so that a killing thread may call this method with its own
* operation context.
*/
- virtual void kill(OperationContext* opCtx) = 0;
+ virtual void kill(OperationContext* opCtx) {
+ invariant(_child); // The default implementation forwards to the child stage.
+ _child->kill(opCtx);
+ }
/**
* Returns whether or not all the remote cursors are exhausted.
*/
- virtual bool remotesExhausted() = 0;
+ virtual bool remotesExhausted() {
+ invariant(_child); // The default implementation forwards to the child stage.
+ return _child->remotesExhausted();
+ }
/**
* Sets the maxTimeMS value that the cursor should forward with any internally issued getMore
@@ -91,7 +97,15 @@ public:
* Returns a non-OK status if this cursor type does not support maxTimeMS on getMore (i.e. if
* the cursor is not tailable + awaitData).
*/
- virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0;
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ if (_child) {
+ auto childStatus = _child->setAwaitDataTimeout(awaitDataTimeout);
+ if (!childStatus.isOK()) {
+ return childStatus;
+ }
+ }
+ return doSetAwaitDataTimeout(awaitDataTimeout);
+ }
/**
* Sets the current operation context to be used by the router stage.
@@ -135,6 +149,13 @@ protected:
virtual void doDetachFromOperationContext() {}
/**
+ * Performs any stage-specific await data timeout actions.
+ */
+ virtual Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ return Status::OK();
+ }
+
+ /**
* Returns an unowned pointer to the child stage, or nullptr if there is no child.
*/
RouterExecStage* getChildStage() {
diff --git a/src/mongo/s/query/router_stage_aggregation_merge.cpp b/src/mongo/s/query/router_stage_aggregation_merge.cpp
deleted file mode 100644
index a4273dbd4a7..00000000000
--- a/src/mongo/s/query/router_stage_aggregation_merge.cpp
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Copyright (C) 2017 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/query/router_stage_aggregation_merge.h"
-
-#include "mongo/db/pipeline/document_source_merge_cursors.h"
-#include "mongo/db/pipeline/expression_context.h"
-
-namespace mongo {
-
-RouterStageAggregationMerge::RouterStageAggregationMerge(
- std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline)
- : RouterExecStage(mergePipeline->getContext()->opCtx),
- _mergePipeline(std::move(mergePipeline)) {}
-
-StatusWith<ClusterQueryResult> RouterStageAggregationMerge::next() {
- // Pipeline::getNext will return a boost::optional<Document> or boost::none if EOF.
- if (auto result = _mergePipeline->getNext()) {
- return {result->toBson()};
- }
-
- // If we reach this point, we have hit EOF.
- _mergePipeline.get_deleter().dismissDisposal();
- _mergePipeline->dispose(getOpCtx());
-
- return {ClusterQueryResult()};
-}
-
-void RouterStageAggregationMerge::doReattachToOperationContext() {
- _mergePipeline->reattachToOperationContext(getOpCtx());
-}
-
-void RouterStageAggregationMerge::doDetachFromOperationContext() {
- _mergePipeline->detachFromOperationContext();
-}
-
-void RouterStageAggregationMerge::kill(OperationContext* opCtx) {
- _mergePipeline.get_deleter().dismissDisposal();
- _mergePipeline->dispose(opCtx);
-}
-
-bool RouterStageAggregationMerge::remotesExhausted() {
- const auto mergeSource =
- static_cast<DocumentSourceMergeCursors*>(_mergePipeline->getSources().front().get());
- return mergeSource->remotesExhausted();
-}
-
-Status RouterStageAggregationMerge::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
- return {ErrorCodes::InvalidOptions, "maxTimeMS is not valid for aggregation getMore"};
-}
-
-} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_limit.cpp b/src/mongo/s/query/router_stage_limit.cpp
index feb8f11626f..b3fd2b14651 100644
--- a/src/mongo/s/query/router_stage_limit.cpp
+++ b/src/mongo/s/query/router_stage_limit.cpp
@@ -57,16 +57,4 @@ StatusWith<ClusterQueryResult> RouterStageLimit::next() {
return childResult;
}
-void RouterStageLimit::kill(OperationContext* opCtx) {
- getChildStage()->kill(opCtx);
-}
-
-bool RouterStageLimit::remotesExhausted() {
- return getChildStage()->remotesExhausted();
-}
-
-Status RouterStageLimit::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
- return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
-}
-
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h
index 42ef46c21ab..1a158e2c3a7 100644
--- a/src/mongo/s/query/router_stage_limit.h
+++ b/src/mongo/s/query/router_stage_limit.h
@@ -43,12 +43,6 @@ public:
StatusWith<ClusterQueryResult> next() final;
- void kill(OperationContext* opCtx) final;
-
- bool remotesExhausted() final;
-
- Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
-
private:
long long _limit;
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
index 78ee1a3475a..f2a159003c2 100644
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ b/src/mongo/s/query/router_stage_merge.cpp
@@ -69,7 +69,7 @@ bool RouterStageMerge::remotesExhausted() {
return _arm.remotesExhausted();
}
-Status RouterStageMerge::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+Status RouterStageMerge::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return _arm.setAwaitDataTimeout(awaitDataTimeout);
}
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index 23503c664f6..78c5383e0ee 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -53,7 +53,8 @@ public:
bool remotesExhausted() final;
- Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+protected:
+ Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
protected:
void doReattachToOperationContext() override {
diff --git a/src/mongo/s/query/router_stage_mock.cpp b/src/mongo/s/query/router_stage_mock.cpp
index edeb1f9945c..7ebc3a6a554 100644
--- a/src/mongo/s/query/router_stage_mock.cpp
+++ b/src/mongo/s/query/router_stage_mock.cpp
@@ -68,7 +68,7 @@ bool RouterStageMock::remotesExhausted() {
return _remotesExhausted;
}
-Status RouterStageMock::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+Status RouterStageMock::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) {
_awaitDataTimeout = awaitDataTimeout;
return Status::OK();
}
diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h
index e2f8e7adab5..8e3075103d5 100644
--- a/src/mongo/s/query/router_stage_mock.h
+++ b/src/mongo/s/query/router_stage_mock.h
@@ -51,8 +51,6 @@ public:
bool remotesExhausted() final;
- Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
-
/**
* Queues a BSONObj to be returned.
*/
@@ -79,6 +77,9 @@ public:
*/
StatusWith<Milliseconds> getAwaitDataTimeout();
+protected:
+ Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+
private:
std::queue<StatusWith<ClusterQueryResult>> _resultsQueue;
bool _remotesExhausted = false;
diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp
new file mode 100644
index 00000000000..d9cf02f85c3
--- /dev/null
+++ b/src/mongo/s/query/router_stage_pipeline.cpp
@@ -0,0 +1,135 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/query/router_stage_pipeline.h"
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_merge_cursors.h"
+#include "mongo/db/pipeline/expression_context.h"
+
+namespace mongo {
+
+namespace {
+
+/**
+ * A class that acts as an adapter between the RouterExecStage and DocumentSource interfaces,
+ * translating results from an input RouterExecStage into DocumentSource::GetNextResults.
+ */
+class DocumentSourceRouterAdapter : public DocumentSource {
+public:
+ static boost::intrusive_ptr<DocumentSourceRouterAdapter> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ std::unique_ptr<RouterExecStage> childStage) {
+ return new DocumentSourceRouterAdapter(expCtx, std::move(childStage));
+ }
+
+ GetNextResult getNext() final {
+ auto next = uassertStatusOK(_child->next());
+ if (auto nextObj = next.getResult()) {
+ return Document::fromBsonWithMetaData(*nextObj);
+ }
+ return GetNextResult::makeEOF();
+ }
+
+ void doDispose() final {
+ _child->kill(pExpCtx->opCtx);
+ }
+
+ void reattachToOperationContext(OperationContext* opCtx) final {
+ _child->reattachToOperationContext(opCtx);
+ }
+
+ void detachFromOperationContext() final {
+ _child->detachFromOperationContext();
+ }
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final {
+ invariant(explain); // We shouldn't need to serialize this stage to send it anywhere.
+ return Value(); // Return the empty value to hide this stage from explain output.
+ }
+
+ bool remotesExhausted() {
+ return _child->remotesExhausted();
+ }
+
+private:
+ DocumentSourceRouterAdapter(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ std::unique_ptr<RouterExecStage> childStage)
+ : DocumentSource(expCtx), _child(std::move(childStage)) {}
+
+ std::unique_ptr<RouterExecStage> _child;
+};
+} // namespace
+
+RouterStagePipeline::RouterStagePipeline(std::unique_ptr<RouterExecStage> child,
+ std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline)
+ : RouterExecStage(mergePipeline->getContext()->opCtx),
+ _mergePipeline(std::move(mergePipeline)) {
+ // Add an adapter to the front of the pipeline to draw results from 'child'.
+ _mergePipeline->addInitialSource(
+ DocumentSourceRouterAdapter::create(_mergePipeline->getContext(), std::move(child)));
+}
+
+StatusWith<ClusterQueryResult> RouterStagePipeline::next() {
+ // Pipeline::getNext will return a boost::optional<Document> or boost::none if EOF.
+ if (auto result = _mergePipeline->getNext()) {
+ return {result->toBson()};
+ }
+
+ // If we reach this point, we have hit EOF.
+ _mergePipeline.get_deleter().dismissDisposal();
+ _mergePipeline->dispose(getOpCtx());
+
+ return {ClusterQueryResult()};
+}
+
+void RouterStagePipeline::doReattachToOperationContext() {
+ _mergePipeline->reattachToOperationContext(getOpCtx());
+}
+
+void RouterStagePipeline::doDetachFromOperationContext() {
+ _mergePipeline->detachFromOperationContext();
+}
+
+void RouterStagePipeline::kill(OperationContext* opCtx) {
+ _mergePipeline.get_deleter().dismissDisposal();
+ _mergePipeline->dispose(opCtx);
+}
+
+bool RouterStagePipeline::remotesExhausted() {
+ return static_cast<DocumentSourceRouterAdapter*>(_mergePipeline->getSources().front().get())
+ ->remotesExhausted();
+}
+
+Status RouterStagePipeline::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ return {ErrorCodes::InvalidOptions, "maxTimeMS is not valid for aggregation getMore"};
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_aggregation_merge.h b/src/mongo/s/query/router_stage_pipeline.h
index 363b46e73d9..780f1fe0e47 100644
--- a/src/mongo/s/query/router_stage_aggregation_merge.h
+++ b/src/mongo/s/query/router_stage_pipeline.h
@@ -35,12 +35,13 @@
namespace mongo {
/**
- * Draws results from a Pipeline with a DocumentSourceMergeCursors at its head, which is the
- * underlying source of the stream of merged documents manipulated by the RouterStage pipeline.
+ * Inserts a pipeline into the router execution tree, drawing results from the input stage, feeding
+ * them through the pipeline, and outputting the results of the pipeline.
*/
-class RouterStageAggregationMerge final : public RouterExecStage {
+class RouterStagePipeline final : public RouterExecStage {
public:
- RouterStageAggregationMerge(std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline);
+ RouterStagePipeline(std::unique_ptr<RouterExecStage> child,
+ std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline);
StatusWith<ClusterQueryResult> next() final;
@@ -48,9 +49,9 @@ public:
bool remotesExhausted() final;
- Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
-
protected:
+ Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+
void doReattachToOperationContext() final;
void doDetachFromOperationContext() final;
diff --git a/src/mongo/s/query/router_stage_remove_metadata_fields.cpp b/src/mongo/s/query/router_stage_remove_metadata_fields.cpp
new file mode 100644
index 00000000000..3be98380e4e
--- /dev/null
+++ b/src/mongo/s/query/router_stage_remove_metadata_fields.cpp
@@ -0,0 +1,89 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <algorithm>
+
+#include "mongo/s/query/router_stage_remove_metadata_fields.h"
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/pipeline/document.h"
+
+namespace mongo {
+
+RouterStageRemoveMetadataFields::RouterStageRemoveMetadataFields(
+ OperationContext* opCtx,
+ std::unique_ptr<RouterExecStage> child,
+ std::vector<StringData> metadataFields)
+ : RouterExecStage(opCtx, std::move(child)), _metaFields(std::move(metadataFields)) {
+ for (auto&& fieldName : _metaFields) {
+ invariant(fieldName[0] == '$'); // We use this information to optimize next().
+ }
+}
+
+StatusWith<ClusterQueryResult> RouterStageRemoveMetadataFields::next() {
+ auto childResult = getChildStage()->next();
+ if (!childResult.isOK() || !childResult.getValue().getResult()) {
+ return childResult;
+ }
+
+ BSONObjIterator iterator(*childResult.getValue().getResult());
+ // Find the first field that we need to remove.
+ while (iterator.more() && (*iterator).fieldName()[0] != '$' &&
+ std::find(_metaFields.begin(), _metaFields.end(), (*iterator).fieldNameStringData()) ==
+ _metaFields.end()) {
+ ++iterator;
+ }
+
+ if (!iterator.more()) {
+ // We got all the way to the end without finding any fields to remove, just return the whole
+ // document.
+ return childResult;
+ }
+
+ // Copy everything up to the first metadata field.
+ const auto firstElementBufferStart =
+ childResult.getValue().getResult()->firstElement().rawdata();
+ auto endOfNonMetaFieldBuffer = (*iterator).rawdata();
+ BSONObjBuilder builder;
+ builder.bb().appendBuf(firstElementBufferStart,
+ endOfNonMetaFieldBuffer - firstElementBufferStart);
+
+ // Copy any remaining fields that are not metadata. We expect metadata fields are likely to be
+ // at the end of the document, so there is likely nothing else to copy.
+ while ((++iterator).more()) {
+ if (std::find(_metaFields.begin(), _metaFields.end(), (*iterator).fieldNameStringData()) ==
+ _metaFields.end()) {
+ builder.append(*iterator);
+ }
+ }
+ return {builder.obj()};
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_metadata_fields.h b/src/mongo/s/query/router_stage_remove_metadata_fields.h
new file mode 100644
index 00000000000..07c9c7c36bb
--- /dev/null
+++ b/src/mongo/s/query/router_stage_remove_metadata_fields.h
@@ -0,0 +1,54 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/s/query/router_exec_stage.h"
+
+namespace mongo {
+
+/**
+ * Removes metadata fields from a BSON object.
+ */
+class RouterStageRemoveMetadataFields final : public RouterExecStage {
+public:
+ RouterStageRemoveMetadataFields(OperationContext* opCtx,
+ std::unique_ptr<RouterExecStage> child,
+ std::vector<StringData> fieldsToRemove);
+
+ StatusWith<ClusterQueryResult> next() final;
+
+private:
+ // Use a StringMap so we can look up by StringData - avoiding a string allocation on each field
+ // in each object. The value here is meaningless.
+ std::vector<StringData> _metaFields;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp b/src/mongo/s/query/router_stage_remove_metadata_fields_test.cpp
index 5767549ad64..bb8ea4613b8 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
+++ b/src/mongo/s/query/router_stage_remove_metadata_fields_test.cpp
@@ -1,37 +1,38 @@
/**
- * Copyright 2015 MongoDB Inc.
+ * Copyright (C) 2017 MongoDB Inc.
*
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
*
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
*
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
*/
#include "mongo/platform/basic.h"
-#include "mongo/s/query/router_stage_remove_sortkey.h"
+#include "mongo/s/query/router_stage_remove_metadata_fields.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/pipeline/document.h"
#include "mongo/s/query/router_stage_mock.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/unittest.h"
@@ -44,15 +45,20 @@ namespace {
// going through the trouble of making one, we'll just use nullptr throughout.
OperationContext* opCtx = nullptr;
-TEST(RouterStageRemoveSortKeyTest, RemovesSortKey) {
+TEST(RouterStageRemoveMetadataFieldsTest, RemovesMetaDataFields) {
auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 4 << "$sortKey" << 1 << "b" << 3));
mockStage->queueResult(BSON("$sortKey" << BSON("" << 3) << "c" << BSON("d"
<< "foo")));
mockStage->queueResult(BSON("a" << 3));
+ mockStage->queueResult(BSON("a" << 3 << "$randVal" << 4 << "$sortKey" << 2));
+ mockStage->queueResult(
+ BSON("$textScore" << 2 << "a" << 3 << "$randVal" << 4 << "$sortKey" << 2));
+ mockStage->queueResult(BSON("$textScore" << 2));
mockStage->queueResult(BSONObj());
- auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage));
+ auto sortKeyStage = stdx::make_unique<RouterStageRemoveMetadataFields>(
+ opCtx, std::move(mockStage), Document::allMetadataFieldNames);
auto firstResult = sortKeyStage->next();
ASSERT_OK(firstResult.getStatus());
@@ -74,19 +80,35 @@ TEST(RouterStageRemoveSortKeyTest, RemovesSortKey) {
auto fourthResult = sortKeyStage->next();
ASSERT_OK(fourthResult.getStatus());
ASSERT(fourthResult.getValue().getResult());
- ASSERT_BSONOBJ_EQ(*fourthResult.getValue().getResult(), BSONObj());
+ ASSERT_BSONOBJ_EQ(*fourthResult.getValue().getResult(), BSON("a" << 3));
auto fifthResult = sortKeyStage->next();
ASSERT_OK(fifthResult.getStatus());
- ASSERT(fifthResult.getValue().isEOF());
+ ASSERT(fifthResult.getValue().getResult());
+ ASSERT_BSONOBJ_EQ(*fifthResult.getValue().getResult(), BSON("a" << 3));
+
+ auto sixthResult = sortKeyStage->next();
+ ASSERT_OK(sixthResult.getStatus());
+ ASSERT(sixthResult.getValue().getResult());
+ ASSERT_BSONOBJ_EQ(*sixthResult.getValue().getResult(), BSONObj());
+
+ auto seventhResult = sortKeyStage->next();
+ ASSERT_OK(seventhResult.getStatus());
+ ASSERT(seventhResult.getValue().getResult());
+ ASSERT_BSONOBJ_EQ(*seventhResult.getValue().getResult(), BSONObj());
+
+ auto eighthResult = sortKeyStage->next();
+ ASSERT_OK(eighthResult.getStatus());
+ ASSERT(eighthResult.getValue().isEOF());
}
-TEST(RouterStageRemoveSortKeyTest, PropagatesError) {
+TEST(RouterStageRemoveMetadataFieldsTest, PropagatesError) {
auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("$sortKey" << 1));
mockStage->queueError(Status(ErrorCodes::BadValue, "bad thing happened"));
- auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage));
+ auto sortKeyStage = stdx::make_unique<RouterStageRemoveMetadataFields>(
+ opCtx, std::move(mockStage), std::vector<StringData>{"$sortKey"_sd});
auto firstResult = sortKeyStage->next();
ASSERT_OK(firstResult.getStatus());
@@ -99,13 +121,14 @@ TEST(RouterStageRemoveSortKeyTest, PropagatesError) {
ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened");
}
-TEST(RouterStageRemoveSortKeyTest, ToleratesMidStreamEOF) {
+TEST(RouterStageRemoveMetadataFieldsTest, ToleratesMidStreamEOF) {
auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 1 << "$sortKey" << 1 << "b" << 1));
mockStage->queueEOF();
mockStage->queueResult(BSON("a" << 2 << "$sortKey" << 1 << "b" << 2));
- auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage));
+ auto sortKeyStage = stdx::make_unique<RouterStageRemoveMetadataFields>(
+ opCtx, std::move(mockStage), std::vector<StringData>{"$sortKey"_sd});
auto firstResult = sortKeyStage->next();
ASSERT_OK(firstResult.getStatus());
@@ -126,13 +149,14 @@ TEST(RouterStageRemoveSortKeyTest, ToleratesMidStreamEOF) {
ASSERT(fourthResult.getValue().isEOF());
}
-TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) {
+TEST(RouterStageRemoveMetadataFieldsTest, RemotesExhausted) {
auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 1 << "$sortKey" << 1 << "b" << 1));
mockStage->queueResult(BSON("a" << 2 << "$sortKey" << 1 << "b" << 2));
mockStage->markRemotesExhausted();
- auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage));
+ auto sortKeyStage = stdx::make_unique<RouterStageRemoveMetadataFields>(
+ opCtx, std::move(mockStage), std::vector<StringData>{"$sortKey"_sd});
ASSERT_TRUE(sortKeyStage->remotesExhausted());
auto firstResult = sortKeyStage->next();
@@ -153,12 +177,13 @@ TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) {
ASSERT_TRUE(sortKeyStage->remotesExhausted());
}
-TEST(RouterStageRemoveSortKeyTest, ForwardsAwaitDataTimeout) {
+TEST(RouterStageRemoveMetadataFieldsTest, ForwardsAwaitDataTimeout) {
auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
auto mockStagePtr = mockStage.get();
ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus());
- auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage));
+ auto sortKeyStage = stdx::make_unique<RouterStageRemoveMetadataFields>(
+ opCtx, std::move(mockStage), std::vector<StringData>{"$sortKey"_sd});
ASSERT_OK(sortKeyStage->setAwaitDataTimeout(Milliseconds(789)));
auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout();
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.cpp b/src/mongo/s/query/router_stage_remove_sortkey.cpp
deleted file mode 100644
index fe7a8cf0f7d..00000000000
--- a/src/mongo/s/query/router_stage_remove_sortkey.cpp
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/query/router_stage_remove_sortkey.h"
-
-#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/s/query/cluster_client_cursor_params.h"
-#include "mongo/util/mongoutils/str.h"
-
-namespace mongo {
-
-RouterStageRemoveSortKey::RouterStageRemoveSortKey(OperationContext* opCtx,
- std::unique_ptr<RouterExecStage> child)
- : RouterExecStage(opCtx, std::move(child)) {}
-
-StatusWith<ClusterQueryResult> RouterStageRemoveSortKey::next() {
- auto childResult = getChildStage()->next();
- if (!childResult.isOK() || !childResult.getValue().getResult()) {
- return childResult;
- }
-
- const auto& childObj = childResult.getValue().getResult();
-
- BSONObjBuilder builder;
- for (BSONElement elt : *childObj) {
- if (!str::equals(elt.fieldName(), ClusterClientCursorParams::kSortKeyField)) {
- builder.append(elt);
- }
- }
-
- return {builder.obj()};
-}
-
-void RouterStageRemoveSortKey::kill(OperationContext* opCtx) {
- getChildStage()->kill(opCtx);
-}
-
-bool RouterStageRemoveSortKey::remotesExhausted() {
- return getChildStage()->remotesExhausted();
-}
-
-Status RouterStageRemoveSortKey::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
- return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
-}
-
-} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.h b/src/mongo/s/query/router_stage_remove_sortkey.h
deleted file mode 100644
index ba71364dfa9..00000000000
--- a/src/mongo/s/query/router_stage_remove_sortkey.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/s/query/router_exec_stage.h"
-
-namespace mongo {
-
-/**
- * Removes the sort key added to each document by mongod's sortKey meta-projection.
- *
- * Only needed if the query specifies a sort.
- */
-class RouterStageRemoveSortKey final : public RouterExecStage {
-public:
- RouterStageRemoveSortKey(OperationContext* opCtx, std::unique_ptr<RouterExecStage> child);
-
- StatusWith<ClusterQueryResult> next() final;
-
- void kill(OperationContext* opCtx) final;
-
- bool remotesExhausted() final;
-
- Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
-};
-
-} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp
index 50d2107b14c..b514731c9cd 100644
--- a/src/mongo/s/query/router_stage_skip.cpp
+++ b/src/mongo/s/query/router_stage_skip.cpp
@@ -58,16 +58,4 @@ StatusWith<ClusterQueryResult> RouterStageSkip::next() {
return getChildStage()->next();
}
-void RouterStageSkip::kill(OperationContext* opCtx) {
- getChildStage()->kill(opCtx);
-}
-
-bool RouterStageSkip::remotesExhausted() {
- return getChildStage()->remotesExhausted();
-}
-
-Status RouterStageSkip::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
- return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
-}
-
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h
index 49051128577..9e67d25b74d 100644
--- a/src/mongo/s/query/router_stage_skip.h
+++ b/src/mongo/s/query/router_stage_skip.h
@@ -43,12 +43,6 @@ public:
StatusWith<ClusterQueryResult> next() final;
- void kill(OperationContext* opCtx) final;
-
- bool remotesExhausted() final;
-
- Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
-
private:
long long _skip;
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index dce282c5892..506ac226636 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -63,10 +63,11 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
ClusterClientCursorParams params(
incomingCursorResponse.getValue().getNSS(),
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames());
- params.remotes.emplace_back(
- shardId,
- server,
- CursorResponse(requestedNss, incomingCursorResponse.getValue().getCursorId(), {}));
+ params.remotes.emplace_back(shardId,
+ server,
+ CursorResponse(incomingCursorResponse.getValue().getNSS(),
+ incomingCursorResponse.getValue().getCursorId(),
+ {}));
auto ccc = ClusterClientCursorImpl::make(opCtx, executor, std::move(params));