diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-08-28 15:10:42 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-09-01 15:36:35 -0400 |
commit | bc3e230523e4677e2f3fed64ea89c369182a9272 (patch) | |
tree | bb35904e784f224e6d5ab87b508c69c72f447dd3 /src/mongo/s/query | |
parent | 4e01e3582541fc00ec2e83c97cac89b59fbfeb34 (diff) | |
download | mongo-bc3e230523e4677e2f3fed64ea89c369182a9272.tar.gz |
SERVER-30704 Use ARM to merge agg cursors on mongos.
Diffstat (limited to 'src/mongo/s/query')
22 files changed, 485 insertions, 314 deletions
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index 460dcf7912e..2ae87217bc8 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -32,11 +32,11 @@ env.Library( env.Library( target="router_exec_stage", source=[ - "router_stage_aggregation_merge.cpp", "router_stage_limit.cpp", "router_stage_merge.cpp", "router_stage_mock.cpp", - "router_stage_remove_sortkey.cpp", + "router_stage_pipeline.cpp", + "router_stage_remove_metadata_fields.cpp", "router_stage_skip.cpp", ], LIBDEPS=[ @@ -48,7 +48,7 @@ env.CppUnitTest( target="router_exec_stage_test", source=[ "router_stage_limit_test.cpp", - "router_stage_remove_sortkey_test.cpp", + "router_stage_remove_metadata_fields_test.cpp", "router_stage_skip_test.cpp", ], LIBDEPS=[ diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 19281785be0..50886944bb2 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -61,7 +61,9 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx, _mergeQueue(MergingComparator(_remotes, _params->sort)) { size_t remoteIndex = 0; for (const auto& remote : _params->remotes) { - _remotes.emplace_back(remote.hostAndPort, remote.cursorResponse.getCursorId()); + _remotes.emplace_back(remote.hostAndPort, + remote.cursorResponse.getNSS(), + remote.cursorResponse.getCursorId()); // We don't check the return value of addBatchToBuffer here; if there was an error, // it will be stored in the remote and the first call to ready() will return true. @@ -269,7 +271,7 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) { adjustedBatchSize = *_params->batchSize - remote.fetchedCount; } - BSONObj cmdObj = GetMoreRequest(_params->nsString, + BSONObj cmdObj = GetMoreRequest(remote.cursorNss, remote.cursorId, adjustedBatchSize, _awaitDataTimeout, @@ -582,8 +584,11 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* o // AsyncResultsMerger::RemoteCursorData::RemoteCursorData(HostAndPort hostAndPort, + NamespaceString cursorNss, CursorId establishedCursorId) - : cursorId(establishedCursorId), shardHostAndPort(std::move(hostAndPort)) {} + : cursorId(establishedCursorId), + cursorNss(std::move(cursorNss)), + shardHostAndPort(std::move(hostAndPort)) {} const HostAndPort& AsyncResultsMerger::RemoteCursorData::getTargetHost() const { return shardHostAndPort; diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index 04262309a99..c6ec2a26052 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -202,7 +202,9 @@ private: * reported from the remote. */ struct RemoteCursorData { - RemoteCursorData(HostAndPort hostAndPort, CursorId establishedCursorId); + RemoteCursorData(HostAndPort hostAndPort, + NamespaceString cursorNss, + CursorId establishedCursorId); /** * Returns the resolved host and port on which the remote cursor resides. @@ -230,6 +232,10 @@ private: // member will be set to zero. CursorId cursorId; + // The namespace this cursor belongs to - note this may be different than the namespace of + // the operation if there is a view. + NamespaceString cursorNss; + // The exact host in the shard on which the cursor resides. HostAndPort shardHostAndPort; diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index e7716355c0f..f286cee408e 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -26,17 +26,18 @@ * it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery - #include "mongo/platform/basic.h" #include "mongo/s/query/cluster_client_cursor_impl.h" -#include "mongo/s/query/router_stage_aggregation_merge.h" +#include "mongo/db/pipeline/document_source_limit.h" +#include "mongo/db/pipeline/document_source_skip.h" +#include "mongo/db/pipeline/document_source_sort.h" #include "mongo/s/query/router_stage_limit.h" #include "mongo/s/query/router_stage_merge.h" #include "mongo/s/query/router_stage_mock.h" -#include "mongo/s/query/router_stage_remove_sortkey.h" +#include "mongo/s/query/router_stage_pipeline.h" +#include "mongo/s/query/router_stage_remove_metadata_fields.h" #include "mongo/s/query/router_stage_skip.h" #include "mongo/stdx/memory.h" @@ -140,18 +141,87 @@ boost::optional<LogicalSessionId> ClusterClientCursorImpl::getLsid() const { return _lsid; } +namespace { + +/** + * Rips off an initial $sort stage that will be handled by mongos execution machinery. Returns the + * sort key pattern of such a $sort stage if there was one, and boost::none otherwise. + */ +boost::optional<BSONObj> extractLeadingSort(Pipeline* mergePipeline) { + if (auto frontSort = mergePipeline->popFrontStageWithName(DocumentSourceSort::kStageName)) { + auto sortStage = static_cast<DocumentSourceSort*>(frontSort.get()); + if (auto sortLimit = sortStage->getLimitSrc()) { + // There was a limit stage absorbed into the sort stage, so we need to preserve that. + mergePipeline->addInitialSource(sortLimit); + } + return sortStage + ->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging) + .toBson(); + } + return boost::none; +} + +bool isSkipOrLimit(const boost::intrusive_ptr<DocumentSource>& stage) { + return (dynamic_cast<DocumentSourceLimit*>(stage.get()) || + dynamic_cast<DocumentSourceSkip*>(stage.get())); +} + +bool isAllLimitsAndSkips(Pipeline* pipeline) { + const auto stages = pipeline->getSources(); + return std::all_of( + stages.begin(), stages.end(), [&](const auto& stage) { return isSkipOrLimit(stage); }); +} + +std::unique_ptr<RouterExecStage> buildPipelinePlan(executor::TaskExecutor* executor, + ClusterClientCursorParams* params) { + invariant(params->mergePipeline); + invariant(!params->skip); + invariant(!params->limit); + auto* pipeline = params->mergePipeline.get(); + auto* opCtx = pipeline->getContext()->opCtx; + + std::unique_ptr<RouterExecStage> root = + stdx::make_unique<RouterStageMerge>(opCtx, executor, params); + if (!isAllLimitsAndSkips(pipeline)) { + return stdx::make_unique<RouterStagePipeline>(std::move(root), + std::move(params->mergePipeline)); + } + + // After extracting an optional leading $sort, the pipeline consisted entirely of $skip and + // $limit stages. Avoid creating a RouterStagePipeline (which will go through an expensive + // conversion from BSONObj -> Document for each result), and create a RouterExecStage tree + // instead. + while (!pipeline->getSources().empty()) { + invariant(isSkipOrLimit(pipeline->getSources().front())); + if (auto skip = pipeline->popFrontStageWithName(DocumentSourceSkip::kStageName)) { + root = stdx::make_unique<RouterStageSkip>( + opCtx, std::move(root), static_cast<DocumentSourceSkip*>(skip.get())->getSkip()); + } else if (auto limit = pipeline->popFrontStageWithName(DocumentSourceLimit::kStageName)) { + root = stdx::make_unique<RouterStageLimit>( + opCtx, std::move(root), static_cast<DocumentSourceLimit*>(limit.get())->getLimit()); + } + } + if (!params->sort.isEmpty()) { + // We are executing the pipeline without using a Pipeline, so we need to strip out any + // Document metadata ourselves. Note we only need this stage if there was a sort, since + // otherwise there would be no way for this half of the pipeline to require any metadata + // fields. + root = stdx::make_unique<RouterStageRemoveMetadataFields>( + opCtx, std::move(root), Document::allMetadataFieldNames); + } + return root; +} +} // namespace + std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan( OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params) { const auto skip = params->skip; const auto limit = params->limit; - const bool hasSort = !params->sort.isEmpty(); - - // The first stage always merges from the remotes. If 'mergePipeline' has been specified in - // ClusterClientCursorParams, then RouterStageAggregationMerge should be the root and only node. - // Otherwise, construct a RouterStage pipeline from the remotes, skip, limit, and sort fields in - // 'params'. if (params->mergePipeline) { - return stdx::make_unique<RouterStageAggregationMerge>(std::move(params->mergePipeline)); + if (auto sort = extractLeadingSort(params->mergePipeline.get())) { + params->sort = *sort; + } + return buildPipelinePlan(executor, params); } std::unique_ptr<RouterExecStage> root = @@ -165,8 +235,13 @@ std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan( root = stdx::make_unique<RouterStageLimit>(opCtx, std::move(root), *limit); } + const bool hasSort = !params->sort.isEmpty(); if (hasSort) { - root = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(root)); + // Strip out the sort key after sorting. + root = stdx::make_unique<RouterStageRemoveMetadataFields>( + opCtx, + std::move(root), + std::vector<StringData>{ClusterClientCursorParams::kSortKeyField}); } return root; diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h index ac074d92b62..e7d8ebb65b9 100644 --- a/src/mongo/s/query/router_exec_stage.h +++ b/src/mongo/s/query/router_exec_stage.h @@ -77,12 +77,18 @@ public: * currently attached. This is so that a killing thread may call this method with its own * operation context. */ - virtual void kill(OperationContext* opCtx) = 0; + virtual void kill(OperationContext* opCtx) { + invariant(_child); // The default implementation forwards to the child stage. + _child->kill(opCtx); + } /** * Returns whether or not all the remote cursors are exhausted. */ - virtual bool remotesExhausted() = 0; + virtual bool remotesExhausted() { + invariant(_child); // The default implementation forwards to the child stage. + return _child->remotesExhausted(); + } /** * Sets the maxTimeMS value that the cursor should forward with any internally issued getMore @@ -91,7 +97,15 @@ public: * Returns a non-OK status if this cursor type does not support maxTimeMS on getMore (i.e. if * the cursor is not tailable + awaitData). */ - virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0; + Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) { + if (_child) { + auto childStatus = _child->setAwaitDataTimeout(awaitDataTimeout); + if (!childStatus.isOK()) { + return childStatus; + } + } + return doSetAwaitDataTimeout(awaitDataTimeout); + } /** * Sets the current operation context to be used by the router stage. @@ -135,6 +149,13 @@ protected: virtual void doDetachFromOperationContext() {} /** + * Performs any stage-specific await data timeout actions. + */ + virtual Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) { + return Status::OK(); + } + + /** * Returns an unowned pointer to the child stage, or nullptr if there is no child. */ RouterExecStage* getChildStage() { diff --git a/src/mongo/s/query/router_stage_aggregation_merge.cpp b/src/mongo/s/query/router_stage_aggregation_merge.cpp deleted file mode 100644 index a4273dbd4a7..00000000000 --- a/src/mongo/s/query/router_stage_aggregation_merge.cpp +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/s/query/router_stage_aggregation_merge.h" - -#include "mongo/db/pipeline/document_source_merge_cursors.h" -#include "mongo/db/pipeline/expression_context.h" - -namespace mongo { - -RouterStageAggregationMerge::RouterStageAggregationMerge( - std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline) - : RouterExecStage(mergePipeline->getContext()->opCtx), - _mergePipeline(std::move(mergePipeline)) {} - -StatusWith<ClusterQueryResult> RouterStageAggregationMerge::next() { - // Pipeline::getNext will return a boost::optional<Document> or boost::none if EOF. - if (auto result = _mergePipeline->getNext()) { - return {result->toBson()}; - } - - // If we reach this point, we have hit EOF. - _mergePipeline.get_deleter().dismissDisposal(); - _mergePipeline->dispose(getOpCtx()); - - return {ClusterQueryResult()}; -} - -void RouterStageAggregationMerge::doReattachToOperationContext() { - _mergePipeline->reattachToOperationContext(getOpCtx()); -} - -void RouterStageAggregationMerge::doDetachFromOperationContext() { - _mergePipeline->detachFromOperationContext(); -} - -void RouterStageAggregationMerge::kill(OperationContext* opCtx) { - _mergePipeline.get_deleter().dismissDisposal(); - _mergePipeline->dispose(opCtx); -} - -bool RouterStageAggregationMerge::remotesExhausted() { - const auto mergeSource = - static_cast<DocumentSourceMergeCursors*>(_mergePipeline->getSources().front().get()); - return mergeSource->remotesExhausted(); -} - -Status RouterStageAggregationMerge::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { - return {ErrorCodes::InvalidOptions, "maxTimeMS is not valid for aggregation getMore"}; -} - -} // namespace mongo diff --git a/src/mongo/s/query/router_stage_limit.cpp b/src/mongo/s/query/router_stage_limit.cpp index feb8f11626f..b3fd2b14651 100644 --- a/src/mongo/s/query/router_stage_limit.cpp +++ b/src/mongo/s/query/router_stage_limit.cpp @@ -57,16 +57,4 @@ StatusWith<ClusterQueryResult> RouterStageLimit::next() { return childResult; } -void RouterStageLimit::kill(OperationContext* opCtx) { - getChildStage()->kill(opCtx); -} - -bool RouterStageLimit::remotesExhausted() { - return getChildStage()->remotesExhausted(); -} - -Status RouterStageLimit::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { - return getChildStage()->setAwaitDataTimeout(awaitDataTimeout); -} - } // namespace mongo diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h index 42ef46c21ab..1a158e2c3a7 100644 --- a/src/mongo/s/query/router_stage_limit.h +++ b/src/mongo/s/query/router_stage_limit.h @@ -43,12 +43,6 @@ public: StatusWith<ClusterQueryResult> next() final; - void kill(OperationContext* opCtx) final; - - bool remotesExhausted() final; - - Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; - private: long long _limit; diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp index 78ee1a3475a..f2a159003c2 100644 --- a/src/mongo/s/query/router_stage_merge.cpp +++ b/src/mongo/s/query/router_stage_merge.cpp @@ -69,7 +69,7 @@ bool RouterStageMerge::remotesExhausted() { return _arm.remotesExhausted(); } -Status RouterStageMerge::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { +Status RouterStageMerge::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) { return _arm.setAwaitDataTimeout(awaitDataTimeout); } diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index 23503c664f6..78c5383e0ee 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -53,7 +53,8 @@ public: bool remotesExhausted() final; - Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; +protected: + Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final; protected: void doReattachToOperationContext() override { diff --git a/src/mongo/s/query/router_stage_mock.cpp b/src/mongo/s/query/router_stage_mock.cpp index edeb1f9945c..7ebc3a6a554 100644 --- a/src/mongo/s/query/router_stage_mock.cpp +++ b/src/mongo/s/query/router_stage_mock.cpp @@ -68,7 +68,7 @@ bool RouterStageMock::remotesExhausted() { return _remotesExhausted; } -Status RouterStageMock::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { +Status RouterStageMock::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) { _awaitDataTimeout = awaitDataTimeout; return Status::OK(); } diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h index e2f8e7adab5..8e3075103d5 100644 --- a/src/mongo/s/query/router_stage_mock.h +++ b/src/mongo/s/query/router_stage_mock.h @@ -51,8 +51,6 @@ public: bool remotesExhausted() final; - Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; - /** * Queues a BSONObj to be returned. */ @@ -79,6 +77,9 @@ public: */ StatusWith<Milliseconds> getAwaitDataTimeout(); +protected: + Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + private: std::queue<StatusWith<ClusterQueryResult>> _resultsQueue; bool _remotesExhausted = false; diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp new file mode 100644 index 00000000000..d9cf02f85c3 --- /dev/null +++ b/src/mongo/s/query/router_stage_pipeline.cpp @@ -0,0 +1,135 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/query/router_stage_pipeline.h" + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_merge_cursors.h" +#include "mongo/db/pipeline/expression_context.h" + +namespace mongo { + +namespace { + +/** + * A class that acts as an adapter between the RouterExecStage and DocumentSource interfaces, + * translating results from an input RouterExecStage into DocumentSource::GetNextResults. + */ +class DocumentSourceRouterAdapter : public DocumentSource { +public: + static boost::intrusive_ptr<DocumentSourceRouterAdapter> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + std::unique_ptr<RouterExecStage> childStage) { + return new DocumentSourceRouterAdapter(expCtx, std::move(childStage)); + } + + GetNextResult getNext() final { + auto next = uassertStatusOK(_child->next()); + if (auto nextObj = next.getResult()) { + return Document::fromBsonWithMetaData(*nextObj); + } + return GetNextResult::makeEOF(); + } + + void doDispose() final { + _child->kill(pExpCtx->opCtx); + } + + void reattachToOperationContext(OperationContext* opCtx) final { + _child->reattachToOperationContext(opCtx); + } + + void detachFromOperationContext() final { + _child->detachFromOperationContext(); + } + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final { + invariant(explain); // We shouldn't need to serialize this stage to send it anywhere. + return Value(); // Return the empty value to hide this stage from explain output. + } + + bool remotesExhausted() { + return _child->remotesExhausted(); + } + +private: + DocumentSourceRouterAdapter(const boost::intrusive_ptr<ExpressionContext>& expCtx, + std::unique_ptr<RouterExecStage> childStage) + : DocumentSource(expCtx), _child(std::move(childStage)) {} + + std::unique_ptr<RouterExecStage> _child; +}; +} // namespace + +RouterStagePipeline::RouterStagePipeline(std::unique_ptr<RouterExecStage> child, + std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline) + : RouterExecStage(mergePipeline->getContext()->opCtx), + _mergePipeline(std::move(mergePipeline)) { + // Add an adapter to the front of the pipeline to draw results from 'child'. + _mergePipeline->addInitialSource( + DocumentSourceRouterAdapter::create(_mergePipeline->getContext(), std::move(child))); +} + +StatusWith<ClusterQueryResult> RouterStagePipeline::next() { + // Pipeline::getNext will return a boost::optional<Document> or boost::none if EOF. + if (auto result = _mergePipeline->getNext()) { + return {result->toBson()}; + } + + // If we reach this point, we have hit EOF. + _mergePipeline.get_deleter().dismissDisposal(); + _mergePipeline->dispose(getOpCtx()); + + return {ClusterQueryResult()}; +} + +void RouterStagePipeline::doReattachToOperationContext() { + _mergePipeline->reattachToOperationContext(getOpCtx()); +} + +void RouterStagePipeline::doDetachFromOperationContext() { + _mergePipeline->detachFromOperationContext(); +} + +void RouterStagePipeline::kill(OperationContext* opCtx) { + _mergePipeline.get_deleter().dismissDisposal(); + _mergePipeline->dispose(opCtx); +} + +bool RouterStagePipeline::remotesExhausted() { + return static_cast<DocumentSourceRouterAdapter*>(_mergePipeline->getSources().front().get()) + ->remotesExhausted(); +} + +Status RouterStagePipeline::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) { + return {ErrorCodes::InvalidOptions, "maxTimeMS is not valid for aggregation getMore"}; +} + +} // namespace mongo diff --git a/src/mongo/s/query/router_stage_aggregation_merge.h b/src/mongo/s/query/router_stage_pipeline.h index 363b46e73d9..780f1fe0e47 100644 --- a/src/mongo/s/query/router_stage_aggregation_merge.h +++ b/src/mongo/s/query/router_stage_pipeline.h @@ -35,12 +35,13 @@ namespace mongo { /** - * Draws results from a Pipeline with a DocumentSourceMergeCursors at its head, which is the - * underlying source of the stream of merged documents manipulated by the RouterStage pipeline. + * Inserts a pipeline into the router execution tree, drawing results from the input stage, feeding + * them through the pipeline, and outputting the results of the pipeline. */ -class RouterStageAggregationMerge final : public RouterExecStage { +class RouterStagePipeline final : public RouterExecStage { public: - RouterStageAggregationMerge(std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline); + RouterStagePipeline(std::unique_ptr<RouterExecStage> child, + std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline); StatusWith<ClusterQueryResult> next() final; @@ -48,9 +49,9 @@ public: bool remotesExhausted() final; - Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; - protected: + Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + void doReattachToOperationContext() final; void doDetachFromOperationContext() final; diff --git a/src/mongo/s/query/router_stage_remove_metadata_fields.cpp b/src/mongo/s/query/router_stage_remove_metadata_fields.cpp new file mode 100644 index 00000000000..3be98380e4e --- /dev/null +++ b/src/mongo/s/query/router_stage_remove_metadata_fields.cpp @@ -0,0 +1,89 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <algorithm> + +#include "mongo/s/query/router_stage_remove_metadata_fields.h" + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/pipeline/document.h" + +namespace mongo { + +RouterStageRemoveMetadataFields::RouterStageRemoveMetadataFields( + OperationContext* opCtx, + std::unique_ptr<RouterExecStage> child, + std::vector<StringData> metadataFields) + : RouterExecStage(opCtx, std::move(child)), _metaFields(std::move(metadataFields)) { + for (auto&& fieldName : _metaFields) { + invariant(fieldName[0] == '$'); // We use this information to optimize next(). + } +} + +StatusWith<ClusterQueryResult> RouterStageRemoveMetadataFields::next() { + auto childResult = getChildStage()->next(); + if (!childResult.isOK() || !childResult.getValue().getResult()) { + return childResult; + } + + BSONObjIterator iterator(*childResult.getValue().getResult()); + // Find the first field that we need to remove. + while (iterator.more() && (*iterator).fieldName()[0] != '$' && + std::find(_metaFields.begin(), _metaFields.end(), (*iterator).fieldNameStringData()) == + _metaFields.end()) { + ++iterator; + } + + if (!iterator.more()) { + // We got all the way to the end without finding any fields to remove, just return the whole + // document. + return childResult; + } + + // Copy everything up to the first metadata field. + const auto firstElementBufferStart = + childResult.getValue().getResult()->firstElement().rawdata(); + auto endOfNonMetaFieldBuffer = (*iterator).rawdata(); + BSONObjBuilder builder; + builder.bb().appendBuf(firstElementBufferStart, + endOfNonMetaFieldBuffer - firstElementBufferStart); + + // Copy any remaining fields that are not metadata. We expect metadata fields are likely to be + // at the end of the document, so there is likely nothing else to copy. + while ((++iterator).more()) { + if (std::find(_metaFields.begin(), _metaFields.end(), (*iterator).fieldNameStringData()) == + _metaFields.end()) { + builder.append(*iterator); + } + } + return {builder.obj()}; +} + +} // namespace mongo diff --git a/src/mongo/s/query/router_stage_remove_metadata_fields.h b/src/mongo/s/query/router_stage_remove_metadata_fields.h new file mode 100644 index 00000000000..07c9c7c36bb --- /dev/null +++ b/src/mongo/s/query/router_stage_remove_metadata_fields.h @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/s/query/router_exec_stage.h" + +namespace mongo { + +/** + * Removes metadata fields from a BSON object. + */ +class RouterStageRemoveMetadataFields final : public RouterExecStage { +public: + RouterStageRemoveMetadataFields(OperationContext* opCtx, + std::unique_ptr<RouterExecStage> child, + std::vector<StringData> fieldsToRemove); + + StatusWith<ClusterQueryResult> next() final; + +private: + // Use a StringMap so we can look up by StringData - avoiding a string allocation on each field + // in each object. The value here is meaningless. + std::vector<StringData> _metaFields; +}; + +} // namespace mongo diff --git a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp b/src/mongo/s/query/router_stage_remove_metadata_fields_test.cpp index 5767549ad64..bb8ea4613b8 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp +++ b/src/mongo/s/query/router_stage_remove_metadata_fields_test.cpp @@ -1,37 +1,38 @@ /** - * Copyright 2015 MongoDB Inc. + * Copyright (C) 2017 MongoDB Inc. * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. */ #include "mongo/platform/basic.h" -#include "mongo/s/query/router_stage_remove_sortkey.h" +#include "mongo/s/query/router_stage_remove_metadata_fields.h" #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/pipeline/document.h" #include "mongo/s/query/router_stage_mock.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/unittest.h" @@ -44,15 +45,20 @@ namespace { // going through the trouble of making one, we'll just use nullptr throughout. OperationContext* opCtx = nullptr; -TEST(RouterStageRemoveSortKeyTest, RemovesSortKey) { +TEST(RouterStageRemoveMetadataFieldsTest, RemovesMetaDataFields) { auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("a" << 4 << "$sortKey" << 1 << "b" << 3)); mockStage->queueResult(BSON("$sortKey" << BSON("" << 3) << "c" << BSON("d" << "foo"))); mockStage->queueResult(BSON("a" << 3)); + mockStage->queueResult(BSON("a" << 3 << "$randVal" << 4 << "$sortKey" << 2)); + mockStage->queueResult( + BSON("$textScore" << 2 << "a" << 3 << "$randVal" << 4 << "$sortKey" << 2)); + mockStage->queueResult(BSON("$textScore" << 2)); mockStage->queueResult(BSONObj()); - auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage)); + auto sortKeyStage = stdx::make_unique<RouterStageRemoveMetadataFields>( + opCtx, std::move(mockStage), Document::allMetadataFieldNames); auto firstResult = sortKeyStage->next(); ASSERT_OK(firstResult.getStatus()); @@ -74,19 +80,35 @@ TEST(RouterStageRemoveSortKeyTest, RemovesSortKey) { auto fourthResult = sortKeyStage->next(); ASSERT_OK(fourthResult.getStatus()); ASSERT(fourthResult.getValue().getResult()); - ASSERT_BSONOBJ_EQ(*fourthResult.getValue().getResult(), BSONObj()); + ASSERT_BSONOBJ_EQ(*fourthResult.getValue().getResult(), BSON("a" << 3)); auto fifthResult = sortKeyStage->next(); ASSERT_OK(fifthResult.getStatus()); - ASSERT(fifthResult.getValue().isEOF()); + ASSERT(fifthResult.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*fifthResult.getValue().getResult(), BSON("a" << 3)); + + auto sixthResult = sortKeyStage->next(); + ASSERT_OK(sixthResult.getStatus()); + ASSERT(sixthResult.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*sixthResult.getValue().getResult(), BSONObj()); + + auto seventhResult = sortKeyStage->next(); + ASSERT_OK(seventhResult.getStatus()); + ASSERT(seventhResult.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*seventhResult.getValue().getResult(), BSONObj()); + + auto eighthResult = sortKeyStage->next(); + ASSERT_OK(eighthResult.getStatus()); + ASSERT(eighthResult.getValue().isEOF()); } -TEST(RouterStageRemoveSortKeyTest, PropagatesError) { +TEST(RouterStageRemoveMetadataFieldsTest, PropagatesError) { auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("$sortKey" << 1)); mockStage->queueError(Status(ErrorCodes::BadValue, "bad thing happened")); - auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage)); + auto sortKeyStage = stdx::make_unique<RouterStageRemoveMetadataFields>( + opCtx, std::move(mockStage), std::vector<StringData>{"$sortKey"_sd}); auto firstResult = sortKeyStage->next(); ASSERT_OK(firstResult.getStatus()); @@ -99,13 +121,14 @@ TEST(RouterStageRemoveSortKeyTest, PropagatesError) { ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened"); } -TEST(RouterStageRemoveSortKeyTest, ToleratesMidStreamEOF) { +TEST(RouterStageRemoveMetadataFieldsTest, ToleratesMidStreamEOF) { auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("a" << 1 << "$sortKey" << 1 << "b" << 1)); mockStage->queueEOF(); mockStage->queueResult(BSON("a" << 2 << "$sortKey" << 1 << "b" << 2)); - auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage)); + auto sortKeyStage = stdx::make_unique<RouterStageRemoveMetadataFields>( + opCtx, std::move(mockStage), std::vector<StringData>{"$sortKey"_sd}); auto firstResult = sortKeyStage->next(); ASSERT_OK(firstResult.getStatus()); @@ -126,13 +149,14 @@ TEST(RouterStageRemoveSortKeyTest, ToleratesMidStreamEOF) { ASSERT(fourthResult.getValue().isEOF()); } -TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) { +TEST(RouterStageRemoveMetadataFieldsTest, RemotesExhausted) { auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("a" << 1 << "$sortKey" << 1 << "b" << 1)); mockStage->queueResult(BSON("a" << 2 << "$sortKey" << 1 << "b" << 2)); mockStage->markRemotesExhausted(); - auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage)); + auto sortKeyStage = stdx::make_unique<RouterStageRemoveMetadataFields>( + opCtx, std::move(mockStage), std::vector<StringData>{"$sortKey"_sd}); ASSERT_TRUE(sortKeyStage->remotesExhausted()); auto firstResult = sortKeyStage->next(); @@ -153,12 +177,13 @@ TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) { ASSERT_TRUE(sortKeyStage->remotesExhausted()); } -TEST(RouterStageRemoveSortKeyTest, ForwardsAwaitDataTimeout) { +TEST(RouterStageRemoveMetadataFieldsTest, ForwardsAwaitDataTimeout) { auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); auto mockStagePtr = mockStage.get(); ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus()); - auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage)); + auto sortKeyStage = stdx::make_unique<RouterStageRemoveMetadataFields>( + opCtx, std::move(mockStage), std::vector<StringData>{"$sortKey"_sd}); ASSERT_OK(sortKeyStage->setAwaitDataTimeout(Milliseconds(789))); auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout(); diff --git a/src/mongo/s/query/router_stage_remove_sortkey.cpp b/src/mongo/s/query/router_stage_remove_sortkey.cpp deleted file mode 100644 index fe7a8cf0f7d..00000000000 --- a/src/mongo/s/query/router_stage_remove_sortkey.cpp +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery - -#include "mongo/platform/basic.h" - -#include "mongo/s/query/router_stage_remove_sortkey.h" - -#include "mongo/bson/bsonobjbuilder.h" -#include "mongo/s/query/cluster_client_cursor_params.h" -#include "mongo/util/mongoutils/str.h" - -namespace mongo { - -RouterStageRemoveSortKey::RouterStageRemoveSortKey(OperationContext* opCtx, - std::unique_ptr<RouterExecStage> child) - : RouterExecStage(opCtx, std::move(child)) {} - -StatusWith<ClusterQueryResult> RouterStageRemoveSortKey::next() { - auto childResult = getChildStage()->next(); - if (!childResult.isOK() || !childResult.getValue().getResult()) { - return childResult; - } - - const auto& childObj = childResult.getValue().getResult(); - - BSONObjBuilder builder; - for (BSONElement elt : *childObj) { - if (!str::equals(elt.fieldName(), ClusterClientCursorParams::kSortKeyField)) { - builder.append(elt); - } - } - - return {builder.obj()}; -} - -void RouterStageRemoveSortKey::kill(OperationContext* opCtx) { - getChildStage()->kill(opCtx); -} - -bool RouterStageRemoveSortKey::remotesExhausted() { - return getChildStage()->remotesExhausted(); -} - -Status RouterStageRemoveSortKey::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { - return getChildStage()->setAwaitDataTimeout(awaitDataTimeout); -} - -} // namespace mongo diff --git a/src/mongo/s/query/router_stage_remove_sortkey.h b/src/mongo/s/query/router_stage_remove_sortkey.h deleted file mode 100644 index ba71364dfa9..00000000000 --- a/src/mongo/s/query/router_stage_remove_sortkey.h +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/s/query/router_exec_stage.h" - -namespace mongo { - -/** - * Removes the sort key added to each document by mongod's sortKey meta-projection. - * - * Only needed if the query specifies a sort. - */ -class RouterStageRemoveSortKey final : public RouterExecStage { -public: - RouterStageRemoveSortKey(OperationContext* opCtx, std::unique_ptr<RouterExecStage> child); - - StatusWith<ClusterQueryResult> next() final; - - void kill(OperationContext* opCtx) final; - - bool remotesExhausted() final; - - Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; -}; - -} // namespace mongo diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp index 50d2107b14c..b514731c9cd 100644 --- a/src/mongo/s/query/router_stage_skip.cpp +++ b/src/mongo/s/query/router_stage_skip.cpp @@ -58,16 +58,4 @@ StatusWith<ClusterQueryResult> RouterStageSkip::next() { return getChildStage()->next(); } -void RouterStageSkip::kill(OperationContext* opCtx) { - getChildStage()->kill(opCtx); -} - -bool RouterStageSkip::remotesExhausted() { - return getChildStage()->remotesExhausted(); -} - -Status RouterStageSkip::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { - return getChildStage()->setAwaitDataTimeout(awaitDataTimeout); -} - } // namespace mongo diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h index 49051128577..9e67d25b74d 100644 --- a/src/mongo/s/query/router_stage_skip.h +++ b/src/mongo/s/query/router_stage_skip.h @@ -43,12 +43,6 @@ public: StatusWith<ClusterQueryResult> next() final; - void kill(OperationContext* opCtx) final; - - bool remotesExhausted() final; - - Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; - private: long long _skip; diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp index dce282c5892..506ac226636 100644 --- a/src/mongo/s/query/store_possible_cursor.cpp +++ b/src/mongo/s/query/store_possible_cursor.cpp @@ -63,10 +63,11 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, ClusterClientCursorParams params( incomingCursorResponse.getValue().getNSS(), AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames()); - params.remotes.emplace_back( - shardId, - server, - CursorResponse(requestedNss, incomingCursorResponse.getValue().getCursorId(), {})); + params.remotes.emplace_back(shardId, + server, + CursorResponse(incomingCursorResponse.getValue().getNSS(), + incomingCursorResponse.getValue().getCursorId(), + {})); auto ccc = ClusterClientCursorImpl::make(opCtx, executor, std::move(params)); |