summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorWilliam Schultz <william.schultz@mongodb.com>2018-04-05 13:56:47 -0400
committerWilliam Schultz <william.schultz@mongodb.com>2018-04-05 13:59:27 -0400
commite88c6d85036607ddf86105234917b4adfffbd612 (patch)
tree43ff091111fec8836654b0ccc3dcaff6e3d2a518 /src
parentac6544a9194197b5ab10f563cf1a19dcdc42349c (diff)
downloadmongo-e88c6d85036607ddf86105234917b4adfffbd612.tar.gz
Revert "SERVER-33323 Use the IDL to serialize the ARM"
This reverts commit 7d09f278a2acf9791b36927d6af1d30347d60391.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/client/dbclientcursor.cpp2
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp2
-rw-r--r--src/mongo/db/matcher/expression_optimize_test.cpp2
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/cluster_aggregation_planner.cpp182
-rw-r--r--src/mongo/db/pipeline/cluster_aggregation_planner.h58
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_test.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_merge_cursors.cpp80
-rw-r--r--src/mongo/db/pipeline/document_source_merge_cursors.h32
-rw-r--r--src/mongo/db/pipeline/document_source_merge_cursors_test.cpp208
-rw-r--r--src/mongo/db/pipeline/document_source_sort.h2
-rw-r--r--src/mongo/db/pipeline/expression_context.h4
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp128
-rw-r--r--src/mongo/db/pipeline/pipeline.h38
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp2
-rw-r--r--src/mongo/db/pipeline/pipeline_optimizations.h71
-rw-r--r--src/mongo/db/query/SConscript3
-rw-r--r--src/mongo/db/query/cursor_response.h25
-rw-r--r--src/mongo/db/query/getmore_request.cpp4
-rw-r--r--src/mongo/db/query/getmore_request.h4
-rw-r--r--src/mongo/db/query/query_request.cpp12
-rw-r--r--src/mongo/db/query/query_request.h12
-rw-r--r--src/mongo/db/query/query_request_test.cpp4
-rw-r--r--src/mongo/db/query/tailable_mode.cpp8
-rw-r--r--src/mongo/db/query/tailable_mode.h9
-rw-r--r--src/mongo/db/query/tailable_mode.idl37
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/collection_cloner.cpp22
-rw-r--r--src/mongo/db/repl/collection_cloner.h2
-rw-r--r--src/mongo/dbtests/documentsourcetests.cpp12
-rw-r--r--src/mongo/dbtests/query_plan_executor.cpp6
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp77
-rw-r--r--src/mongo/s/commands/pipeline_s.cpp6
-rw-r--r--src/mongo/s/commands/strategy.cpp2
-rw-r--r--src/mongo/s/query/SConscript1
-rw-r--r--src/mongo/s/query/async_results_merger.cpp90
-rw-r--r--src/mongo/s/query/async_results_merger.h15
-rw-r--r--src/mongo/s/query/async_results_merger_params.idl89
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp658
-rw-r--r--src/mongo/s/query/cluster_client_cursor.h2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp41
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.h2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_mock.cpp3
-rw-r--r--src/mongo/s/query/cluster_client_cursor_mock.h2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_params.h39
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.cpp5
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.h2
-rw-r--r--src/mongo/s/query/cluster_find.cpp4
-rw-r--r--src/mongo/s/query/document_source_router_adapter.cpp4
-rw-r--r--src/mongo/s/query/document_source_router_adapter.h1
-rw-r--r--src/mongo/s/query/establish_cursors.cpp45
-rw-r--r--src/mongo/s/query/establish_cursors.h15
-rw-r--r--src/mongo/s/query/router_exec_stage.h8
-rw-r--r--src/mongo/s/query/router_stage_merge.cpp21
-rw-r--r--src/mongo/s/query/router_stage_merge.h4
-rw-r--r--src/mongo/s/query/router_stage_pipeline.cpp4
-rw-r--r--src/mongo/s/query/router_stage_pipeline.h2
-rw-r--r--src/mongo/s/query/router_stage_update_on_add_shard.cpp21
-rw-r--r--src/mongo/s/query/router_stage_update_on_add_shard.h5
-rw-r--r--src/mongo/s/query/store_possible_cursor.cpp15
-rw-r--r--src/mongo/s/query/store_possible_cursor.h2
-rw-r--r--src/mongo/shell/bench.cpp15
-rw-r--r--src/mongo/util/net/SConscript1
-rw-r--r--src/mongo/util/net/hostandport.h11
-rw-r--r--src/mongo/util/net/hostandport.idl38
66 files changed, 854 insertions, 1376 deletions
diff --git a/src/mongo/client/dbclientcursor.cpp b/src/mongo/client/dbclientcursor.cpp
index 23a9b12a792..399d6cb9143 100644
--- a/src/mongo/client/dbclientcursor.cpp
+++ b/src/mongo/client/dbclientcursor.cpp
@@ -137,7 +137,7 @@ Message DBClientCursor::_assembleInit() {
Message DBClientCursor::_assembleGetMore() {
invariant(cursorId);
if (_useFindCommand) {
- std::int64_t batchSize = nextBatchSize();
+ long long batchSize = nextBatchSize();
auto gmr = GetMoreRequest(ns,
cursorId,
boost::make_optional(batchSize != 0, batchSize),
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 4bc9e9b35cc..ac8a6fbf2f2 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -519,7 +519,7 @@ Status runAggregate(OperationContext* opCtx,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
opCtx->recoveryUnit()->getReadConcernLevel(),
cmdObj);
- if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) {
+ if (expCtx->tailableMode == TailableMode::kTailableAndAwaitData) {
cursorParams.setTailable(true);
cursorParams.setAwaitData(true);
}
diff --git a/src/mongo/db/matcher/expression_optimize_test.cpp b/src/mongo/db/matcher/expression_optimize_test.cpp
index e2f82257a67..9091ad48e93 100644
--- a/src/mongo/db/matcher/expression_optimize_test.cpp
+++ b/src/mongo/db/matcher/expression_optimize_test.cpp
@@ -133,7 +133,7 @@ TEST(ExpressionOptimizeTest, IsValidText) {
TEST(ExpressionOptimizeTest, IsValidTextTailable) {
// Filter inside QueryRequest is not used.
auto qr = stdx::make_unique<QueryRequest>(nss);
- qr->setTailableMode(TailableModeEnum::kTailable);
+ qr->setTailableMode(TailableMode::kTailable);
ASSERT_OK(qr->validate());
// Invalid: TEXT and tailable.
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 88c56ab45b9..3d4481ebbc4 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -301,7 +301,6 @@ pipelineeEnv.Library(
'document_source_skip.cpp',
'document_source_sort.cpp',
'document_source_sort_by_count.cpp',
- "cluster_aggregation_planner.cpp",
'document_source_tee_consumer.cpp',
'document_source_unwind.cpp',
'mongo_process_common.cpp',
diff --git a/src/mongo/db/pipeline/cluster_aggregation_planner.cpp b/src/mongo/db/pipeline/cluster_aggregation_planner.cpp
deleted file mode 100644
index 3653b5b2638..00000000000
--- a/src/mongo/db/pipeline/cluster_aggregation_planner.cpp
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Copyright (C) 2018 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/pipeline/cluster_aggregation_planner.h"
-
-#include "mongo/db/pipeline/document_source_match.h"
-#include "mongo/db/pipeline/document_source_merge_cursors.h"
-#include "mongo/db/pipeline/document_source_project.h"
-#include "mongo/db/pipeline/document_source_sort.h"
-#include "mongo/db/pipeline/document_source_unwind.h"
-
-namespace mongo {
-namespace cluster_aggregation_planner {
-
-namespace {
-/**
- * Moves everything before a splittable stage to the shards. If there are no splittable stages,
- * moves everything to the shards.
- *
- * It is not safe to call this optimization multiple times.
- *
- * NOTE: looks for SplittableDocumentSources and uses that API
- */
-void findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) {
- while (!mergePipe->getSources().empty()) {
- boost::intrusive_ptr<DocumentSource> current = mergePipe->popFront();
-
- // Check if this source is splittable.
- SplittableDocumentSource* splittable =
- dynamic_cast<SplittableDocumentSource*>(current.get());
-
- if (!splittable) {
- // Move the source from the merger _sources to the shard _sources.
- shardPipe->pushBack(current);
- } else {
- // Split this source into 'merge' and 'shard' _sources.
- boost::intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource();
- auto mergeSources = splittable->getMergeSources();
-
- // A source may not simultaneously be present on both sides of the split.
- invariant(std::find(mergeSources.begin(), mergeSources.end(), shardSource) ==
- mergeSources.end());
-
- if (shardSource)
- shardPipe->pushBack(shardSource);
-
- // Add the stages in reverse order, so that they appear in the pipeline in the same
- // order as they were returned by the stage.
- for (auto it = mergeSources.rbegin(); it != mergeSources.rend(); ++it) {
- mergePipe->addInitialSource(*it);
- }
-
- break;
- }
- }
-}
-
-/**
- * If the final stage on shards is to unwind an array, move that stage to the merger. This cuts down
- * on network traffic and allows us to take advantage of reduced copying in unwind.
- */
-void moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe) {
- while (!shardPipe->getSources().empty() &&
- dynamic_cast<DocumentSourceUnwind*>(shardPipe->getSources().back().get())) {
- mergePipe->addInitialSource(shardPipe->popBack());
- }
-}
-
-/**
- * Adds a stage to the end of 'shardPipe' explicitly requesting all fields that 'mergePipe' needs.
- * This is only done if it heuristically determines that it is needed. This optimization can reduce
- * the amount of network traffic and can also enable the shards to convert less source BSON into
- * Documents.
- */
-void limitFieldsSentFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe) {
- auto depsMetadata = DocumentSourceMatch::isTextQuery(shardPipe->getInitialQuery())
- ? DepsTracker::MetadataAvailable::kTextScore
- : DepsTracker::MetadataAvailable::kNoMetadata;
- DepsTracker mergeDeps(mergePipe->getDependencies(depsMetadata));
- if (mergeDeps.needWholeDocument)
- return; // the merge needs all fields, so nothing we can do.
-
- // Empty project is "special" so if no fields are needed, we just ask for _id instead.
- if (mergeDeps.fields.empty())
- mergeDeps.fields.insert("_id");
-
- // Remove metadata from dependencies since it automatically flows through projection and we
- // don't want to project it in to the document.
- mergeDeps.setNeedTextScore(false);
-
- // HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of
- // field dependencies. While this may not be 100% ideal in all cases, it is simple and
- // avoids the worst cases by ensuring that:
- // 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of
- // dependencies. This situation can happen when a $sort is before the first $project or
- // $group. Without the optimization, the shards would have to reify and transmit full
- // objects even though only a subset of fields are needed.
- // 2) Optimization IS NOT applied immediately following a $project or $group since it would
- // add an unnecessary project (and therefore a deep-copy).
- for (auto&& source : shardPipe->getSources()) {
- DepsTracker dt(depsMetadata);
- if (source->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS)
- return;
- }
- // if we get here, add the project.
- boost::intrusive_ptr<DocumentSource> project = DocumentSourceProject::createFromBson(
- BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->getContext());
- shardPipe->pushBack(project);
-}
-} // namespace
-
-void performSplitPipelineOptimizations(Pipeline* shardPipeline, Pipeline* mergingPipeline) {
- // The order in which optimizations are applied can have significant impact on the
- // efficiency of the final pipeline. Be Careful!
- findSplitPoint(shardPipeline, mergingPipeline);
- moveFinalUnwindFromShardsToMerger(shardPipeline, mergingPipeline);
- limitFieldsSentFromShardsToMerger(shardPipeline, mergingPipeline);
-}
-
-boost::optional<BSONObj> popLeadingMergeSort(Pipeline* pipeline) {
- // Remove a leading $sort iff it is a mergesort, since the ARM cannot handle blocking $sort.
- auto frontSort = pipeline->popFrontWithNameAndCriteria(
- DocumentSourceSort::kStageName, [](const DocumentSource* const source) {
- return static_cast<const DocumentSourceSort* const>(source)->mergingPresorted();
- });
-
- if (frontSort) {
- auto sortStage = static_cast<DocumentSourceSort*>(frontSort.get());
- if (auto sortLimit = sortStage->getLimitSrc()) {
- // There was a limit stage absorbed into the sort stage, so we need to preserve that.
- pipeline->addInitialSource(sortLimit);
- }
- return sortStage
- ->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging)
- .toBson();
- }
- return boost::none;
-}
-
-void addMergeCursorsSource(Pipeline* mergePipeline,
- std::vector<RemoteCursor> remoteCursors,
- executor::TaskExecutor* executor) {
- AsyncResultsMergerParams armParams;
- if (auto sort = popLeadingMergeSort(mergePipeline)) {
- armParams.setSort(std::move(*sort));
- }
- armParams.setRemotes(std::move(remoteCursors));
- armParams.setTailableMode(mergePipeline->getContext()->tailableMode);
- armParams.setNss(mergePipeline->getContext()->ns);
- mergePipeline->addInitialSource(DocumentSourceMergeCursors::create(
- executor, std::move(armParams), mergePipeline->getContext()));
-}
-
-} // namespace cluster_aggregation_planner
-} // namespace mongo
diff --git a/src/mongo/db/pipeline/cluster_aggregation_planner.h b/src/mongo/db/pipeline/cluster_aggregation_planner.h
deleted file mode 100644
index f62f158f6ca..00000000000
--- a/src/mongo/db/pipeline/cluster_aggregation_planner.h
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Copyright (C) 2018 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/pipeline/pipeline.h"
-
-namespace mongo {
-namespace cluster_aggregation_planner {
-
-/**
- * Performs optimizations with the aim of reducing computing time and network traffic when a
- * pipeline has been split into two pieces. Modifies 'shardPipeline' and 'mergingPipeline' such that
- * they may contain different stages, but still compute the same results when executed.
- */
-void performSplitPipelineOptimizations(Pipeline* shardPipeline, Pipeline* mergingPipeline);
-
-/**
- * Rips off an initial $sort stage that can be handled by cursor merging machinery. Returns the
- * sort key pattern of such a $sort stage if there was one, and boost::none otherwise.
- */
-boost::optional<BSONObj> popLeadingMergeSort(Pipeline* mergePipeline);
-
-/**
- * Creates a new DocumentSourceMergeCursors from the provided 'remoteCursors' and adds it to the
- * front of 'mergePipeline'.
- */
-void addMergeCursorsSource(Pipeline* mergePipeline,
- std::vector<RemoteCursor> remoteCursors,
- executor::TaskExecutor*);
-
-} // namespace cluster_aggregation_planner
-} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index b227afeac5f..df7b31b330e 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -395,7 +395,7 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx,
list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
// A change stream is a tailable + awaitData cursor.
- expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData;
+ expCtx->tailableMode = TailableMode::kTailableAndAwaitData;
// Prevent $changeStream from running on an entire database (or cluster-wide) unless we are in
// test mode.
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index 32efd87cdda..dfe4ece48ac 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -520,8 +520,9 @@ public:
Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
Pipeline* pipeline) final {
while (_removeLeadingQueryStages && !pipeline->getSources().empty()) {
- if (pipeline->popFrontWithName("$match") || pipeline->popFrontWithName("$sort") ||
- pipeline->popFrontWithName("$project")) {
+ if (pipeline->popFrontWithCriteria("$match") ||
+ pipeline->popFrontWithCriteria("$sort") ||
+ pipeline->popFrontWithCriteria("$project")) {
continue;
}
break;
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
index 3e34fd366a0..06a314e8c2e 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
@@ -43,20 +43,15 @@ constexpr StringData DocumentSourceMergeCursors::kStageName;
DocumentSourceMergeCursors::DocumentSourceMergeCursors(
executor::TaskExecutor* executor,
- AsyncResultsMergerParams armParams,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- boost::optional<BSONObj> ownedParamsSpec)
- : DocumentSource(expCtx),
- _armParamsObj(std::move(ownedParamsSpec)),
- _executor(executor),
- _armParams(std::move(armParams)) {}
+ std::unique_ptr<ClusterClientCursorParams> cursorParams,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSource(expCtx), _executor(executor), _armParams(std::move(cursorParams)) {}
DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() {
// We don't expect or support tailable cursors to be executing through this stage.
- invariant(pExpCtx->tailableMode == TailableModeEnum::kNormal);
+ invariant(pExpCtx->tailableMode == TailableMode::kNormal);
if (!_arm) {
- _arm.emplace(pExpCtx->opCtx, _executor, std::move(*_armParams));
- _armParams = boost::none;
+ _arm.emplace(pExpCtx->opCtx, _executor, _armParams.get());
}
auto next = uassertStatusOK(_arm->blockingNext());
if (next.isEOF()) {
@@ -65,19 +60,24 @@ DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() {
return Document::fromBsonWithMetaData(*next.getResult());
}
-Value DocumentSourceMergeCursors::serialize(
- boost::optional<ExplainOptions::Verbosity> explain) const {
+void DocumentSourceMergeCursors::serializeToArray(
+ std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
invariant(!_arm);
invariant(_armParams);
- return Value(Document{{kStageName, _armParams->toBSON()}});
+ std::vector<Value> cursors;
+ for (auto&& remote : _armParams->remotes) {
+ cursors.emplace_back(Document{{"host", remote.hostAndPort.toString()},
+ {"ns", remote.cursorResponse.getNSS().toString()},
+ {"id", remote.cursorResponse.getCursorId()}});
+ }
+ array.push_back(Value(Document{{kStageName, Value(std::move(cursors))}}));
+ if (!_armParams->sort.isEmpty()) {
+ array.push_back(Value(Document{{DocumentSourceSort::kStageName, Value(_armParams->sort)}}));
+ }
}
Pipeline::SourceContainer::iterator DocumentSourceMergeCursors::doOptimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
- // TODO SERVER-34009 3.6 and earlier mongos processes serialize $mergeCursors and $sort with
- // $mergingPresorted as separate stages. This code is responsible for coalescing these two
- // stages. New mongos processes serialize $mergeCursors with an absorbed sort as one stage. Once
- // we no longer have to support the old format, this code can be deleted.
invariant(*itr == this);
invariant(!_arm);
invariant(_armParams);
@@ -92,9 +92,9 @@ Pipeline::SourceContainer::iterator DocumentSourceMergeCursors::doOptimizeAt(
return next;
}
- _armParams->setSort(
+ _armParams->sort =
nextSort->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging)
- .toBson());
+ .toBson();
if (auto sortLimit = nextSort->getLimitSrc()) {
// There was a limit stage absorbed into the sort stage, so we need to preserve that.
container->insert(std::next(next), sortLimit);
@@ -105,31 +105,15 @@ Pipeline::SourceContainer::iterator DocumentSourceMergeCursors::doOptimizeAt(
boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- if (elem.type() == BSONType::Object) {
- // This is the modern serialization format. We de-serialize using the IDL.
- auto ownedObj = elem.embeddedObject().getOwned();
- auto armParams =
- AsyncResultsMergerParams::parse(IDLParserErrorContext(kStageName), ownedObj);
- return new DocumentSourceMergeCursors(
- Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
- std::move(armParams),
- expCtx,
- std::move(ownedObj));
- }
-
- // This is the old serialization format which can still be generated by mongos processes
- // older than 4.0.
- // TODO SERVER-34009 Remove support for this format.
- uassert(17026,
- "$mergeCursors stage expected either an array or an object as argument",
- elem.type() == BSONType::Array);
+ uassert(
+ 17026, "$mergeCursors stage expected array as argument", elem.type() == BSONType::Array);
const auto serializedRemotes = elem.Array();
uassert(50729,
"$mergeCursors stage expected array with at least one entry",
serializedRemotes.size() > 0);
boost::optional<NamespaceString> nss;
- std::vector<RemoteCursor> remotes;
+ std::vector<ClusterClientCursorParams::RemoteCursor> remotes;
for (auto&& cursor : serializedRemotes) {
BSONElement nsElem;
BSONElement hostElem;
@@ -180,30 +164,28 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson(
// any data in the initial batch.
// TODO SERVER-33323 We use a fake shard id because the AsyncResultsMerger won't use it for
// anything, and finding the real one is non-trivial.
- RemoteCursor remoteCursor;
- remoteCursor.setShardId(ShardId("fakeShardIdForMergeCursors"));
- remoteCursor.setHostAndPort(std::move(host));
std::vector<BSONObj> emptyBatch;
- remoteCursor.setCursorResponse(CursorResponse{*nss, cursorId, emptyBatch});
- remotes.push_back(std::move(remoteCursor));
+ remotes.push_back({ShardId("fakeShardIdForMergeCursors"),
+ std::move(host),
+ CursorResponse{*nss, cursorId, emptyBatch}});
}
invariant(nss); // We know there is at least one cursor in 'serializedRemotes', and we require
// each cursor to have a 'ns' field.
- AsyncResultsMergerParams params;
- params.setRemotes(std::move(remotes));
- params.setNss(*nss);
+ auto params = stdx::make_unique<ClusterClientCursorParams>(*nss);
+ params->remotes = std::move(remotes);
return new DocumentSourceMergeCursors(
Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
std::move(params),
- expCtx,
- elem.embeddedObject().getOwned());
+ expCtx);
}
boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::create(
+ std::vector<ClusterClientCursorParams::RemoteCursor>&& remoteCursors,
executor::TaskExecutor* executor,
- AsyncResultsMergerParams params,
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ auto params = stdx::make_unique<ClusterClientCursorParams>(expCtx->ns);
+ params->remotes = std::move(remoteCursors);
return new DocumentSourceMergeCursors(executor, std::move(params), expCtx);
}
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h
index 611d09bcd34..cb74f4bd9ef 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.h
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.h
@@ -55,11 +55,11 @@ public:
BSONElement, const boost::intrusive_ptr<ExpressionContext>&);
/**
- * Creates a new DocumentSourceMergeCursors from the given parameters.
+ * Creates a new DocumentSourceMergeCursors from the given 'remoteCursors'.
*/
static boost::intrusive_ptr<DocumentSource> create(
+ std::vector<ClusterClientCursorParams::RemoteCursor>&& remoteCursors,
executor::TaskExecutor*,
- AsyncResultsMergerParams,
const boost::intrusive_ptr<ExpressionContext>&);
const char* getSourceName() const final {
@@ -77,7 +77,9 @@ public:
/**
* Serializes this stage to be sent to perform the merging on a different host.
*/
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+ void serializeToArray(
+ std::vector<Value>& array,
+ boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
StageConstraints constraints(StreamType::kStreaming,
@@ -100,23 +102,21 @@ protected:
private:
DocumentSourceMergeCursors(executor::TaskExecutor*,
- AsyncResultsMergerParams,
- const boost::intrusive_ptr<ExpressionContext>&,
- boost::optional<BSONObj> ownedParamsSpec = boost::none);
+ std::unique_ptr<ClusterClientCursorParams>,
+ const boost::intrusive_ptr<ExpressionContext>&);
- // When we have parsed the params out of a BSONObj, the object needs to stay around while the
- // params are in use. We store them here.
- boost::optional<BSONObj> _armParamsObj;
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
+ MONGO_UNREACHABLE; // Should call serializeToArray instead.
+ }
executor::TaskExecutor* _executor;
- // '_armParams' is populated until the first call to getNext(). Upon the first call to getNext()
- // '_arm' will be populated using '_armParams', and '_armParams' will become boost::none. So if
- // getNext() is never called we will never populate '_arm'. If we did so the destruction of this
- // stage would cause the cursors within the ARM to be killed prematurely. For example, if this
- // stage is parsed on mongos then forwarded to the shards, it should not kill the cursors when
- // it goes out of scope on mongos.
- boost::optional<AsyncResultsMergerParams> _armParams;
+ // '_arm' is not populated until the first call to getNext(). If getNext() is never called we
+ // will not create an AsyncResultsMerger. If we did so the destruction of this stage would cause
+ // the cursors within the ARM to be killed prematurely. For example, if this stage is parsed on
+ // mongos then forwarded to the shards, it should not kill the cursors when it goes out of scope
+ // on mongos.
+ std::unique_ptr<ClusterClientCursorParams> _armParams;
boost::optional<AsyncResultsMerger> _arm;
};
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
index 3614324d27d..4809ac54b50 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
@@ -111,7 +111,9 @@ private:
};
TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectNonArray) {
- auto spec = BSON("$mergeCursors" << 2);
+ auto spec = BSON("$mergeCursors" << BSON(
+ "cursors" << BSON_ARRAY(BSON("ns" << kTestNss.ns() << "id" << 0LL << "host"
+ << kTestHost.toString()))));
ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
AssertionException,
17026);
@@ -211,66 +213,16 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToParseTheSerializedVersionOf
ASSERT(DocumentSourceMergeCursors::createFromBson(newSpec.firstElement(), getExpCtx()));
}
-RemoteCursor makeRemoteCursor(ShardId shardId, HostAndPort host, CursorResponse response) {
- RemoteCursor remoteCursor;
- remoteCursor.setShardId(std::move(shardId));
- remoteCursor.setHostAndPort(std::move(host));
- remoteCursor.setCursorResponse(std::move(response));
- return remoteCursor;
-}
-
-TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToParseSerializedARMParams) {
- AsyncResultsMergerParams params;
- params.setSort(BSON("y" << 1 << "z" << 1));
- params.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(makeRemoteCursor(
- kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, kExhaustedCursorID, {})));
- params.setRemotes(std::move(cursors));
- auto spec = BSON("$mergeCursors" << params.toBSON());
- auto mergeCursors =
- DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx());
- std::vector<Value> serializationArray;
- mergeCursors->serializeToArray(serializationArray);
- ASSERT_EQ(serializationArray.size(), 1UL);
-
- // Make sure the serialized version can be parsed into an identical AsyncResultsMergerParams.
- auto newSpec = serializationArray[0].getDocument().toBson();
- ASSERT(newSpec["$mergeCursors"].type() == BSONType::Object);
- auto newParams = AsyncResultsMergerParams::parse(IDLParserErrorContext("$mergeCursors test"),
- newSpec["$mergeCursors"].Obj());
- ASSERT_TRUE(params.getSort());
- ASSERT_BSONOBJ_EQ(*params.getSort(), *newParams.getSort());
- ASSERT_EQ(params.getCompareWholeSortKey(), newParams.getCompareWholeSortKey());
- ASSERT(params.getTailableMode() == newParams.getTailableMode());
- ASSERT(params.getBatchSize() == newParams.getBatchSize());
- ASSERT_EQ(params.getNss(), newParams.getNss());
- ASSERT_EQ(params.getAllowPartialResults(), newParams.getAllowPartialResults());
- ASSERT_EQ(newParams.getRemotes().size(), 1UL);
- ASSERT(newParams.getRemotes()[0].getShardId() == kTestShardIds[0].toString());
- ASSERT(newParams.getRemotes()[0].getHostAndPort() == kTestShardHosts[0]);
- ASSERT_EQ(newParams.getRemotes()[0].getCursorResponse().getNSS(), kTestNss);
- ASSERT_EQ(newParams.getRemotes()[0].getCursorResponse().getCursorId(), kExhaustedCursorID);
- ASSERT(newParams.getRemotes()[0].getCursorResponse().getBatch().empty());
-
- // Test that the $mergeCursors stage will accept the serialized format of
- // AsyncResultsMergerParams.
- ASSERT(DocumentSourceMergeCursors::createFromBson(newSpec.firstElement(), getExpCtx()));
-}
-
TEST_F(DocumentSourceMergeCursorsTest, ShouldReportEOFWithNoCursors) {
auto expCtx = getExpCtx();
- AsyncResultsMergerParams armParams;
- armParams.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(makeRemoteCursor(
- kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, kExhaustedCursorID, {})));
- cursors.emplace_back(makeRemoteCursor(
- kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, kExhaustedCursorID, {})));
- armParams.setRemotes(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(
+ kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, kExhaustedCursorID, {}));
+ cursors.emplace_back(
+ kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, kExhaustedCursorID, {}));
auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
auto mergeCursorsStage =
- DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx);
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx);
ASSERT_TRUE(mergeCursorsStage->getNext().isEOF());
}
@@ -284,17 +236,12 @@ BSONObj cursorResponseObj(const NamespaceString& nss,
TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToIterateCursorsUntilEOF) {
auto expCtx = getExpCtx();
- AsyncResultsMergerParams armParams;
- armParams.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})));
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})));
- armParams.setRemotes(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
pipeline->addInitialSource(
- DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx));
// Iterate the $mergeCursors stage asynchronously on a different thread, since it will block
// waiting for network responses, which we will manually schedule below.
@@ -333,17 +280,12 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToIterateCursorsUntilEOF) {
TEST_F(DocumentSourceMergeCursorsTest, ShouldNotKillCursorsIfNeverIterated) {
auto expCtx = getExpCtx();
- AsyncResultsMergerParams armParams;
- armParams.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})));
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})));
- armParams.setRemotes(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
pipeline->addInitialSource(
- DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx));
pipeline.reset(); // Delete the pipeline before using it.
@@ -353,15 +295,11 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldNotKillCursorsIfNeverIterated) {
TEST_F(DocumentSourceMergeCursorsTest, ShouldKillCursorIfPartiallyIterated) {
auto expCtx = getExpCtx();
- AsyncResultsMergerParams armParams;
- armParams.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})));
- armParams.setRemotes(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
pipeline->addInitialSource(
- DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx));
// Iterate the pipeline asynchronously on a different thread, since it will block waiting for
// network responses, which we will manually schedule below.
@@ -406,16 +344,12 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldOptimizeWithASortToEnsureCorrectOrd
auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx));
// Make a $mergeCursors stage and add it to the front of the pipeline.
- AsyncResultsMergerParams armParams;
- armParams.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})));
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})));
- armParams.setRemotes(std::move(cursors));
- pipeline->addInitialSource(
- DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
+ auto mergeCursorsStage =
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx);
+ pipeline->addInitialSource(std::move(mergeCursorsStage));
// After optimization we should only have a $mergeCursors stage.
pipeline->optimizePipeline();
@@ -430,7 +364,6 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldOptimizeWithASortToEnsureCorrectOrd
ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 3}}));
ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 4}}));
ASSERT_FALSE(static_cast<bool>(pipeline->getNext()));
- std::cout << "Finished";
});
onCommand([&](const auto& request) {
@@ -449,35 +382,36 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldOptimizeWithASortToEnsureCorrectOrd
future.timed_get(kFutureTimeout);
}
-TEST_F(DocumentSourceMergeCursorsTest, ShouldEnforceSortSpecifiedViaARMParams) {
+TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLeadingSort) {
auto expCtx = getExpCtx();
- auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
- // Make a $mergeCursors stage with a sort on "x" and add it to the front of the pipeline.
- AsyncResultsMergerParams armParams;
- armParams.setNss(kTestNss);
- armParams.setSort(BSON("x" << 1));
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})));
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})));
- armParams.setRemotes(std::move(cursors));
- pipeline->addInitialSource(
- DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
+ // Make a pipeline with a single $sort stage that is merging pre-sorted results.
+ const bool mergingPresorted = true;
+ const long long limit = 3;
+ auto sortStage = DocumentSourceSort::create(
+ expCtx, BSON("x" << 1), limit, DocumentSourceSort::kMaxMemoryUsageBytes, mergingPresorted);
+ auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx));
- // After optimization we should only have a $mergeCursors stage.
+ // Make a $mergeCursors stage and add it to the front of the pipeline.
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
+ auto mergeCursorsStage =
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx);
+ pipeline->addInitialSource(std::move(mergeCursorsStage));
+
+ // After optimization, we should still have a $limit stage.
pipeline->optimizePipeline();
- ASSERT_EQ(pipeline->getSources().size(), 1UL);
+ ASSERT_EQ(pipeline->getSources().size(), 2UL);
ASSERT_TRUE(dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
+ ASSERT_TRUE(dynamic_cast<DocumentSourceLimit*>(pipeline->getSources().back().get()));
// Iterate the pipeline asynchronously on a different thread, since it will block waiting for
// network responses, which we will manually schedule below.
- auto future = launchAsync([&pipeline]() {
- ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 1}}));
- ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 2}}));
- ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 3}}));
- ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 4}}));
+ auto future = launchAsync([&]() {
+ for (int i = 1; i <= limit; ++i) {
+ ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", i}}));
+ }
ASSERT_FALSE(static_cast<bool>(pipeline->getNext()));
});
@@ -497,7 +431,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldEnforceSortSpecifiedViaARMParams) {
future.timed_get(kFutureTimeout);
}
-TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLeadingSort) {
+TEST_F(DocumentSourceMergeCursorsTest, ShouldSerializeSortIfAbsorbedViaOptimize) {
auto expCtx = getExpCtx();
// Make a pipeline with a single $sort stage that is merging pre-sorted results.
@@ -508,16 +442,12 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLea
auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx));
// Make a $mergeCursors stage and add it to the front of the pipeline.
- AsyncResultsMergerParams armParams;
- armParams.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})));
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})));
- armParams.setRemotes(std::move(cursors));
- pipeline->addInitialSource(
- DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
+ auto mergeCursorsStage =
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx);
+ pipeline->addInitialSource(std::move(mergeCursorsStage));
// After optimization, we should still have a $limit stage.
pipeline->optimizePipeline();
@@ -525,29 +455,11 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLea
ASSERT_TRUE(dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
ASSERT_TRUE(dynamic_cast<DocumentSourceLimit*>(pipeline->getSources().back().get()));
- // Iterate the pipeline asynchronously on a different thread, since it will block waiting for
- // network responses, which we will manually schedule below.
- auto future = launchAsync([&]() {
- for (int i = 1; i <= limit; ++i) {
- ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", i}}));
- }
- ASSERT_FALSE(static_cast<bool>(pipeline->getNext()));
- });
-
- onCommand([&](const auto& request) {
- return cursorResponseObj(expCtx->ns,
- kExhaustedCursorID,
- {BSON("x" << 1 << "$sortKey" << BSON("" << 1)),
- BSON("x" << 3 << "$sortKey" << BSON("" << 3))});
- });
- onCommand([&](const auto& request) {
- return cursorResponseObj(expCtx->ns,
- kExhaustedCursorID,
- {BSON("x" << 2 << "$sortKey" << BSON("" << 2)),
- BSON("x" << 4 << "$sortKey" << BSON("" << 4))});
- });
-
- future.timed_get(kFutureTimeout);
+ auto serialized = pipeline->serialize();
+ ASSERT_EQ(serialized.size(), 3UL);
+ ASSERT_FALSE(serialized[0]["$mergeCursors"].missing());
+ ASSERT_FALSE(serialized[1]["$sort"].missing());
+ ASSERT_FALSE(serialized[2]["$limit"].missing());
}
} // namespace
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index c5989718d5e..d2ad1cd9ec4 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -262,7 +262,7 @@ private:
uint64_t _maxMemoryUsageBytes;
bool _done;
- bool _mergingPresorted; // TODO SERVER-34009 Remove this flag.
+ bool _mergingPresorted;
std::unique_ptr<MySorter> _sorter;
std::unique_ptr<MySorter::Iterator> _output;
};
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index b5e490e91d7..6847d43e8a4 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -161,7 +161,7 @@ public:
* query.
*/
bool isTailableAwaitData() const {
- return tailableMode == TailableModeEnum::kTailableAndAwaitData;
+ return tailableMode == TailableMode::kTailableAndAwaitData;
}
// The explain verbosity requested by the user, or boost::none if no explain was requested.
@@ -198,7 +198,7 @@ public:
Variables variables;
VariablesParseState variablesParseState;
- TailableModeEnum tailableMode = TailableModeEnum::kNormal;
+ TailableMode tailableMode = TailableMode::kNormal;
// Tracks the depth of nested aggregation sub-pipelines. Used to enforce depth limits.
size_t subPipelineDepth = 0;
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 335ca4f5655..2f6fce9e526 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -28,7 +28,9 @@
#include "mongo/platform/basic.h"
+// This file defines functions from both of these headers
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/pipeline/pipeline_optimizations.h"
#include <algorithm>
@@ -38,7 +40,6 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/accumulator.h"
-#include "mongo/db/pipeline/cluster_aggregation_planner.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_geo_near.h"
@@ -46,7 +47,6 @@
#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/document_source_project.h"
-#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/document_source_unwind.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
@@ -329,7 +329,13 @@ std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::splitForSharded() {
// Keep a copy of the original source list in case we need to reset the pipeline from split to
// unsplit later.
shardPipeline->_unsplitSources.emplace(_sources);
- cluster_aggregation_planner::performSplitPipelineOptimizations(shardPipeline.get(), this);
+
+ // The order in which optimizations are applied can have significant impact on the
+ // efficiency of the final pipeline. Be Careful!
+ Optimizations::Sharded::findSplitPoint(shardPipeline.get(), this);
+ Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(shardPipeline.get(), this);
+ Optimizations::Sharded::limitFieldsSentFromShardsToMerger(shardPipeline.get(), this);
+
shardPipeline->_splitState = SplitState::kSplitForShards;
_splitState = SplitState::kSplitForMerge;
@@ -360,6 +366,87 @@ void Pipeline::unsplitFromSharded(
stitch();
}
+void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) {
+ while (!mergePipe->_sources.empty()) {
+ intrusive_ptr<DocumentSource> current = mergePipe->_sources.front();
+ mergePipe->_sources.pop_front();
+
+ // Check if this source is splittable.
+ SplittableDocumentSource* splittable =
+ dynamic_cast<SplittableDocumentSource*>(current.get());
+
+ if (!splittable) {
+ // Move the source from the merger _sources to the shard _sources.
+ shardPipe->_sources.push_back(current);
+ } else {
+ // Split this source into 'merge' and 'shard' _sources.
+ intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource();
+ auto mergeSources = splittable->getMergeSources();
+
+ // A source may not simultaneously be present on both sides of the split.
+ invariant(std::find(mergeSources.begin(), mergeSources.end(), shardSource) ==
+ mergeSources.end());
+
+ if (shardSource)
+ shardPipe->_sources.push_back(shardSource);
+
+ // Add the stages in reverse order, so that they appear in the pipeline in the same
+ // order as they were returned by the stage.
+ for (auto it = mergeSources.rbegin(); it != mergeSources.rend(); ++it) {
+ mergePipe->_sources.push_front(*it);
+ }
+
+ break;
+ }
+ }
+}
+
+void Pipeline::Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe,
+ Pipeline* mergePipe) {
+ while (!shardPipe->_sources.empty() &&
+ dynamic_cast<DocumentSourceUnwind*>(shardPipe->_sources.back().get())) {
+ mergePipe->_sources.push_front(shardPipe->_sources.back());
+ shardPipe->_sources.pop_back();
+ }
+}
+
+void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipeline* shardPipe,
+ Pipeline* mergePipe) {
+ auto depsMetadata = DocumentSourceMatch::isTextQuery(shardPipe->getInitialQuery())
+ ? DepsTracker::MetadataAvailable::kTextScore
+ : DepsTracker::MetadataAvailable::kNoMetadata;
+ DepsTracker mergeDeps(mergePipe->getDependencies(depsMetadata));
+ if (mergeDeps.needWholeDocument)
+ return; // the merge needs all fields, so nothing we can do.
+
+ // Empty project is "special" so if no fields are needed, we just ask for _id instead.
+ if (mergeDeps.fields.empty())
+ mergeDeps.fields.insert("_id");
+
+ // Remove metadata from dependencies since it automatically flows through projection and we
+ // don't want to project it in to the document.
+ mergeDeps.setNeedTextScore(false);
+
+ // HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of
+ // field dependencies. While this may not be 100% ideal in all cases, it is simple and
+ // avoids the worst cases by ensuring that:
+ // 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of
+ // dependencies. This situation can happen when a $sort is before the first $project or
+ // $group. Without the optimization, the shards would have to reify and transmit full
+ // objects even though only a subset of fields are needed.
+ // 2) Optimization IS NOT applied immediately following a $project or $group since it would
+ // add an unnecessary project (and therefore a deep-copy).
+ for (auto&& source : shardPipe->_sources) {
+ DepsTracker dt(depsMetadata);
+ if (source->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS)
+ return;
+ }
+ // if we get here, add the project.
+ boost::intrusive_ptr<DocumentSource> project = DocumentSourceProject::createFromBson(
+ BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->pCtx);
+ shardPipe->_sources.push_back(project);
+}
+
BSONObj Pipeline::getInitialQuery() const {
if (_sources.empty())
return BSONObj();
@@ -612,35 +699,7 @@ Status Pipeline::_pipelineCanRunOnMongoS() const {
return Status::OK();
}
-void Pipeline::pushBack(boost::intrusive_ptr<DocumentSource> newStage) {
- newStage->setSource(_sources.back().get());
- _sources.push_back(std::move(newStage));
-}
-
-boost::intrusive_ptr<DocumentSource> Pipeline::popBack() {
- if (_sources.empty()) {
- return nullptr;
- }
- auto targetStage = _sources.back();
- _sources.pop_back();
- return targetStage;
-}
-
-boost::intrusive_ptr<DocumentSource> Pipeline::popFront() {
- if (_sources.empty()) {
- return nullptr;
- }
- auto targetStage = _sources.front();
- _sources.pop_front();
- stitch();
- return targetStage;
-}
-
-boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithName(StringData targetStageName) {
- return popFrontWithNameAndCriteria(targetStageName, nullptr);
-}
-
-boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithNameAndCriteria(
+boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithCriteria(
StringData targetStageName, stdx::function<bool(const DocumentSource* const)> predicate) {
if (_sources.empty() || _sources.front()->getSourceName() != targetStageName) {
return nullptr;
@@ -651,7 +710,8 @@ boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithNameAndCriteria(
return nullptr;
}
- return popFront();
+ _sources.pop_front();
+ stitch();
+ return targetStage;
}
-
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 04b5769792d..bcb7ee64521 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -39,8 +39,6 @@
#include "mongo/db/pipeline/value.h"
#include "mongo/db/query/explain_options.h"
#include "mongo/db/query/query_knobs.h"
-#include "mongo/executor/task_executor.h"
-#include "mongo/s/query/async_results_merger_params_gen.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/intrusive_counter.h"
#include "mongo/util/timer.h"
@@ -270,34 +268,13 @@ public:
}
/**
- * Removes and returns the first stage of the pipeline. Returns nullptr if the pipeline is
- * empty.
- */
- boost::intrusive_ptr<DocumentSource> popFront();
-
- /**
- * Removes and returns the last stage of the pipeline. Returns nullptr if the pipeline is empty.
- */
- boost::intrusive_ptr<DocumentSource> popBack();
-
- /**
- * Adds the given stage to the end of the pipeline.
- */
- void pushBack(boost::intrusive_ptr<DocumentSource>);
-
- /**
- * Removes and returns the first stage of the pipeline if its name is 'targetStageName'.
- * Returns nullptr if there is no first stage with that name.
- */
- boost::intrusive_ptr<DocumentSource> popFrontWithName(StringData targetStageName);
-
- /**
* Removes and returns the first stage of the pipeline if its name is 'targetStageName' and the
* given 'predicate' function, if present, returns 'true' when called with a pointer to the
* stage. Returns nullptr if there is no first stage which meets these criteria.
*/
- boost::intrusive_ptr<DocumentSource> popFrontWithNameAndCriteria(
- StringData targetStageName, stdx::function<bool(const DocumentSource* const)> predicate);
+ boost::intrusive_ptr<DocumentSource> popFrontWithCriteria(
+ StringData targetStageName,
+ stdx::function<bool(const DocumentSource* const)> predicate = nullptr);
/**
* PipelineD is a "sister" class that has additional functionality for the Pipeline. It exists
@@ -309,6 +286,15 @@ public:
friend class PipelineD;
private:
+ class Optimizations {
+ public:
+ // This contains static functions that optimize pipelines in various ways.
+ // This is a class rather than a namespace so that it can be a friend of Pipeline.
+ // It is defined in pipeline_optimizations.h.
+ class Sharded;
+ };
+
+ friend class Optimizations::Sharded;
friend class PipelineDeleter;
/**
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 043452c3e0f..244946bd877 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -386,7 +386,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
plannerOpts |= QueryPlannerParams::NO_UNCOVERED_PROJECTIONS;
}
- if (expCtx->needsMerge && expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) {
+ if (expCtx->needsMerge && expCtx->tailableMode == TailableMode::kTailableAndAwaitData) {
plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
}
diff --git a/src/mongo/db/pipeline/pipeline_optimizations.h b/src/mongo/db/pipeline/pipeline_optimizations.h
new file mode 100644
index 00000000000..af85fdff4b0
--- /dev/null
+++ b/src/mongo/db/pipeline/pipeline_optimizations.h
@@ -0,0 +1,71 @@
+/**
+ * Copyright 2013 (c) 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+/**
+ * This file declares optimizations available on Pipelines. For now they should be considered part
+ * of Pipeline's implementation rather than it's interface.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/pipeline.h"
+
+namespace mongo {
+/**
+ * This class holds optimizations applied to a shard Pipeline and a merger Pipeline.
+ *
+ * Each function has the same signature and takes two Pipelines, both as an in/out parameters.
+ */
+class Pipeline::Optimizations::Sharded {
+public:
+ /**
+ * Moves everything before a splittable stage to the shards. If there
+ * are no splittable stages, moves everything to the shards.
+ *
+ * It is not safe to call this optimization multiple times.
+ *
+ * NOTE: looks for SplittableDocumentSources and uses that API
+ */
+ static void findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe);
+
+ /**
+ * If the final stage on shards is to unwind an array, move that stage to the merger. This
+ * cuts down on network traffic and allows us to take advantage of reduced copying in
+ * unwind.
+ */
+ static void moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe);
+
+ /**
+ * Adds a stage to the end of shardPipe explicitly requesting all fields that mergePipe
+ * needs. This is only done if it heuristically determines that it is needed. This
+ * optimization can reduce the amount of network traffic and can also enable the shards to
+ * convert less source BSON into Documents.
+ */
+ static void limitFieldsSentFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe);
+};
+} // namespace mongo
diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript
index 3638d5ba00b..81c4ac22cf9 100644
--- a/src/mongo/db/query/SConscript
+++ b/src/mongo/db/query/SConscript
@@ -187,8 +187,7 @@ env.Library(
target="query_request",
source=[
"query_request.cpp",
- "tailable_mode.cpp",
- env.Idlc("tailable_mode.idl")[0],
+ "tailable_mode.cpp"
],
LIBDEPS=[
"$BUILD_DIR/mongo/base",
diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h
index b2dd828f4c2..d3852f2ee21 100644
--- a/src/mongo/db/query/cursor_response.h
+++ b/src/mongo/db/query/cursor_response.h
@@ -141,23 +141,6 @@ public:
};
/**
- * Constructs a CursorResponse from the command BSON response.
- */
- static StatusWith<CursorResponse> parseFromBSON(const BSONObj& cmdResponse);
-
- /**
- * A throwing version of 'parseFromBSON'.
- */
- static CursorResponse parseFromBSONThrowing(const BSONObj& cmdResponse) {
- return uassertStatusOK(parseFromBSON(cmdResponse));
- }
-
- /**
- * Constructs an empty cursor response.
- */
- CursorResponse() = default;
-
- /**
* Constructs from values for each of the fields.
*/
CursorResponse(NamespaceString nss,
@@ -203,13 +186,15 @@ public:
}
/**
+ * Constructs a CursorResponse from the command BSON response.
+ */
+ static StatusWith<CursorResponse> parseFromBSON(const BSONObj& cmdResponse);
+
+ /**
* Converts this response to its raw BSON representation.
*/
BSONObj toBSON(ResponseType responseType) const;
void addToBSON(ResponseType responseType, BSONObjBuilder* builder) const;
- BSONObj toBSONAsInitialResponse() const {
- return toBSON(ResponseType::InitialResponse);
- }
private:
NamespaceString _nss;
diff --git a/src/mongo/db/query/getmore_request.cpp b/src/mongo/db/query/getmore_request.cpp
index ea2515123f6..a6b29ea520b 100644
--- a/src/mongo/db/query/getmore_request.cpp
+++ b/src/mongo/db/query/getmore_request.cpp
@@ -59,7 +59,7 @@ GetMoreRequest::GetMoreRequest() : cursorid(0), batchSize(0) {}
GetMoreRequest::GetMoreRequest(NamespaceString namespaceString,
CursorId id,
- boost::optional<std::int64_t> sizeOfBatch,
+ boost::optional<long long> sizeOfBatch,
boost::optional<Milliseconds> awaitDataTimeout,
boost::optional<long long> term,
boost::optional<repl::OpTime> lastKnownCommittedOpTime)
@@ -108,7 +108,7 @@ StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbna
boost::optional<NamespaceString> nss;
// Optional fields.
- boost::optional<std::int64_t> batchSize;
+ boost::optional<long long> batchSize;
boost::optional<Milliseconds> awaitDataTimeout;
boost::optional<long long> term;
boost::optional<repl::OpTime> lastKnownCommittedOpTime;
diff --git a/src/mongo/db/query/getmore_request.h b/src/mongo/db/query/getmore_request.h
index 4da4839abbd..8fa2d0fc5dd 100644
--- a/src/mongo/db/query/getmore_request.h
+++ b/src/mongo/db/query/getmore_request.h
@@ -52,7 +52,7 @@ struct GetMoreRequest {
*/
GetMoreRequest(NamespaceString namespaceString,
CursorId id,
- boost::optional<std::int64_t> sizeOfBatch,
+ boost::optional<long long> sizeOfBatch,
boost::optional<Milliseconds> awaitDataTimeout,
boost::optional<long long> term,
boost::optional<repl::OpTime> lastKnownCommittedOpTime);
@@ -76,7 +76,7 @@ struct GetMoreRequest {
// The batch size is optional. If not provided, we will put as many documents into the batch
// as fit within the byte limit.
- const boost::optional<std::int64_t> batchSize;
+ const boost::optional<long long> batchSize;
// The number of milliseconds for which a getMore on a tailable, awaitData query should block.
const boost::optional<Milliseconds> awaitDataTimeout;
diff --git a/src/mongo/db/query/query_request.cpp b/src/mongo/db/query/query_request.cpp
index 56ffe5dbb42..6268785c6a6 100644
--- a/src/mongo/db/query/query_request.cpp
+++ b/src/mongo/db/query/query_request.cpp
@@ -495,16 +495,16 @@ void QueryRequest::asFindCommand(BSONObjBuilder* cmdBuilder) const {
}
switch (_tailableMode) {
- case TailableModeEnum::kTailable: {
+ case TailableMode::kTailable: {
cmdBuilder->append(kTailableField, true);
break;
}
- case TailableModeEnum::kTailableAndAwaitData: {
+ case TailableMode::kTailableAndAwaitData: {
cmdBuilder->append(kTailableField, true);
cmdBuilder->append(kAwaitDataField, true);
break;
}
- case TailableModeEnum::kNormal: {
+ case TailableMode::kNormal: {
break;
}
}
@@ -623,7 +623,7 @@ Status QueryRequest::validate() const {
<< _maxTimeMS);
}
- if (_tailableMode != TailableModeEnum::kNormal) {
+ if (_tailableMode != TailableMode::kNormal) {
// Tailable cursors cannot have any sort other than {$natural: 1}.
const BSONObj expectedSort = BSON(kNaturalSortField << 1);
if (!_sort.isEmpty() &&
@@ -911,9 +911,9 @@ Status QueryRequest::initFullQuery(const BSONObj& top) {
int QueryRequest::getOptions() const {
int options = 0;
- if (_tailableMode == TailableModeEnum::kTailable) {
+ if (_tailableMode == TailableMode::kTailable) {
options |= QueryOption_CursorTailable;
- } else if (_tailableMode == TailableModeEnum::kTailableAndAwaitData) {
+ } else if (_tailableMode == TailableMode::kTailableAndAwaitData) {
options |= QueryOption_CursorTailable;
options |= QueryOption_AwaitData;
}
diff --git a/src/mongo/db/query/query_request.h b/src/mongo/db/query/query_request.h
index 29abf54aaca..7a89e466faf 100644
--- a/src/mongo/db/query/query_request.h
+++ b/src/mongo/db/query/query_request.h
@@ -319,19 +319,19 @@ public:
}
bool isTailable() const {
- return _tailableMode == TailableModeEnum::kTailable ||
- _tailableMode == TailableModeEnum::kTailableAndAwaitData;
+ return _tailableMode == TailableMode::kTailable ||
+ _tailableMode == TailableMode::kTailableAndAwaitData;
}
bool isTailableAndAwaitData() const {
- return _tailableMode == TailableModeEnum::kTailableAndAwaitData;
+ return _tailableMode == TailableMode::kTailableAndAwaitData;
}
- void setTailableMode(TailableModeEnum tailableMode) {
+ void setTailableMode(TailableMode tailableMode) {
_tailableMode = tailableMode;
}
- TailableModeEnum getTailableMode() const {
+ TailableMode getTailableMode() const {
return _tailableMode;
}
@@ -498,7 +498,7 @@ private:
bool _hasReadPref = false;
// Options that can be specified in the OP_QUERY 'flags' header.
- TailableModeEnum _tailableMode = TailableModeEnum::kNormal;
+ TailableMode _tailableMode = TailableMode::kNormal;
bool _slaveOk = false;
bool _oplogReplay = false;
bool _noCursorTimeout = false;
diff --git a/src/mongo/db/query/query_request_test.cpp b/src/mongo/db/query/query_request_test.cpp
index 9992fdb155e..edcf58cecb1 100644
--- a/src/mongo/db/query/query_request_test.cpp
+++ b/src/mongo/db/query/query_request_test.cpp
@@ -1165,7 +1165,7 @@ TEST(QueryRequestTest, ConvertToAggregationWithShowRecordIdFails) {
TEST(QueryRequestTest, ConvertToAggregationWithTailableFails) {
QueryRequest qr(testns);
- qr.setTailableMode(TailableModeEnum::kTailable);
+ qr.setTailableMode(TailableMode::kTailable);
ASSERT_NOT_OK(qr.asAggregationCommand());
}
@@ -1183,7 +1183,7 @@ TEST(QueryRequestTest, ConvertToAggregationWithNoCursorTimeoutFails) {
TEST(QueryRequestTest, ConvertToAggregationWithAwaitDataFails) {
QueryRequest qr(testns);
- qr.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
+ qr.setTailableMode(TailableMode::kTailableAndAwaitData);
ASSERT_NOT_OK(qr.asAggregationCommand());
}
diff --git a/src/mongo/db/query/tailable_mode.cpp b/src/mongo/db/query/tailable_mode.cpp
index f09a7b59c41..b19a1988672 100644
--- a/src/mongo/db/query/tailable_mode.cpp
+++ b/src/mongo/db/query/tailable_mode.cpp
@@ -32,17 +32,17 @@
namespace mongo {
-StatusWith<TailableModeEnum> tailableModeFromBools(bool isTailable, bool isAwaitData) {
+StatusWith<TailableMode> tailableModeFromBools(bool isTailable, bool isAwaitData) {
if (isTailable) {
if (isAwaitData) {
- return TailableModeEnum::kTailableAndAwaitData;
+ return TailableMode::kTailableAndAwaitData;
}
- return TailableModeEnum::kTailable;
+ return TailableMode::kTailable;
} else if (isAwaitData) {
return {ErrorCodes::FailedToParse,
"Cannot set 'awaitData' without also setting 'tailable'"};
}
- return TailableModeEnum::kNormal;
+ return TailableMode::kNormal;
}
} // namespace mongo
diff --git a/src/mongo/db/query/tailable_mode.h b/src/mongo/db/query/tailable_mode.h
index 531191c6f76..92c0fe9292e 100644
--- a/src/mongo/db/query/tailable_mode.h
+++ b/src/mongo/db/query/tailable_mode.h
@@ -30,14 +30,19 @@
#include "mongo/base/error_codes.h"
#include "mongo/base/status_with.h"
-#include "mongo/db/query/tailable_mode_gen.h"
namespace mongo {
+enum class TailableMode {
+ kNormal,
+ kTailable,
+ kTailableAndAwaitData,
+};
+
/**
* Returns a TailableMode from two booleans, returning ErrorCodes::FailedToParse if awaitData is
* set without tailable.
*/
-StatusWith<TailableModeEnum> tailableModeFromBools(bool isTailable, bool isAwaitData);
+StatusWith<TailableMode> tailableModeFromBools(bool isTailable, bool isAwaitData);
} // namespace mongo
diff --git a/src/mongo/db/query/tailable_mode.idl b/src/mongo/db/query/tailable_mode.idl
deleted file mode 100644
index 33d5b6989dd..00000000000
--- a/src/mongo/db/query/tailable_mode.idl
+++ /dev/null
@@ -1,37 +0,0 @@
-# Copyright (C) 2018 MongoDB Inc.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License, version 3,
-# as published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
-# As a special exception, the copyright holders give permission to link the
-# code of portions of this program with the OpenSSL library under certain
-# conditions as described in each individual source file and distribute
-# linked combinations including the program with the OpenSSL library. You
-# must comply with the GNU Affero General Public License in all respects for
-# all of the code used other than as permitted herein. If you modify file(s)
-# with this exception, you may extend this exception to your version of the
-# file(s), but you are not obligated to do so. If you do not wish to do so,
-# delete this exception statement from your version. If you delete this
-# exception statement from all source files in the program, then also delete
-# it in the license file.
-
-global:
- cpp_namespace: "mongo"
-
-enums:
- TailableMode:
- description: "Describes the tailablility of a cursor."
- type: string
- values:
- kNormal: "normal"
- kTailable: "tailable"
- kTailableAndAwaitData: "tailableAndAwaitData"
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index fb0890b87a5..bcb0da9d789 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1241,6 +1241,7 @@ env.Library(
'$BUILD_DIR/mongo/executor/task_executor_interface',
'$BUILD_DIR/mongo/rpc/command_status',
'$BUILD_DIR/mongo/s/query/async_results_merger',
+ '$BUILD_DIR/mongo/s/query/cluster_client_cursor',
'$BUILD_DIR/mongo/util/progress_meter',
],
)
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 8aee68a4c58..773f54a0b8b 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/server_parameters.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/destructor_guard.h"
#include "mongo/util/fail_point_service.h"
@@ -612,25 +613,20 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa
<< " cursors established.";
// Initialize the 'AsyncResultsMerger'(ARM).
- std::vector<RemoteCursor> remoteCursors;
+ std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors;
for (auto&& cursorResponse : cursorResponses) {
// A placeholder 'ShardId' is used until the ARM is made less sharding specific.
- remoteCursors.emplace_back();
- auto& newCursor = remoteCursors.back();
- newCursor.setShardId("CollectionClonerSyncSource");
- newCursor.setHostAndPort(_source);
- newCursor.setCursorResponse(std::move(cursorResponse));
+ remoteCursors.emplace_back(
+ ShardId("CollectionClonerSyncSource"), _source, std::move(cursorResponse));
}
- AsyncResultsMergerParams armParams;
- armParams.setNss(_sourceNss);
- armParams.setRemotes(std::move(remoteCursors));
- if (_collectionCloningBatchSize > 0) {
- armParams.setBatchSize(_collectionCloningBatchSize);
- }
+ _clusterClientCursorParams = stdx::make_unique<ClusterClientCursorParams>(_sourceNss);
+ _clusterClientCursorParams->remotes = std::move(remoteCursors);
+ if (_collectionCloningBatchSize > 0)
+ _clusterClientCursorParams->batchSize = _collectionCloningBatchSize;
Client::initThreadIfNotAlready();
_arm = stdx::make_unique<AsyncResultsMerger>(
- cc().getOperationContext(), _executor, std::move(armParams));
+ cc().getOperationContext(), _executor, _clusterClientCursorParams.get());
// This completion guard invokes _finishCallback on destruction.
auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); };
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index d97c5c0048f..e79258e3734 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -301,6 +301,8 @@ private:
const int _maxNumClonerCursors;
// (M) Component responsible for fetching the documents from the collection cloner cursor(s).
std::unique_ptr<AsyncResultsMerger> _arm;
+ // (R) The cursor parameters used by the 'AsyncResultsMerger'.
+ std::unique_ptr<ClusterClientCursorParams> _clusterClientCursorParams;
// (M) The event handle for the 'kill' event of the 'AsyncResultsMerger'.
executor::TaskExecutor::EventHandle _killArmHandle;
diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp
index 5a834ed8aa5..ebfaccbde5c 100644
--- a/src/mongo/dbtests/documentsourcetests.cpp
+++ b/src/mongo/dbtests/documentsourcetests.cpp
@@ -369,7 +369,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterTimeout)
opCtx(), collScanParams, workingSet.get(), matchExpression.get());
auto queryRequest = stdx::make_unique<QueryRequest>(nss);
queryRequest->setFilter(filter);
- queryRequest->setTailableMode(TailableModeEnum::kTailableAndAwaitData);
+ queryRequest->setTailableMode(TailableMode::kTailableAndAwaitData);
auto canonicalQuery = unittest::assertGet(
CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr));
auto planExecutor =
@@ -381,7 +381,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterTimeout)
PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT));
// Make a DocumentSourceCursor.
- ctx()->tailableMode = TailableModeEnum::kTailableAndAwaitData;
+ ctx()->tailableMode = TailableMode::kTailableAndAwaitData;
// DocumentSourceCursor expects a PlanExecutor that has had its state saved.
planExecutor->saveState();
auto cursor =
@@ -419,7 +419,7 @@ TEST_F(DocumentSourceCursorTest, NonAwaitDataCursorShouldErrorAfterTimeout) {
PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT));
// Make a DocumentSourceCursor.
- ctx()->tailableMode = TailableModeEnum::kNormal;
+ ctx()->tailableMode = TailableMode::kNormal;
// DocumentSourceCursor expects a PlanExecutor that has had its state saved.
planExecutor->saveState();
auto cursor =
@@ -449,7 +449,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterBeingKil
opCtx(), collScanParams, workingSet.get(), matchExpression.get());
auto queryRequest = stdx::make_unique<QueryRequest>(nss);
queryRequest->setFilter(filter);
- queryRequest->setTailableMode(TailableModeEnum::kTailableAndAwaitData);
+ queryRequest->setTailableMode(TailableMode::kTailableAndAwaitData);
auto canonicalQuery = unittest::assertGet(
CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr));
auto planExecutor =
@@ -461,7 +461,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterBeingKil
PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED));
// Make a DocumentSourceCursor.
- ctx()->tailableMode = TailableModeEnum::kTailableAndAwaitData;
+ ctx()->tailableMode = TailableMode::kTailableAndAwaitData;
// DocumentSourceCursor expects a PlanExecutor that has had its state saved.
planExecutor->saveState();
auto cursor =
@@ -498,7 +498,7 @@ TEST_F(DocumentSourceCursorTest, NormalCursorShouldErrorAfterBeingKilled) {
PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED));
// Make a DocumentSourceCursor.
- ctx()->tailableMode = TailableModeEnum::kNormal;
+ ctx()->tailableMode = TailableMode::kNormal;
// DocumentSourceCursor expects a PlanExecutor that has had its state saved.
planExecutor->saveState();
auto cursor =
diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp
index 51a5f7723cd..8f8025e4b9d 100644
--- a/src/mongo/dbtests/query_plan_executor.cpp
+++ b/src/mongo/dbtests/query_plan_executor.cpp
@@ -99,7 +99,7 @@ public:
Collection* coll,
BSONObj& filterObj,
PlanExecutor::YieldPolicy yieldPolicy = PlanExecutor::YieldPolicy::YIELD_MANUAL,
- TailableModeEnum tailableMode = TailableModeEnum::kNormal) {
+ TailableMode tailableMode = TailableMode::kNormal) {
CollectionScanParams csparams;
csparams.collection = coll;
csparams.direction = CollectionScanParams::FORWARD;
@@ -307,7 +307,7 @@ TEST_F(PlanExecutorTest, ShouldReportErrorIfKilledDuringYieldButIsTailableAndAwa
auto exec = makeCollScanExec(coll,
filterObj,
PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT,
- TailableModeEnum::kTailableAndAwaitData);
+ TailableMode::kTailableAndAwaitData);
BSONObj resultObj;
ASSERT_EQ(PlanExecutor::DEAD, exec->getNext(&resultObj, nullptr));
@@ -323,7 +323,7 @@ TEST_F(PlanExecutorTest, ShouldNotSwallowExceedsTimeLimitDuringYieldButIsTailabl
Collection* coll = ctx.getCollection();
auto exec = makeCollScanExec(
- coll, filterObj, PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT, TailableModeEnum::kTailable);
+ coll, filterObj, PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT, TailableMode::kTailable);
BSONObj resultObj;
ASSERT_EQ(PlanExecutor::DEAD, exec->getNext(&resultObj, nullptr));
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index c0643205001..7c673f73b30 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -41,7 +41,6 @@
#include "mongo/db/curop.h"
#include "mongo/db/logical_clock.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/pipeline/cluster_aggregation_planner.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/document_source_out.h"
@@ -264,14 +263,26 @@ BSONObj createCommandForMergingShard(
return appendAllowImplicitCreate(mergeCmd.freeze().toBson(), true);
}
-std::vector<RemoteCursor> establishShardCursors(OperationContext* opCtx,
- const NamespaceString& nss,
- const LiteParsedPipeline& litePipe,
- CachedCollectionRoutingInfo* routingInfo,
- const BSONObj& cmdObj,
- const ReadPreferenceSetting& readPref,
- const BSONObj& shardQuery,
- const BSONObj& collation) {
+/**
+ * Verifies that the shardIds are the same as they were atClusterTime using versioned table.
+ * TODO: SERVER-33767
+ */
+bool verifyTargetedShardsAtClusterTime(OperationContext* opCtx,
+ const std::set<ShardId>& shardIds,
+ LogicalTime atClusterTime) {
+ return true;
+}
+
+std::vector<ClusterClientCursorParams::RemoteCursor> establishShardCursors(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const LiteParsedPipeline& litePipe,
+ CachedCollectionRoutingInfo* routingInfo,
+ const BSONObj& cmdObj,
+ const ReadPreferenceSetting& readPref,
+ const BSONObj& shardQuery,
+ const BSONObj& collation) {
+>>>>>>> parent of 7d09f27... SERVER-33323 Use the IDL to serialize the ARM
LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
bool mustRunOnAll = mustRunOnAllShards(nss, *routingInfo, litePipe);
@@ -341,7 +352,7 @@ struct DispatchShardPipelineResults {
// Populated if this *is not* an explain, this vector represents the cursors on the remote
// shards.
- std::vector<RemoteCursor> remoteCursors;
+ std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors;
// Populated if this *is* an explain, this vector represents the results from each shard.
std::vector<AsyncRequestsSender::Response> remoteExplainOutput;
@@ -379,7 +390,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
// pipeline is already split and we now only need to target a single shard, reassemble the
// original pipeline.
// - After exhausting 10 attempts to establish the cursors, we give up and throw.
- auto cursors = std::vector<RemoteCursor>();
+ auto cursors = std::vector<ClusterClientCursorParams::RemoteCursor>();
auto shardResults = std::vector<AsyncRequestsSender::Response>();
auto opCtx = expCtx->opCtx;
@@ -537,7 +548,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
BSONObj cmdToRunOnNewShards,
const LiteParsedPipeline& liteParsedPipeline,
std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging,
- std::vector<RemoteCursor> cursors) {
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors) {
ClusterClientCursorParams params(requestedNss, ReadPreferenceSetting::get(opCtx));
@@ -545,6 +556,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
params.tailableMode = pipelineForMerging->getContext()->tailableMode;
params.mergePipeline = std::move(pipelineForMerging);
params.remotes = std::move(cursors);
+
// A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch
// size we pass here is used for getMores, so do not specify a batch size if the initial request
// had a batch size of 0.
@@ -554,19 +566,12 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
if (liteParsedPipeline.hasChangeStream()) {
// For change streams, we need to set up a custom stage to establish cursors on new shards
- // when they are added. Be careful to extract the targeted shard IDs before the remote
- // cursors are transferred from the ClusterClientCursorParams to the AsyncResultsMerger.
- std::vector<ShardId> shardIds;
- for (const auto& remote : params.remotes) {
- shardIds.emplace_back(remote.getShardId().toString());
- }
-
- params.createCustomCursorSource = [cmdToRunOnNewShards,
- shardIds](OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams* params) {
+ // when they are added.
+ params.createCustomCursorSource = [cmdToRunOnNewShards](OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ ClusterClientCursorParams* params) {
return stdx::make_unique<RouterStageUpdateOnAddShard>(
- opCtx, executor, params, std::move(shardIds), cmdToRunOnNewShards);
+ opCtx, executor, params, cmdToRunOnNewShards);
};
}
auto ccc = ClusterClientCursorImpl::make(
@@ -615,7 +620,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
ccc->detachFromOperationContext();
- int nShards = ccc->getNumRemotes();
+ int nShards = ccc->getRemotes().size();
CursorId clusterCursorId = 0;
if (cursorState == ClusterCursorManager::CursorState::NotExhausted) {
@@ -681,8 +686,7 @@ ShardId pickMergingShard(OperationContext* opCtx,
return dispatchResults.needsPrimaryShardMerge
? primaryShard
: dispatchResults.remoteCursors[prng.nextInt32(dispatchResults.remoteCursors.size())]
- .getShardId()
- .toString();
+ .shardId;
}
} // namespace
@@ -835,16 +839,15 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
auto executorPool = Grid::get(opCtx)->getExecutorPool();
const BSONObj reply = uassertStatusOK(storePossibleCursor(
opCtx,
- remoteCursor.getShardId().toString(),
- remoteCursor.getHostAndPort(),
- remoteCursor.getCursorResponse().toBSON(CursorResponse::ResponseType::InitialResponse),
+ remoteCursor.shardId,
+ remoteCursor.hostAndPort,
+ remoteCursor.cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse),
namespaces.requestedNss,
executorPool->getArbitraryExecutor(),
Grid::get(opCtx)->getCursorManager(),
mergeCtx->tailableMode));
- return appendCursorResponseToCommandResult(
- remoteCursor.getShardId().toString(), reply, result);
+ return appendCursorResponseToCommandResult(remoteCursor.shardId, reply, result);
}
// If we reach here, we have a merge pipeline to dispatch.
@@ -880,10 +883,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
ShardId mergingShardId =
pickMergingShard(opCtx, dispatchResults, executionNsRoutingInfo.db().primaryId());
- cluster_aggregation_planner::addMergeCursorsSource(
- mergingPipeline.get(),
+ mergingPipeline->addInitialSource(DocumentSourceMergeCursors::create(
std::move(dispatchResults.remoteCursors),
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor());
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ mergeCtx));
auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, mergingPipeline);
auto mergeResponse =
@@ -978,8 +981,8 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
namespaces.requestedNss,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
Grid::get(opCtx)->getCursorManager(),
- liteParsedPipeline.hasChangeStream() ? TailableModeEnum::kTailableAndAwaitData
- : TailableModeEnum::kNormal));
+ liteParsedPipeline.hasChangeStream() ? TailableMode::kTailableAndAwaitData
+ : TailableMode::kNormal));
}
// First append the properly constructed writeConcernError. It will then be skipped
diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/s/commands/pipeline_s.cpp
index 4d91ddfff08..060c5533877 100644
--- a/src/mongo/s/commands/pipeline_s.cpp
+++ b/src/mongo/s/commands/pipeline_s.cpp
@@ -114,7 +114,7 @@ boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument(
cmdBuilder.append(repl::ReadConcernArgs::kReadConcernFieldName, *readConcern);
}
- auto shardResult = std::vector<RemoteCursor>();
+ auto shardResult = std::vector<ClusterClientCursorParams::RemoteCursor>();
auto findCmd = cmdBuilder.obj();
size_t numAttempts = 0;
while (++numAttempts <= kMaxNumStaleVersionRetries) {
@@ -164,13 +164,13 @@ boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument(
invariant(shardResult.size() == 1u);
- auto& cursor = shardResult.front().getCursorResponse();
+ auto& cursor = shardResult.front().cursorResponse;
auto& batch = cursor.getBatch();
// We should have at most 1 result, and the cursor should be exhausted.
uassert(ErrorCodes::InternalError,
str::stream() << "Shard cursor was unexpectedly open after lookup: "
- << shardResult.front().getHostAndPort()
+ << shardResult.front().hostAndPort
<< ", id: "
<< cursor.getCursorId(),
cursor.getCursorId() == 0);
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index fa70221c4fd..708f52ac576 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -585,7 +585,7 @@ DbResponse Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss
}
uassertStatusOK(statusGetDb);
- boost::optional<std::int64_t> batchSize;
+ boost::optional<long long> batchSize;
if (ntoreturn) {
batchSize = ntoreturn;
}
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index aa9a1713255..d9148577f5a 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -86,7 +86,6 @@ env.Library(
source=[
"async_results_merger.cpp",
"establish_cursors.cpp",
- env.Idlc('async_results_merger_params.idl')[0],
],
LIBDEPS=[
"$BUILD_DIR/mongo/db/query/command_request_response",
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 45403399e8c..59d9a133d72 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -82,27 +82,20 @@ int compareSortKeys(BSONObj leftSortKey, BSONObj rightSortKey, BSONObj sortKeyPa
AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
executor::TaskExecutor* executor,
- AsyncResultsMergerParams params)
+ ClusterClientCursorParams* params)
: _opCtx(opCtx),
_executor(executor),
- // This strange initialization is to work around the fact that the IDL does not currently
- // support a default value for an enum. The default tailable mode should be 'kNormal', but
- // since that is not supported we treat boost::none (unspecified) to mean 'kNormal'.
- _tailableMode(params.getTailableMode() ? *params.getTailableMode()
- : TailableModeEnum::kNormal),
- _params(std::move(params)),
- _mergeQueue(MergingComparator(_remotes,
- _params.getSort() ? *_params.getSort() : BSONObj(),
- _params.getCompareWholeSortKey())) {
+ _params(params),
+ _mergeQueue(MergingComparator(_remotes, _params->sort, _params->compareWholeSortKey)) {
size_t remoteIndex = 0;
- for (const auto& remote : _params.getRemotes()) {
- _remotes.emplace_back(remote.getHostAndPort(),
- remote.getCursorResponse().getNSS(),
- remote.getCursorResponse().getCursorId());
+ for (const auto& remote : _params->remotes) {
+ _remotes.emplace_back(remote.hostAndPort,
+ remote.cursorResponse.getNSS(),
+ remote.cursorResponse.getCursorId());
// We don't check the return value of _addBatchToBuffer here; if there was an error,
// it will be stored in the remote and the first call to ready() will return true.
- _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.getCursorResponse());
+ _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.cursorResponse);
++remoteIndex;
}
}
@@ -130,7 +123,7 @@ bool AsyncResultsMerger::_remotesExhausted(WithLock) {
Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_tailableMode != TailableModeEnum::kTailableAndAwaitData) {
+ if (_params->tailableMode != TailableMode::kTailableAndAwaitData) {
return Status(ErrorCodes::BadValue,
"maxTimeMS can only be used with getMore for tailable, awaitData cursors");
}
@@ -140,9 +133,9 @@ Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
// recent optimes, which allows us to return sorted $changeStream results even if some shards
// are yet to provide a batch of data. If the timeout specified by the client is greater than
// 1000ms, then it will be enforced elsewhere.
- _awaitDataTimeout =
- (_params.getSort() && _remotes.size() > 1u ? std::min(awaitDataTimeout, Milliseconds{1000})
- : awaitDataTimeout);
+ _awaitDataTimeout = (!_params->sort.isEmpty() && _remotes.size() > 1u
+ ? std::min(awaitDataTimeout, Milliseconds{1000})
+ : awaitDataTimeout);
return Status::OK();
}
@@ -168,12 +161,13 @@ void AsyncResultsMerger::reattachToOperationContext(OperationContext* opCtx) {
_opCtx = opCtx;
}
-void AsyncResultsMerger::addNewShardCursors(std::vector<RemoteCursor>&& newCursors) {
+void AsyncResultsMerger::addNewShardCursors(
+ const std::vector<ClusterClientCursorParams::RemoteCursor>& newCursors) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
for (auto&& remote : newCursors) {
- _remotes.emplace_back(remote.getHostAndPort(),
- remote.getCursorResponse().getNSS(),
- remote.getCursorResponse().getCursorId());
+ _remotes.emplace_back(remote.hostAndPort,
+ remote.cursorResponse.getNSS(),
+ remote.cursorResponse.getCursorId());
}
}
@@ -196,15 +190,16 @@ bool AsyncResultsMerger::_ready(WithLock lk) {
}
}
- return _params.getSort() ? _readySorted(lk) : _readyUnsorted(lk);
+ const bool hasSort = !_params->sort.isEmpty();
+ return hasSort ? _readySorted(lk) : _readyUnsorted(lk);
}
bool AsyncResultsMerger::_readySorted(WithLock lk) {
- if (_tailableMode == TailableModeEnum::kTailableAndAwaitData) {
+ if (_params->tailableMode == TailableMode::kTailableAndAwaitData) {
return _readySortedTailable(lk);
}
// Tailable non-awaitData cursors cannot have a sort.
- invariant(_tailableMode == TailableModeEnum::kNormal);
+ invariant(_params->tailableMode == TailableMode::kNormal);
for (const auto& remote : _remotes) {
if (!remote.hasNext() && !remote.exhausted()) {
@@ -223,14 +218,13 @@ bool AsyncResultsMerger::_readySortedTailable(WithLock) {
auto smallestRemote = _mergeQueue.top();
auto smallestResult = _remotes[smallestRemote].docBuffer.front();
auto keyWeWantToReturn =
- extractSortKey(*smallestResult.getResult(), _params.getCompareWholeSortKey());
+ extractSortKey(*smallestResult.getResult(), _params->compareWholeSortKey);
for (const auto& remote : _remotes) {
if (!remote.promisedMinSortKey) {
// In order to merge sorted tailable cursors, we need this value to be populated.
return false;
}
- if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, *_params.getSort()) >
- 0) {
+ if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, _params->sort) > 0) {
// The key we want to return is not guaranteed to be smaller than future results from
// this remote, so we can't yet return it.
return false;
@@ -270,12 +264,13 @@ StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() {
return {ClusterQueryResult()};
}
- return _params.getSort() ? _nextReadySorted(lk) : _nextReadyUnsorted(lk);
+ const bool hasSort = !_params->sort.isEmpty();
+ return hasSort ? _nextReadySorted(lk) : _nextReadyUnsorted(lk);
}
ClusterQueryResult AsyncResultsMerger::_nextReadySorted(WithLock) {
// Tailable non-awaitData cursors cannot have a sort.
- invariant(_tailableMode != TailableModeEnum::kTailable);
+ invariant(_params->tailableMode != TailableMode::kTailable);
if (_mergeQueue.empty()) {
return {};
@@ -309,7 +304,7 @@ ClusterQueryResult AsyncResultsMerger::_nextReadyUnsorted(WithLock) {
ClusterQueryResult front = _remotes[_gettingFromRemote].docBuffer.front();
_remotes[_gettingFromRemote].docBuffer.pop();
- if (_tailableMode == TailableModeEnum::kTailable &&
+ if (_params->tailableMode == TailableMode::kTailable &&
!_remotes[_gettingFromRemote].hasNext()) {
// The cursor is tailable and we're about to return the last buffered result. This
// means that the next value returned should be boost::none to indicate the end of
@@ -339,9 +334,9 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
// request to fetch the remaining docs only. If the remote node has a plan with OR for top k and
// a full sort as is the case for the OP_QUERY find then this optimization will prevent
// switching to the full sort plan branch.
- auto adjustedBatchSize = _params.getBatchSize();
- if (_params.getBatchSize() && *_params.getBatchSize() > remote.fetchedCount) {
- adjustedBatchSize = *_params.getBatchSize() - remote.fetchedCount;
+ auto adjustedBatchSize = _params->batchSize;
+ if (_params->batchSize && *_params->batchSize > remote.fetchedCount) {
+ adjustedBatchSize = *_params->batchSize - remote.fetchedCount;
}
BSONObj cmdObj = GetMoreRequest(remote.cursorNss,
@@ -353,7 +348,7 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
.toBSON();
executor::RemoteCommandRequest request(
- remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, _opCtx);
+ remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, _opCtx);
auto callbackStatus =
_executor->scheduleRemoteCommand(request, [this, remoteIndex](auto const& cbData) {
@@ -453,8 +448,7 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote,
remote->cursorId = response.getCursorId();
if (response.getLastOplogTimestamp() && !response.getLastOplogTimestamp()->isNull()) {
// We only expect to see this for change streams.
- invariant(_params.getSort());
- invariant(SimpleBSONObjComparator::kInstance.evaluate(*_params.getSort() ==
+ invariant(SimpleBSONObjComparator::kInstance.evaluate(_params->sort ==
change_stream_constants::kSortSpec));
auto newLatestTimestamp = *response.getLastOplogTimestamp();
@@ -481,7 +475,7 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote,
auto maxSortKeyFromResponse =
(response.getBatch().empty()
? BSONObj()
- : extractSortKey(response.getBatch().back(), _params.getCompareWholeSortKey()));
+ : extractSortKey(response.getBatch().back(), _params->compareWholeSortKey));
remote->promisedMinSortKey =
(compareSortKeys(
@@ -531,7 +525,7 @@ void AsyncResultsMerger::_cleanUpFailedBatch(WithLock lk, Status status, size_t
remote.status = std::move(status);
// Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We
// remove the unreachable host entirely from consideration by marking it as exhausted.
- if (_params.getAllowPartialResults()) {
+ if (_params->isAllowPartialResults) {
remote.status = Status::OK();
// Clear the results buffer and cursor id.
@@ -571,7 +565,7 @@ void AsyncResultsMerger::_processBatchResults(WithLock lk,
// through to the client as-is.
// (Note: tailable cursors are only valid on unsharded collections, so the end of the batch from
// one shard means the end of the overall batch).
- if (_tailableMode == TailableModeEnum::kTailable && !remote.hasNext()) {
+ if (_params->tailableMode == TailableMode::kTailable && !remote.hasNext()) {
invariant(_remotes.size() == 1);
_eofNext = true;
} else if (!remote.hasNext() && !remote.exhausted() && _lifecycleState == kAlive) {
@@ -588,7 +582,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
updateRemoteMetadata(&remote, response);
for (const auto& obj : response.getBatch()) {
// If there's a sort, we're expecting the remote node to have given us back a sort key.
- if (_params.getSort()) {
+ if (!_params->sort.isEmpty()) {
auto key = obj[AsyncResultsMerger::kSortKeyField];
if (!key) {
remote.status =
@@ -597,7 +591,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
<< "' in document: "
<< obj);
return false;
- } else if (!_params.getCompareWholeSortKey() && key.type() != BSONType::Object) {
+ } else if (!_params->compareWholeSortKey && key.type() != BSONType::Object) {
remote.status =
Status(ErrorCodes::InternalError,
str::stream() << "Field '" << AsyncResultsMerger::kSortKeyField
@@ -612,9 +606,9 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
++remote.fetchedCount;
}
- // If we're doing a sorted merge, then we have to make sure to put this remote onto the merge
- // queue.
- if (_params.getSort() && !response.getBatch().empty()) {
+ // If we're doing a sorted merge, then we have to make sure to put this remote onto the
+ // merge queue.
+ if (!_params->sort.isEmpty() && !response.getBatch().empty()) {
_mergeQueue.push(remoteIndex);
}
return true;
@@ -644,10 +638,10 @@ void AsyncResultsMerger::_scheduleKillCursors(WithLock, OperationContext* opCtx)
for (const auto& remote : _remotes) {
if (remote.status.isOK() && remote.cursorId && !remote.exhausted()) {
- BSONObj cmdObj = KillCursorsRequest(_params.getNss(), {remote.cursorId}).toBSON();
+ BSONObj cmdObj = KillCursorsRequest(_params->nsString, {remote.cursorId}).toBSON();
executor::RemoteCommandRequest request(
- remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, opCtx);
+ remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, opCtx);
// Send kill request; discard callback handle, if any, or failure report, if not.
_executor->scheduleRemoteCommand(request, [](auto const&) {}).getStatus().ignore();
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 1653374b5bc..e2551a3c7dd 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -37,7 +37,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/cursor_id.h"
#include "mongo/executor/task_executor.h"
-#include "mongo/s/query/async_results_merger_params_gen.h"
+#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/s/query/cluster_query_result.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/with_lock.h"
@@ -98,7 +98,7 @@ public:
*/
AsyncResultsMerger(OperationContext* opCtx,
executor::TaskExecutor* executor,
- AsyncResultsMergerParams params);
+ ClusterClientCursorParams* params);
/**
* In order to be destroyed, either the ARM must have been kill()'ed or all cursors must have
@@ -195,11 +195,7 @@ public:
* Adds the specified shard cursors to the set of cursors to be merged. The results from the
* new cursors will be returned as normal through nextReady().
*/
- void addNewShardCursors(std::vector<RemoteCursor>&& newCursors);
-
- std::size_t getNumRemotes() const {
- return _remotes.size();
- }
+ void addNewShardCursors(const std::vector<ClusterClientCursorParams::RemoteCursor>& newCursors);
/**
* Starts shutting down this ARM by canceling all pending requests and scheduling killCursors
@@ -297,7 +293,7 @@ private:
private:
const std::vector<RemoteCursorData>& _remotes;
- const BSONObj _sort;
+ const BSONObj& _sort;
// When '_compareWholeSortKey' is true, $sortKey is a scalar value, rather than an object.
// We extract the sort key {$sortKey: <value>}. The sort key pattern '_sort' is verified to
@@ -405,8 +401,7 @@ private:
OperationContext* _opCtx;
executor::TaskExecutor* _executor;
- TailableModeEnum _tailableMode;
- AsyncResultsMergerParams _params;
+ ClusterClientCursorParams* _params;
// Must be acquired before accessing any data members (other than _params, which is read-only).
stdx::mutex _mutex;
diff --git a/src/mongo/s/query/async_results_merger_params.idl b/src/mongo/s/query/async_results_merger_params.idl
deleted file mode 100644
index dafc9b53c1c..00000000000
--- a/src/mongo/s/query/async_results_merger_params.idl
+++ /dev/null
@@ -1,89 +0,0 @@
-# Copyright (C) 2018 MongoDB Inc.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License, version 3,
-# as published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
-# As a special exception, the copyright holders give permission to link the
-# code of portions of this program with the OpenSSL library under certain
-# conditions as described in each individual source file and distribute
-# linked combinations including the program with the OpenSSL library. You
-# must comply with the GNU Affero General Public License in all respects for
-# all of the code used other than as permitted herein. If you modify file(s)
-# with this exception, you may extend this exception to your version of the
-# file(s), but you are not obligated to do so. If you do not wish to do so,
-# delete this exception statement from your version. If you delete this
-# exception statement from all source files in the program, then also delete
-# it in the license file.
-
-global:
- cpp_namespace: "mongo"
- cpp_includes:
- - "mongo/s/shard_id.h"
- - "mongo/util/net/hostandport.h"
- - "mongo/db/query/cursor_response.h"
-
-imports:
- - "mongo/db/query/tailable_mode.idl"
- - "mongo/idl/basic_types.idl"
- - "mongo/util/net/hostandport.idl"
-
-types:
- CursorResponse:
- bson_serialization_type: object
- description: The first batch returned after establishing cursors on a shard.
- cpp_type: CursorResponse
- serializer: CursorResponse::toBSONAsInitialResponse
- deserializer: CursorResponse::parseFromBSONThrowing
-
-structs:
- RemoteCursor:
- description: A description of a cursor opened on a remote server.
- fields:
- shardId:
- type: string
- description: The shardId of the shard on which the cursor resides.
- hostAndPort:
- type: HostAndPort
- description: The exact host (within the shard) on which the cursor resides.
- cursorResponse:
- type: CursorResponse
- description: The response after establishing a cursor on the remote shard, including
- the first batch.
-
- AsyncResultsMergerParams:
- description: The parameters needed to establish an AsyncResultsMerger.
- fields:
- sort:
- type: object
- description: The sort requested on the merging operation. Empty if there is no sort.
- optional: true
- compareWholeSortKey:
- type: bool
- default: false
- description: >-
- When 'compareWholeSortKey' is true, $sortKey is a scalar value, rather than an
- object. We extract the sort key {$sortKey: <value>}. The sort key pattern is
- verified to be {$sortKey: 1}.
- remotes: array<RemoteCursor>
- tailableMode:
- type: TailableMode
- optional: true
- description: If set, the tailability mode of this cursor.
- batchSize:
- type: safeInt64
- optional: true
- description: The batch size for this cursor.
- nss: namespacestring
- allowPartialResults:
- type: bool
- default: false
- description: If set, error responses are ignored.
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 6fd81715e90..4c0e32cba51 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -62,11 +62,9 @@ const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShard1Host",
HostAndPort("FakeShard2Host", 12345),
HostAndPort("FakeShard3Host", 12345)};
-const NamespaceString kTestNss("testdb.testcoll");
-
class AsyncResultsMergerTest : public ShardingTestFixture {
public:
- AsyncResultsMergerTest() {}
+ AsyncResultsMergerTest() : _nss("testdb.testcoll") {}
void setUp() override {
ShardingTestFixture::setUp();
@@ -97,48 +95,42 @@ public:
void tearDown() override {
ShardingTestFixture::tearDown();
+ // Reset _params only after shutting down the network interface (through
+ // ShardingTestFixture::tearDown()), because shutting down the network interface will
+ // deliver blackholed responses to the AsyncResultsMerger, and the AsyncResultsMerger's
+ // callback may still access _params.
+ _params.reset();
}
protected:
/**
* Constructs an ARM with the given vector of existing cursors.
*
- * If 'findCmd' is not set, the default AsyncResultsMergerParams are used.
- * Otherwise, the 'findCmd' is used to construct the AsyncResultsMergerParams.
+ * If 'findCmd' is not set, the default ClusterClientCursorParams are used.
+ * Otherwise, the 'findCmd' is used to construct the ClusterClientCursorParams.
*
* 'findCmd' should not have a 'batchSize', since the find's batchSize is used just in the
* initial find. The getMore 'batchSize' can be passed in through 'getMoreBatchSize.'
*/
- std::unique_ptr<AsyncResultsMerger> makeARMFromExistingCursors(
- std::vector<RemoteCursor> remoteCursors,
+ void makeCursorFromExistingCursors(
+ std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors,
boost::optional<BSONObj> findCmd = boost::none,
- boost::optional<std::int64_t> getMoreBatchSize = boost::none) {
- AsyncResultsMergerParams params;
- params.setNss(kTestNss);
- params.setRemotes(std::move(remoteCursors));
-
+ boost::optional<long long> getMoreBatchSize = boost::none) {
+ _params = stdx::make_unique<ClusterClientCursorParams>(_nss);
+ _params->remotes = std::move(remoteCursors);
if (findCmd) {
const auto qr = unittest::assertGet(
- QueryRequest::makeFromFindCommand(kTestNss, *findCmd, false /* isExplain */));
- if (!qr->getSort().isEmpty()) {
- params.setSort(qr->getSort().getOwned());
- }
-
- if (getMoreBatchSize) {
- params.setBatchSize(getMoreBatchSize);
- } else {
- params.setBatchSize(qr->getBatchSize()
- ? boost::optional<std::int64_t>(
- static_cast<std::int64_t>(*qr->getBatchSize()))
- : boost::none);
- }
- params.setTailableMode(qr->getTailableMode());
- params.setAllowPartialResults(qr->isAllowPartialResults());
+ QueryRequest::makeFromFindCommand(_nss, *findCmd, false /* isExplain */));
+ _params->sort = qr->getSort();
+ _params->limit = qr->getLimit();
+ _params->batchSize = getMoreBatchSize ? getMoreBatchSize : qr->getBatchSize();
+ _params->skip = qr->getSkip();
+ _params->tailableMode = qr->getTailableMode();
+ _params->isAllowPartialResults = qr->isAllowPartialResults();
}
- return stdx::make_unique<AsyncResultsMerger>(
- operationContext(), executor(), std::move(params));
+ arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), _params.get());
}
/**
@@ -228,6 +220,11 @@ protected:
net->blackHole(net->getNextReadyRequest());
net->exitNetwork();
}
+
+ const NamespaceString _nss;
+ std::unique_ptr<ClusterClientCursorParams> _params;
+
+ std::unique_ptr<AsyncResultsMerger> arm;
};
void assertKillCusorsCmdHasCursorId(const BSONObj& killCmd, CursorId cursorId) {
@@ -243,19 +240,10 @@ void assertKillCusorsCmdHasCursorId(const BSONObj& killCmd, CursorId cursorId) {
ASSERT_EQ(numCursors, 1u);
}
-RemoteCursor makeRemoteCursor(ShardId shardId, HostAndPort host, CursorResponse response) {
- RemoteCursor remoteCursor;
- remoteCursor.setShardId(std::move(shardId));
- remoteCursor.setHostAndPort(std::move(host));
- remoteCursor.setCursorResponse(std::move(response));
- return remoteCursor;
-}
-
TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -271,7 +259,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) {
// Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
- responses.emplace_back(kTestNss, CursorId(0), batch);
+ responses.emplace_back(_nss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -297,10 +285,9 @@ TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) {
TEST_F(AsyncResultsMergerTest, SingleShardSorted) {
BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -316,7 +303,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardSorted) {
// Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{$sortKey: {'': 5}}"), fromjson("{$sortKey: {'': 6}}")};
- responses.emplace_back(kTestNss, CursorId(0), batch);
+ responses.emplace_back(_nss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -341,12 +328,10 @@ TEST_F(AsyncResultsMergerTest, SingleShardSorted) {
}
TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 6, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 6, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -363,7 +348,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {
fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
- responses.emplace_back(kTestNss, CursorId(0), batch1);
+ responses.emplace_back(_nss, CursorId(0), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -391,7 +376,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) {
responses.clear();
std::vector<BSONObj> batch2 = {
fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
- responses.emplace_back(kTestNss, CursorId(0), batch2);
+ responses.emplace_back(_nss, CursorId(0), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -407,20 +392,18 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) {
ASSERT_TRUE(arm->ready());
ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult());
- // After returning all the buffered results, the ARM returns EOF immediately because both shards
- // cursors were exhausted.
+ // After returning all the buffered results, the ARM returns EOF immediately because both
+ // shards cursors were exhausted.
ASSERT_TRUE(arm->ready());
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, MultiShardSorted) {
BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 6, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 6, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -437,7 +420,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5}}"),
fromjson("{$sortKey: {'': 6}}")};
- responses.emplace_back(kTestNss, CursorId(0), batch1);
+ responses.emplace_back(_nss, CursorId(0), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -451,7 +434,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) {
responses.clear();
std::vector<BSONObj> batch2 = {fromjson("{$sortKey: {'': 3}}"),
fromjson("{$sortKey: {'': 9}}")};
- responses.emplace_back(kTestNss, CursorId(0), batch2);
+ responses.emplace_back(_nss, CursorId(0), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -473,19 +456,17 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) {
ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 9}}"),
*unittest::assertGet(arm->nextReady()).getResult());
- // After returning all the buffered results, the ARM returns EOF immediately because both shards
- // cursors were exhausted.
+ // After returning all the buffered results, the ARM returns EOF immediately because both
+ // shards cursors were exhausted.
ASSERT_TRUE(arm->ready());
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 6, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 6, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -498,7 +479,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {
fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
- responses.emplace_back(kTestNss, CursorId(5), batch1);
+ responses.emplace_back(_nss, CursorId(5), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -527,7 +508,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
responses.clear();
std::vector<BSONObj> batch2 = {
fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
- responses.emplace_back(kTestNss, CursorId(0), batch2);
+ responses.emplace_back(_nss, CursorId(0), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -555,7 +536,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
responses.clear();
std::vector<BSONObj> batch3 = {
fromjson("{_id: 7}"), fromjson("{_id: 8}"), fromjson("{_id: 9}")};
- responses.emplace_back(kTestNss, CursorId(0), batch3);
+ responses.emplace_back(_nss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -571,22 +552,19 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
ASSERT_TRUE(arm->ready());
ASSERT_BSONOBJ_EQ(fromjson("{_id: 9}"), *unittest::assertGet(arm->nextReady()).getResult());
- // After returning all the buffered results, the ARM returns EOF immediately because both shards
- // cursors were exhausted.
+ // After returning all the buffered results, the ARM returns EOF immediately because both
+ // shards cursors were exhausted.
ASSERT_TRUE(arm->ready());
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, CompoundSortKey) {
BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 6, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 7, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 6, {}));
+ cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 7, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
// Schedule requests.
ASSERT_FALSE(arm->ready());
@@ -597,13 +575,13 @@ TEST_F(AsyncResultsMergerTest, CompoundSortKey) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5, '': 9}}"),
fromjson("{$sortKey: {'': 4, '': 20}}")};
- responses.emplace_back(kTestNss, CursorId(0), batch1);
+ responses.emplace_back(_nss, CursorId(0), batch1);
std::vector<BSONObj> batch2 = {fromjson("{$sortKey: {'': 10, '': 11}}"),
fromjson("{$sortKey: {'': 4, '': 4}}")};
- responses.emplace_back(kTestNss, CursorId(0), batch2);
+ responses.emplace_back(_nss, CursorId(0), batch2);
std::vector<BSONObj> batch3 = {fromjson("{$sortKey: {'': 10, '': 12}}"),
fromjson("{$sortKey: {'': 5, '': 9}}")};
- responses.emplace_back(kTestNss, CursorId(0), batch3);
+ responses.emplace_back(_nss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -629,18 +607,17 @@ TEST_F(AsyncResultsMergerTest, CompoundSortKey) {
ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 4, '': 20}}"),
*unittest::assertGet(arm->nextReady()).getResult());
- // After returning all the buffered results, the ARM returns EOF immediately because both shards
- // cursors were exhausted.
+ // After returning all the buffered results, the ARM returns EOF immediately because both
+ // shards cursors were exhausted.
ASSERT_TRUE(arm->ready());
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) {
BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -649,7 +626,7 @@ TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) {
// Parsing the batch results in an error because the sort key is missing.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{a: 2, b: 1}"), fromjson("{a: 1, b: 2}")};
- responses.emplace_back(kTestNss, CursorId(1), batch1);
+ responses.emplace_back(_nss, CursorId(1), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -667,10 +644,10 @@ TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) {
TEST_F(AsyncResultsMergerTest, HasFirstBatch) {
std::vector<BSONObj> firstBatch = {
fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
- std::vector<RemoteCursor> cursors;
- cursors.push_back(makeRemoteCursor(
- kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, std::move(firstBatch))));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(
+ kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, std::move(firstBatch)));
+ makeCursorFromExistingCursors(std::move(cursors));
// Because there was firstBatch, ARM is immediately ready to return results.
ASSERT_TRUE(arm->ready());
@@ -698,7 +675,7 @@ TEST_F(AsyncResultsMergerTest, HasFirstBatch) {
// Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
- responses.emplace_back(kTestNss, CursorId(0), batch);
+ responses.emplace_back(_nss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -725,12 +702,11 @@ TEST_F(AsyncResultsMergerTest, HasFirstBatch) {
TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) {
std::vector<BSONObj> firstBatch = {
fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
- std::vector<RemoteCursor> cursors;
- cursors.push_back(makeRemoteCursor(
- kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, std::move(firstBatch))));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 0, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(
+ kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, std::move(firstBatch)));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 0, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
// Because there was firstBatch, ARM is immediately ready to return results.
ASSERT_TRUE(arm->ready());
@@ -758,7 +734,7 @@ TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) {
// Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
- responses.emplace_back(kTestNss, CursorId(0), batch);
+ responses.emplace_back(_nss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -783,12 +759,10 @@ TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) {
}
TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -797,9 +771,9 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
// Both shards respond with the first batch.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(1), batch1);
+ responses.emplace_back(_nss, CursorId(1), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(kTestNss, CursorId(2), batch2);
+ responses.emplace_back(_nss, CursorId(2), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -821,7 +795,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
// never responds.
responses.clear();
std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")};
- responses.emplace_back(kTestNss, CursorId(1), batch3);
+ responses.emplace_back(_nss, CursorId(1), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
blackHoleNextRequest();
@@ -839,7 +813,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
// We can continue to return results from first shard, while second shard remains unresponsive.
responses.clear();
std::vector<BSONObj> batch4 = {fromjson("{_id: 7}"), fromjson("{_id: 8}")};
- responses.emplace_back(kTestNss, CursorId(0), batch4);
+ responses.emplace_back(_nss, CursorId(0), batch4);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -854,15 +828,12 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
// the network interface.
auto killEvent = arm->kill(operationContext());
ASSERT_TRUE(killEvent.isValid());
- executor()->shutdown();
- executor()->waitForEvent(killEvent);
}
TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -870,7 +841,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
- responses.emplace_back(kTestNss, CursorId(456), batch);
+ responses.emplace_back(_nss, CursorId(456), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -884,25 +855,22 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
}
TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 789, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {}));
+ cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 789, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- BSONObj response1 = CursorResponse(kTestNss, CursorId(123), batch1)
+ BSONObj response1 = CursorResponse(_nss, CursorId(123), batch1)
.toBSON(CursorResponse::ResponseType::SubsequentResponse);
BSONObj response2 = fromjson("{foo: 'bar'}");
std::vector<BSONObj> batch3 = {fromjson("{_id: 4}"), fromjson("{_id: 5}")};
- BSONObj response3 = CursorResponse(kTestNss, CursorId(789), batch3)
+ BSONObj response3 = CursorResponse(_nss, CursorId(789), batch3)
.toBSON(CursorResponse::ResponseType::SubsequentResponse);
scheduleNetworkResponseObjs({response1, response2, response3});
runReadyCallbacks();
@@ -917,14 +885,11 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
}
TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 3, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {}));
+ cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 3, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -932,9 +897,9 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(1), batch1);
+ responses.emplace_back(_nss, CursorId(1), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(kTestNss, CursorId(2), batch2);
+ responses.emplace_back(_nss, CursorId(2), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -953,10 +918,9 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
}
TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -966,7 +930,7 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(0), batch);
+ responses.emplace_back(_nss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -984,10 +948,9 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
}
TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
executor()->shutdown();
ASSERT_EQ(ErrorCodes::ShutdownInProgress, arm->nextEvent().getStatus());
@@ -996,10 +959,9 @@ TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) {
}
TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatches) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
// Make a request to the shard that will never get answered.
ASSERT_FALSE(arm->ready());
@@ -1017,10 +979,9 @@ TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatch
}
TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto killedEvent = arm->kill(operationContext());
@@ -1035,14 +996,11 @@ TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) {
}
TEST_F(AsyncResultsMergerTest, KillAllRemotesExhausted) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 3, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {}));
+ cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 3, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1050,11 +1008,11 @@ TEST_F(AsyncResultsMergerTest, KillAllRemotesExhausted) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(0), batch1);
+ responses.emplace_back(_nss, CursorId(0), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(kTestNss, CursorId(0), batch2);
+ responses.emplace_back(_nss, CursorId(0), batch2);
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(kTestNss, CursorId(0), batch3);
+ responses.emplace_back(_nss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1069,14 +1027,11 @@ TEST_F(AsyncResultsMergerTest, KillAllRemotesExhausted) {
}
TEST_F(AsyncResultsMergerTest, KillNonExhaustedCursorWithoutPendingRequest) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 123, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {}));
+ cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1084,12 +1039,12 @@ TEST_F(AsyncResultsMergerTest, KillNonExhaustedCursorWithoutPendingRequest) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(0), batch1);
+ responses.emplace_back(_nss, CursorId(0), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(kTestNss, CursorId(0), batch2);
+ responses.emplace_back(_nss, CursorId(0), batch2);
// Cursor 3 is not exhausted.
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(kTestNss, CursorId(123), batch3);
+ responses.emplace_back(_nss, CursorId(123), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1104,14 +1059,11 @@ TEST_F(AsyncResultsMergerTest, KillNonExhaustedCursorWithoutPendingRequest) {
}
TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 3, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {}));
+ cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 3, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1119,7 +1071,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(0), batch1);
+ responses.emplace_back(_nss, CursorId(0), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1139,10 +1091,9 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
}
TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1150,7 +1101,7 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(1), batch1);
+ responses.emplace_back(_nss, CursorId(1), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1163,10 +1114,9 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
}
TEST_F(AsyncResultsMergerTest, KillCalledTwice) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
auto killedEvent1 = arm->kill(operationContext());
ASSERT(killedEvent1.isValid());
auto killedEvent2 = arm->kill(operationContext());
@@ -1177,10 +1127,9 @@ TEST_F(AsyncResultsMergerTest, KillCalledTwice) {
TEST_F(AsyncResultsMergerTest, TailableBasic) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1188,7 +1137,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(123), batch1);
+ responses.emplace_back(_nss, CursorId(123), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1209,7 +1158,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
responses.clear();
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}")};
- responses.emplace_back(kTestNss, CursorId(123), batch2);
+ responses.emplace_back(_nss, CursorId(123), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1227,10 +1176,9 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1239,7 +1187,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
// Remote responds with an empty batch and a non-zero cursor id.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch;
- responses.emplace_back(kTestNss, CursorId(123), batch);
+ responses.emplace_back(_nss, CursorId(123), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1256,10 +1204,9 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1268,7 +1215,7 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
// Remote responds with an empty batch and a zero cursor id.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch;
- responses.emplace_back(kTestNss, CursorId(0), batch);
+ responses.emplace_back(_nss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1282,10 +1229,9 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 3}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1293,7 +1239,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(1), batch1);
+ responses.emplace_back(_nss, CursorId(1), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1308,7 +1254,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
responses.clear();
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}")};
- responses.emplace_back(kTestNss, CursorId(0), batch2);
+ responses.emplace_back(_nss, CursorId(0), batch2);
readyEvent = unittest::assertGet(arm->nextEvent());
BSONObj scheduledCmd = getNthPendingRequest(0).cmdObj;
@@ -1328,14 +1274,11 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 97, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 98, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 99, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 97, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 98, {}));
+ cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 99, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1349,9 +1292,9 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
// remaining shards.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")};
- responses.emplace_back(kTestNss, CursorId(98), batch1);
+ responses.emplace_back(_nss, CursorId(98), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(99), batch2);
+ responses.emplace_back(_nss, CursorId(99), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1372,7 +1315,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
responses.clear();
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}")};
- responses.emplace_back(kTestNss, CursorId(99), batch3);
+ responses.emplace_back(_nss, CursorId(99), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1387,7 +1330,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
// Once the last reachable shard indicates that its cursor is closed, we're done.
responses.clear();
std::vector<BSONObj> batch4 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch4);
+ responses.emplace_back(_nss, CursorId(0), batch4);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1398,10 +1341,9 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 98, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 98, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1409,7 +1351,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(98), batch);
+ responses.emplace_back(_nss, CursorId(98), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1432,12 +1374,10 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) {
BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 2, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 2, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1446,7 +1386,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) {
// First host returns single result
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}")};
- responses.emplace_back(kTestNss, CursorId(0), batch);
+ responses.emplace_back(_nss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1464,12 +1404,10 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) {
TEST_F(AsyncResultsMergerTest, ReturnsErrorOnRetriableError) {
BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 2, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 2, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1494,10 +1432,9 @@ TEST_F(AsyncResultsMergerTest, ReturnsErrorOnRetriableError) {
TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true, awaitData: true}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1505,7 +1442,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")};
- responses.emplace_back(kTestNss, CursorId(123), batch1);
+ responses.emplace_back(_nss, CursorId(123), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1527,7 +1464,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
responses.clear();
std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(123), batch2);
+ responses.emplace_back(_nss, CursorId(123), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1548,24 +1485,20 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
// Clean up.
responses.clear();
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}")};
- responses.emplace_back(kTestNss, CursorId(0), batch3);
+ responses.emplace_back(_nss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
}
TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNoOplogTimestamp) {
- AsyncResultsMergerParams params;
- params.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {})));
- params.setRemotes(std::move(cursors));
- params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
- params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"));
- auto arm =
- stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params));
+ auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {}));
+ params->remotes = std::move(cursors);
+ params->tailableMode = TailableMode::kTailableAndAwaitData;
+ params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}");
+ arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1577,7 +1510,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas
fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, "
"$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")};
const Timestamp lastObservedFirstCursor = Timestamp(1, 6);
- responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
+ responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1590,7 +1523,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas
fromjson("{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: 1, documentKey: {_id: 2}}, "
"$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}")};
const Timestamp lastObservedSecondCursor = Timestamp(1, 5);
- responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
+ responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1610,40 +1543,37 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas
// Clean up the cursors.
responses.clear();
std::vector<BSONObj> batch3 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch3);
+ responses.emplace_back(_nss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
responses.clear();
std::vector<BSONObj> batch4 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch4);
+ responses.emplace_back(_nss, CursorId(0), batch4);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
}
TEST_F(AsyncResultsMergerTest,
SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNullOplogTimestamp) {
- AsyncResultsMergerParams params;
- params.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.push_back(makeRemoteCursor(
+ auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(
kTestShardIds[0],
kTestShardHosts[0],
CursorResponse(
- kTestNss,
+ _nss,
123,
{fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, "
"$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")},
boost::none,
- Timestamp(1, 5))));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1],
+ Timestamp(1, 5)));
+ cursors.emplace_back(kTestShardIds[1],
kTestShardHosts[1],
- CursorResponse(kTestNss, 456, {}, boost::none, Timestamp())));
- params.setRemotes(std::move(cursors));
- params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
- params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"));
- auto arm =
- stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params));
+ CursorResponse(_nss, 456, {}, boost::none, Timestamp()));
+ params->remotes = std::move(cursors);
+ params->tailableMode = TailableMode::kTailableAndAwaitData;
+ params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}");
+ arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1651,7 +1581,7 @@ TEST_F(AsyncResultsMergerTest,
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch3 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch3, boost::none, Timestamp(1, 8));
+ responses.emplace_back(_nss, CursorId(0), batch3, boost::none, Timestamp(1, 8));
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(unittest::assertGet(arm->nextEvent()));
@@ -1667,34 +1597,31 @@ TEST_F(AsyncResultsMergerTest,
// Clean up.
responses.clear();
std::vector<BSONObj> batch4 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch4);
+ responses.emplace_back(_nss, CursorId(0), batch4);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
}
TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOplogTime) {
- AsyncResultsMergerParams params;
- params.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
+ auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
Timestamp tooLow = Timestamp(1, 2);
- cursors.push_back(makeRemoteCursor(
+ cursors.emplace_back(
kTestShardIds[0],
kTestShardHosts[0],
CursorResponse(
- kTestNss,
+ _nss,
123,
{fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, "
"$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")},
boost::none,
- Timestamp(1, 5))));
- cursors.push_back(makeRemoteCursor(kTestShardIds[1],
- kTestShardHosts[1],
- CursorResponse(kTestNss, 456, {}, boost::none, tooLow)));
- params.setRemotes(std::move(cursors));
- params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
- params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"));
- auto arm =
- stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params));
+ Timestamp(1, 5)));
+ cursors.emplace_back(
+ kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {}, boost::none, tooLow));
+ params->remotes = std::move(cursors);
+ params->tailableMode = TailableMode::kTailableAndAwaitData;
+ params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}");
+ arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1702,7 +1629,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOp
// Clean up the cursors.
std::vector<CursorResponse> responses;
- responses.emplace_back(kTestNss, CursorId(0), std::vector<BSONObj>{});
+ responses.emplace_back(_nss, CursorId(0), std::vector<BSONObj>{});
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
auto killEvent = arm->kill(operationContext());
@@ -1710,16 +1637,13 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOp
}
TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) {
- AsyncResultsMergerParams params;
- params.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- params.setRemotes(std::move(cursors));
- params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
- params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"));
- auto arm =
- stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params));
+ auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ params->remotes = std::move(cursors);
+ params->tailableMode = TailableMode::kTailableAndAwaitData;
+ params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}");
+ arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1731,7 +1655,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, "
"$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")};
const Timestamp lastObservedFirstCursor = Timestamp(1, 6);
- responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
+ responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1739,10 +1663,9 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
ASSERT_TRUE(arm->ready());
// Add the new shard.
- std::vector<RemoteCursor> newCursors;
- newCursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {})));
- arm->addNewShardCursors(std::move(newCursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> newCursors;
+ newCursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {}));
+ arm->addNewShardCursors(newCursors);
// Now shouldn't be ready, we don't have a guarantee from each shard.
ASSERT_FALSE(arm->ready());
@@ -1754,7 +1677,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
fromjson("{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: 1, documentKey: {_id: 2}}, "
"$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}")};
const Timestamp lastObservedSecondCursor = Timestamp(1, 5);
- responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
+ responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1774,27 +1697,24 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
// Clean up the cursors.
responses.clear();
std::vector<BSONObj> batch3 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch3);
+ responses.emplace_back(_nss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
responses.clear();
std::vector<BSONObj> batch4 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch4);
+ responses.emplace_back(_nss, CursorId(0), batch4);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
}
TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting) {
- AsyncResultsMergerParams params;
- params.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- params.setRemotes(std::move(cursors));
- params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
- params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"));
- auto arm =
- stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params));
+ auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ params->remotes = std::move(cursors);
+ params->tailableMode = TailableMode::kTailableAndAwaitData;
+ params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}");
+ arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1806,7 +1726,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, "
"$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")};
const Timestamp lastObservedFirstCursor = Timestamp(1, 6);
- responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
+ responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1814,10 +1734,9 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
ASSERT_TRUE(arm->ready());
// Add the new shard.
- std::vector<RemoteCursor> newCursors;
- newCursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {})));
- arm->addNewShardCursors(std::move(newCursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> newCursors;
+ newCursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {}));
+ arm->addNewShardCursors(newCursors);
// Now shouldn't be ready, we don't have a guarantee from each shard.
ASSERT_FALSE(arm->ready());
@@ -1831,7 +1750,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
// The last observed time should still be later than the first shard, so we can get the data
// from it.
const Timestamp lastObservedSecondCursor = Timestamp(1, 5);
- responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
+ responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1851,22 +1770,21 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
// Clean up the cursors.
responses.clear();
std::vector<BSONObj> batch3 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch3);
+ responses.emplace_back(_nss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
responses.clear();
std::vector<BSONObj> batch4 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch4);
+ responses.emplace_back(_nss, CursorId(0), batch4);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
}
TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) {
BSONObj findCmd = fromjson("{find: 'testcoll'}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
auto killEvent = arm->kill(operationContext());
@@ -1875,10 +1793,9 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) {
TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
auto killEvent = arm->kill(operationContext());
@@ -1887,10 +1804,9 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) {
TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1904,10 +1820,9 @@ TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) {
}
TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulingKillCursors) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -1935,10 +1850,9 @@ TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulin
}
TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) {
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -1957,7 +1871,7 @@ TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) {
// exhausted.
onCommand([&](const auto& request) {
ASSERT(request.cmdObj["getMore"]);
- return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)})
+ return CursorResponse(_nss, 0LL, {BSON("x" << 1)})
.toBSON(CursorResponse::ResponseType::SubsequentResponse);
});
@@ -1965,10 +1879,9 @@ TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) {
}
TEST_F(AsyncResultsMergerTest, ShouldBeInterruptableDuringBlockingNext) {
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
// Issue a blocking wait for the next result asynchronously on a different thread.
auto future = launchAsync([&]() {
@@ -1992,10 +1905,9 @@ TEST_F(AsyncResultsMergerTest, ShouldBeInterruptableDuringBlockingNext) {
}
TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilKilled) {
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index 9c01c013ce6..653599def7e 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -113,7 +113,7 @@ public:
/**
* Returns a reference to the vector of remote hosts involved in this operation.
*/
- virtual const std::size_t getNumRemotes() const = 0;
+ virtual const std::vector<ClusterClientCursorParams::RemoteCursor>& getRemotes() const = 0;
/**
* Returns the number of result documents returned so far by this cursor via the next() method.
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index 1a4be45f1be..58484e87bfa 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -30,7 +30,6 @@
#include "mongo/s/query/cluster_client_cursor_impl.h"
-#include "mongo/db/pipeline/cluster_aggregation_planner.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_skip.h"
#include "mongo/db/pipeline/document_source_sort.h"
@@ -136,19 +135,20 @@ OperationContext* ClusterClientCursorImpl::getCurrentOperationContext() const {
}
bool ClusterClientCursorImpl::isTailable() const {
- return _params.tailableMode != TailableModeEnum::kNormal;
+ return _params.tailableMode != TailableMode::kNormal;
}
bool ClusterClientCursorImpl::isTailableAndAwaitData() const {
- return _params.tailableMode == TailableModeEnum::kTailableAndAwaitData;
+ return _params.tailableMode == TailableMode::kTailableAndAwaitData;
}
BSONObj ClusterClientCursorImpl::getOriginatingCommand() const {
return _params.originatingCommandObj;
}
-const std::size_t ClusterClientCursorImpl::getNumRemotes() const {
- return _root->getNumRemotes();
+const std::vector<ClusterClientCursorParams::RemoteCursor>& ClusterClientCursorImpl::getRemotes()
+ const {
+ return _params.remotes;
}
long long ClusterClientCursorImpl::getNumReturnedSoFar() const {
@@ -181,6 +181,30 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorImpl::getReadPreferenc
namespace {
+/**
+ * Rips off an initial $sort stage that will be handled by mongos execution machinery. Returns the
+ * sort key pattern of such a $sort stage if there was one, and boost::none otherwise.
+ */
+boost::optional<BSONObj> extractLeadingSort(Pipeline* mergePipeline) {
+ // Remove a leading $sort iff it is a mergesort, since the ARM cannot handle blocking $sort.
+ auto frontSort = mergePipeline->popFrontWithCriteria(
+ DocumentSourceSort::kStageName, [](const DocumentSource* const source) {
+ return static_cast<const DocumentSourceSort* const>(source)->mergingPresorted();
+ });
+
+ if (frontSort) {
+ auto sortStage = static_cast<DocumentSourceSort*>(frontSort.get());
+ if (auto sortLimit = sortStage->getLimitSrc()) {
+ // There was a limit stage absorbed into the sort stage, so we need to preserve that.
+ mergePipeline->addInitialSource(sortLimit);
+ }
+ return sortStage
+ ->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging)
+ .toBson();
+ }
+ return boost::none;
+}
+
bool isSkipOrLimit(const boost::intrusive_ptr<DocumentSource>& stage) {
return (dynamic_cast<DocumentSourceLimit*>(stage.get()) ||
dynamic_cast<DocumentSourceSkip*>(stage.get()));
@@ -226,10 +250,10 @@ std::unique_ptr<RouterExecStage> buildPipelinePlan(executor::TaskExecutor* execu
// instead.
while (!pipeline->getSources().empty()) {
invariant(isSkipOrLimit(pipeline->getSources().front()));
- if (auto skip = pipeline->popFrontWithName(DocumentSourceSkip::kStageName)) {
+ if (auto skip = pipeline->popFrontWithCriteria(DocumentSourceSkip::kStageName)) {
root = stdx::make_unique<RouterStageSkip>(
opCtx, std::move(root), static_cast<DocumentSourceSkip*>(skip.get())->getSkip());
- } else if (auto limit = pipeline->popFrontWithName(DocumentSourceLimit::kStageName)) {
+ } else if (auto limit = pipeline->popFrontWithCriteria(DocumentSourceLimit::kStageName)) {
root = stdx::make_unique<RouterStageLimit>(
opCtx, std::move(root), static_cast<DocumentSourceLimit*>(limit.get())->getLimit());
}
@@ -246,8 +270,7 @@ std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
const auto skip = params->skip;
const auto limit = params->limit;
if (params->mergePipeline) {
- if (auto sort =
- cluster_aggregation_planner::popLeadingMergeSort(params->mergePipeline.get())) {
+ if (auto sort = extractLeadingSort(params->mergePipeline.get())) {
params->sort = *sort;
}
return buildPipelinePlan(executor, params);
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index d3c9349233b..c685a383307 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -105,7 +105,7 @@ public:
BSONObj getOriginatingCommand() const final;
- const std::size_t getNumRemotes() const final;
+ const std::vector<ClusterClientCursorParams::RemoteCursor>& getRemotes() const final;
long long getNumReturnedSoFar() const final;
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp
index 6e624f36b84..0e3a3fd6731 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp
@@ -68,7 +68,8 @@ BSONObj ClusterClientCursorMock::getOriginatingCommand() const {
return _originatingCommand;
}
-const std::size_t ClusterClientCursorMock::getNumRemotes() const {
+const std::vector<ClusterClientCursorParams::RemoteCursor>& ClusterClientCursorMock::getRemotes()
+ const {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h
index 1c50403c3ae..7364240112d 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.h
+++ b/src/mongo/s/query/cluster_client_cursor_mock.h
@@ -69,7 +69,7 @@ public:
BSONObj getOriginatingCommand() const final;
- const std::size_t getNumRemotes() const final;
+ const std::vector<ClusterClientCursorParams::RemoteCursor>& getRemotes() const final;
long long getNumReturnedSoFar() const final;
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index 71a7f17c282..116634bcee8 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -42,7 +42,6 @@
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/tailable_mode.h"
#include "mongo/s/client/shard.h"
-#include "mongo/s/query/async_results_merger_params_gen.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
@@ -62,6 +61,22 @@ class RouterExecStage;
* this cursor have been processed.
*/
struct ClusterClientCursorParams {
+ struct RemoteCursor {
+ RemoteCursor(ShardId shardId, HostAndPort hostAndPort, CursorResponse cursorResponse)
+ : shardId(std::move(shardId)),
+ hostAndPort(std::move(hostAndPort)),
+ cursorResponse(std::move(cursorResponse)) {}
+
+ // The shardId of the shard on which the cursor resides.
+ ShardId shardId;
+
+ // The exact host (within the shard) on which the cursor resides.
+ HostAndPort hostAndPort;
+
+ // Encompasses the state of the established cursor.
+ CursorResponse cursorResponse;
+ };
+
ClusterClientCursorParams(NamespaceString nss,
boost::optional<ReadPreferenceSetting> readPref = boost::none)
: nsString(std::move(nss)) {
@@ -70,24 +85,6 @@ struct ClusterClientCursorParams {
}
}
- /**
- * Extracts the subset of fields here needed by the AsyncResultsMerger. The returned
- * AsyncResultsMergerParams will assume ownership of 'remotes'.
- */
- AsyncResultsMergerParams extractARMParams() {
- AsyncResultsMergerParams armParams;
- if (!sort.isEmpty()) {
- armParams.setSort(sort);
- }
- armParams.setCompareWholeSortKey(compareWholeSortKey);
- armParams.setRemotes(std::move(remotes));
- armParams.setTailableMode(tailableMode);
- armParams.setBatchSize(batchSize);
- armParams.setNss(nsString);
- armParams.setAllowPartialResults(isAllowPartialResults);
- return armParams;
- }
-
// Namespace against which the cursors exist.
NamespaceString nsString;
@@ -111,7 +108,7 @@ struct ClusterClientCursorParams {
// The number of results per batch. Optional. If specified, will be specified as the batch for
// each getMore.
- boost::optional<std::int64_t> batchSize;
+ boost::optional<long long> batchSize;
// Limits the number of results returned by the ClusterClientCursor to this many. Optional.
// Should be forwarded to the remote hosts in 'cmdObj'.
@@ -122,7 +119,7 @@ struct ClusterClientCursorParams {
// Whether this cursor is tailing a capped collection, and whether it has the awaitData option
// set.
- TailableModeEnum tailableMode = TailableModeEnum::kNormal;
+ TailableMode tailableMode = TailableMode::kNormal;
// Set if a readPreference must be respected throughout the lifetime of the cursor.
boost::optional<ReadPreferenceSetting> readPreference;
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index ca66ee04e08..cd5ce0f9bc8 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -147,9 +147,10 @@ BSONObj ClusterCursorManager::PinnedCursor::getOriginatingCommand() const {
return _cursor->getOriginatingCommand();
}
-const std::size_t ClusterCursorManager::PinnedCursor::getNumRemotes() const {
+const std::vector<ClusterClientCursorParams::RemoteCursor>&
+ClusterCursorManager::PinnedCursor::getRemotes() const {
invariant(_cursor);
- return _cursor->getNumRemotes();
+ return _cursor->getRemotes();
}
CursorId ClusterCursorManager::PinnedCursor::getCursorId() const {
diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index cf6dc54f21e..b9b4ee81ef6 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -198,7 +198,7 @@ public:
/**
* Returns a reference to the vector of remote hosts involved in this operation.
*/
- const std::size_t getNumRemotes() const;
+ const std::vector<ClusterClientCursorParams::RemoteCursor>& getRemotes() const;
/**
* Returns the cursor id for the underlying cursor, or zero if no cursor is owned.
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 8c9e654f5bb..f8faaabaa96 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -346,7 +346,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
}
// Fill out query exec properties.
- CurOp::get(opCtx)->debug().nShards = ccc->getNumRemotes();
+ CurOp::get(opCtx)->debug().nShards = ccc->getRemotes().size();
CurOp::get(opCtx)->debug().nreturned = results->size();
// If the cursor is exhausted, then there are no more results to return and we don't need to
@@ -490,7 +490,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
// Set the originatingCommand object and the cursorID in CurOp.
{
- CurOp::get(opCtx)->debug().nShards = pinnedCursor.getValue().getNumRemotes();
+ CurOp::get(opCtx)->debug().nShards = pinnedCursor.getValue().getRemotes().size();
CurOp::get(opCtx)->debug().cursorid = request.cursorid;
stdx::lock_guard<Client> lk(*opCtx->getClient());
CurOp::get(opCtx)->setOriginatingCommand_inlock(
diff --git a/src/mongo/s/query/document_source_router_adapter.cpp b/src/mongo/s/query/document_source_router_adapter.cpp
index 26a944ed5cc..4e144751dcb 100644
--- a/src/mongo/s/query/document_source_router_adapter.cpp
+++ b/src/mongo/s/query/document_source_router_adapter.cpp
@@ -67,10 +67,6 @@ Value DocumentSourceRouterAdapter::serialize(
return Value(); // Return the empty value to hide this stage from explain output.
}
-std::size_t DocumentSourceRouterAdapter::getNumRemotes() const {
- return _child->getNumRemotes();
-}
-
bool DocumentSourceRouterAdapter::remotesExhausted() {
return _child->remotesExhausted();
}
diff --git a/src/mongo/s/query/document_source_router_adapter.h b/src/mongo/s/query/document_source_router_adapter.h
index a7db7734539..5c1a6a0935c 100644
--- a/src/mongo/s/query/document_source_router_adapter.h
+++ b/src/mongo/s/query/document_source_router_adapter.h
@@ -59,7 +59,6 @@ public:
void detachFromOperationContext() final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final;
bool remotesExhausted();
- std::size_t getNumRemotes() const;
void setExecContext(RouterExecStage::ExecContext execContext) {
_execContext = execContext;
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp
index 08ce5a2cb5b..f186bcb1bb5 100644
--- a/src/mongo/s/query/establish_cursors.cpp
+++ b/src/mongo/s/query/establish_cursors.cpp
@@ -47,12 +47,13 @@
namespace mongo {
-std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- const NamespaceString& nss,
- const ReadPreferenceSetting readPref,
- const std::vector<std::pair<ShardId, BSONObj>>& remotes,
- bool allowPartialResults) {
+std::vector<ClusterClientCursorParams::RemoteCursor> establishCursors(
+ OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ const NamespaceString& nss,
+ const ReadPreferenceSetting readPref,
+ const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+ bool allowPartialResults) {
// Construct the requests
std::vector<AsyncRequestsSender::Request> requests;
for (const auto& remote : remotes) {
@@ -67,23 +68,20 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
readPref,
Shard::RetryPolicy::kIdempotent);
- std::vector<RemoteCursor> remoteCursors;
+ std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors;
try {
// Get the responses
while (!ars.done()) {
try {
auto response = ars.next();
- // Note the shardHostAndPort may not be populated if there was an error, so be sure
- // to do this after parsing the cursor response to ensure the response was ok.
- // Additionally, be careful not to push into 'remoteCursors' until we are sure we
- // have a valid cursor, since the error handling path will attempt to clean up
- // anything in 'remoteCursors'
- RemoteCursor cursor;
- cursor.setCursorResponse(CursorResponse::parseFromBSONThrowing(
+
+ // uasserts must happen before attempting to access the optional shardHostAndPort.
+ auto cursorResponse = uassertStatusOK(CursorResponse::parseFromBSON(
uassertStatusOK(std::move(response.swResponse)).data));
- cursor.setShardId(std::move(response.shardId));
- cursor.setHostAndPort(*response.shardHostAndPort);
- remoteCursors.push_back(std::move(cursor));
+
+ remoteCursors.emplace_back(std::move(response.shardId),
+ std::move(*response.shardHostAndPort),
+ std::move(cursorResponse));
} catch (const DBException& ex) {
// Retriable errors are swallowed if 'allowPartialResults' is true.
if (allowPartialResults &&
@@ -116,21 +114,18 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
: response.swResponse.getStatus());
if (swCursorResponse.isOK()) {
- RemoteCursor cursor;
- cursor.setShardId(std::move(response.shardId));
- cursor.setHostAndPort(*response.shardHostAndPort);
- cursor.setCursorResponse(std::move(swCursorResponse.getValue()));
- remoteCursors.push_back(std::move(cursor));
+ remoteCursors.emplace_back(std::move(response.shardId),
+ *response.shardHostAndPort,
+ std::move(swCursorResponse.getValue()));
}
}
// Schedule killCursors against all cursors that were established.
for (const auto& remoteCursor : remoteCursors) {
BSONObj cmdObj =
- KillCursorsRequest(nss, {remoteCursor.getCursorResponse().getCursorId()})
- .toBSON();
+ KillCursorsRequest(nss, {remoteCursor.cursorResponse.getCursorId()}).toBSON();
executor::RemoteCommandRequest request(
- remoteCursor.getHostAndPort(), nss.db().toString(), cmdObj, opCtx);
+ remoteCursor.hostAndPort, nss.db().toString(), cmdObj, opCtx);
// We do not process the response to the killCursors request (we make a good-faith
// attempt at cleaning up the cursors, but ignore any returned errors).
diff --git a/src/mongo/s/query/establish_cursors.h b/src/mongo/s/query/establish_cursors.h
index e88ddc2682e..b75a750d7b7 100644
--- a/src/mongo/s/query/establish_cursors.h
+++ b/src/mongo/s/query/establish_cursors.h
@@ -36,7 +36,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/cursor_id.h"
#include "mongo/executor/task_executor.h"
-#include "mongo/s/query/async_results_merger_params_gen.h"
+#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
@@ -61,11 +61,12 @@ class CursorResponse;
* on reachable hosts are returned.
*
*/
-std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- const NamespaceString& nss,
- const ReadPreferenceSetting readPref,
- const std::vector<std::pair<ShardId, BSONObj>>& remotes,
- bool allowPartialResults);
+std::vector<ClusterClientCursorParams::RemoteCursor> establishCursors(
+ OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ const NamespaceString& nss,
+ const ReadPreferenceSetting readPref,
+ const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+ bool allowPartialResults);
} // namespace mongo
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index 515da5a358c..418419fdbef 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -89,14 +89,6 @@ public:
}
/**
- * Returns the number of remote hosts involved in this execution plan.
- */
- virtual std::size_t getNumRemotes() const {
- invariant(_child); // The default implementation forwards to the child stage.
- return _child->getNumRemotes();
- }
-
- /**
* Returns whether or not all the remote cursors are exhausted.
*/
virtual bool remotesExhausted() {
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
index 48abb1452ec..4f17927483b 100644
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ b/src/mongo/s/query/router_stage_merge.cpp
@@ -40,21 +40,18 @@ namespace mongo {
RouterStageMerge::RouterStageMerge(OperationContext* opCtx,
executor::TaskExecutor* executor,
ClusterClientCursorParams* params)
- : RouterExecStage(opCtx),
- _executor(executor),
- _params(params),
- _arm(opCtx, executor, params->extractARMParams()) {}
+ : RouterExecStage(opCtx), _executor(executor), _params(params), _arm(opCtx, executor, params) {}
StatusWith<ClusterQueryResult> RouterStageMerge::next(ExecContext execCtx) {
// Non-tailable and tailable non-awaitData cursors always block until ready(). AwaitData
// cursors wait for ready() only until a specified time limit is exceeded.
- return (_params->tailableMode == TailableModeEnum::kTailableAndAwaitData
+ return (_params->tailableMode == TailableMode::kTailableAndAwaitData
? awaitNextWithTimeout(execCtx)
: _arm.blockingNext());
}
StatusWith<ClusterQueryResult> RouterStageMerge::awaitNextWithTimeout(ExecContext execCtx) {
- invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData);
+ invariant(_params->tailableMode == TailableMode::kTailableAndAwaitData);
// If we are in kInitialFind or kGetMoreWithAtLeastOneResultInBatch context and the ARM is not
// ready, we don't block. Fall straight through to the return statement.
while (!_arm.ready() && execCtx == ExecContext::kGetMoreNoResultsYet) {
@@ -88,7 +85,7 @@ StatusWith<ClusterQueryResult> RouterStageMerge::awaitNextWithTimeout(ExecContex
StatusWith<EventHandle> RouterStageMerge::getNextEvent() {
// If we abandoned a previous event due to a mongoS-side timeout, wait for it first.
if (_leftoverEventFromLastTimeout) {
- invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData);
+ invariant(_params->tailableMode == TailableMode::kTailableAndAwaitData);
auto event = _leftoverEventFromLastTimeout;
_leftoverEventFromLastTimeout = EventHandle();
return event;
@@ -105,16 +102,14 @@ bool RouterStageMerge::remotesExhausted() {
return _arm.remotesExhausted();
}
-std::size_t RouterStageMerge::getNumRemotes() const {
- return _arm.getNumRemotes();
-}
-
Status RouterStageMerge::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return _arm.setAwaitDataTimeout(awaitDataTimeout);
}
-void RouterStageMerge::addNewShardCursors(std::vector<RemoteCursor>&& newShards) {
- _arm.addNewShardCursors(std::move(newShards));
+void RouterStageMerge::addNewShardCursors(
+ std::vector<ClusterClientCursorParams::RemoteCursor>&& newShards) {
+ _arm.addNewShardCursors(newShards);
+ std::move(newShards.begin(), newShards.end(), std::back_inserter(_params->remotes));
}
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index b6bfee146b6..efd397b8c7e 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -57,12 +57,10 @@ public:
bool remotesExhausted() final;
- std::size_t getNumRemotes() const final;
-
/**
* Adds the cursors in 'newShards' to those being merged by the ARM.
*/
- void addNewShardCursors(std::vector<RemoteCursor>&& newShards);
+ void addNewShardCursors(std::vector<ClusterClientCursorParams::RemoteCursor>&& newShards);
protected:
Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp
index 5e94274b9ac..97febc62173 100644
--- a/src/mongo/s/query/router_stage_pipeline.cpp
+++ b/src/mongo/s/query/router_stage_pipeline.cpp
@@ -84,10 +84,6 @@ void RouterStagePipeline::kill(OperationContext* opCtx) {
_mergePipeline->dispose(opCtx);
}
-std::size_t RouterStagePipeline::getNumRemotes() const {
- return _mongosOnlyPipeline ? 0 : _routerAdapter->getNumRemotes();
-}
-
bool RouterStagePipeline::remotesExhausted() {
return _mongosOnlyPipeline || _routerAdapter->remotesExhausted();
}
diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h
index c14ddf9f80b..e876dc816a2 100644
--- a/src/mongo/s/query/router_stage_pipeline.h
+++ b/src/mongo/s/query/router_stage_pipeline.h
@@ -51,8 +51,6 @@ public:
bool remotesExhausted() final;
- std::size_t getNumRemotes() const final;
-
protected:
Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.cpp b/src/mongo/s/query/router_stage_update_on_add_shard.cpp
index 61fa2a9176d..451a1ee9699 100644
--- a/src/mongo/s/query/router_stage_update_on_add_shard.cpp
+++ b/src/mongo/s/query/router_stage_update_on_add_shard.cpp
@@ -56,11 +56,9 @@ bool needsUpdate(const StatusWith<ClusterQueryResult>& childResult) {
RouterStageUpdateOnAddShard::RouterStageUpdateOnAddShard(OperationContext* opCtx,
executor::TaskExecutor* executor,
ClusterClientCursorParams* params,
- std::vector<ShardId> shardIds,
BSONObj cmdToRunOnNewShards)
: RouterExecStage(opCtx, stdx::make_unique<RouterStageMerge>(opCtx, executor, params)),
_params(params),
- _shardIds(std::move(shardIds)),
_cmdToRunOnNewShards(cmdToRunOnNewShards) {}
StatusWith<ClusterQueryResult> RouterStageUpdateOnAddShard::next(
@@ -75,12 +73,18 @@ StatusWith<ClusterQueryResult> RouterStageUpdateOnAddShard::next(
}
void RouterStageUpdateOnAddShard::addNewShardCursors(BSONObj newShardDetectedObj) {
+ std::vector<ShardId> existingShardIds;
+ for (const auto& remote : _params->remotes) {
+ existingShardIds.push_back(remote.shardId);
+ }
checked_cast<RouterStageMerge*>(getChildStage())
- ->addNewShardCursors(establishShardCursorsOnNewShards(newShardDetectedObj));
+ ->addNewShardCursors(
+ establishShardCursorsOnNewShards(std::move(existingShardIds), newShardDetectedObj));
}
-std::vector<RemoteCursor> RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards(
- const BSONObj& newShardDetectedObj) {
+std::vector<ClusterClientCursorParams::RemoteCursor>
+RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards(std::vector<ShardId> existingShardIds,
+ const BSONObj& newShardDetectedObj) {
auto* opCtx = getOpCtx();
// Reload the shard registry. We need to ensure a reload initiated after calling this method
// caused the reload, otherwise we aren't guaranteed to get all the new shards.
@@ -94,12 +98,12 @@ std::vector<RemoteCursor> RouterStageUpdateOnAddShard::establishShardCursorsOnNe
std::vector<ShardId> shardIds, newShardIds;
shardRegistry->getAllShardIdsNoReload(&shardIds);
- std::sort(_shardIds.begin(), _shardIds.end());
+ std::sort(existingShardIds.begin(), existingShardIds.end());
std::sort(shardIds.begin(), shardIds.end());
std::set_difference(shardIds.begin(),
shardIds.end(),
- _shardIds.begin(),
- _shardIds.end(),
+ existingShardIds.begin(),
+ existingShardIds.end(),
std::back_inserter(newShardIds));
auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand(
@@ -108,7 +112,6 @@ std::vector<RemoteCursor> RouterStageUpdateOnAddShard::establishShardCursorsOnNe
std::vector<std::pair<ShardId, BSONObj>> requests;
for (const auto& shardId : newShardIds) {
requests.emplace_back(shardId, cmdObj);
- _shardIds.push_back(shardId);
}
const bool allowPartialResults = false; // partial results are not allowed
return establishCursors(opCtx,
diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.h b/src/mongo/s/query/router_stage_update_on_add_shard.h
index 00ee921e2af..1128dc83430 100644
--- a/src/mongo/s/query/router_stage_update_on_add_shard.h
+++ b/src/mongo/s/query/router_stage_update_on_add_shard.h
@@ -44,7 +44,6 @@ public:
RouterStageUpdateOnAddShard(OperationContext* opCtx,
executor::TaskExecutor* executor,
ClusterClientCursorParams* params,
- std::vector<ShardId> shardIds,
BSONObj cmdToRunOnNewShards);
StatusWith<ClusterQueryResult> next(ExecContext) final;
@@ -59,10 +58,10 @@ private:
/**
* Open the cursors on the new shards.
*/
- std::vector<RemoteCursor> establishShardCursorsOnNewShards(const BSONObj& newShardDetectedObj);
+ std::vector<ClusterClientCursorParams::RemoteCursor> establishShardCursorsOnNewShards(
+ std::vector<ShardId> existingShardIds, const BSONObj& newShardDetectedObj);
ClusterClientCursorParams* _params;
- std::vector<ShardId> _shardIds;
BSONObj _cmdToRunOnNewShards;
};
}
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index 9a754364412..bdf4283c104 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -38,7 +38,6 @@
#include "mongo/s/query/cluster_client_cursor_impl.h"
#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/s/query/cluster_cursor_manager.h"
-#include "mongo/s/shard_id.h"
namespace mongo {
@@ -49,7 +48,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
const NamespaceString& requestedNss,
executor::TaskExecutor* executor,
ClusterCursorManager* cursorManager,
- TailableModeEnum tailableMode) {
+ TailableMode tailableMode) {
if (!cmdResult["ok"].trueValue() || !cmdResult.hasField("cursor")) {
return cmdResult;
}
@@ -72,13 +71,11 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
}
ClusterClientCursorParams params(incomingCursorResponse.getValue().getNSS());
- params.remotes.emplace_back();
- auto& remoteCursor = params.remotes.back();
- remoteCursor.setShardId(shardId.toString());
- remoteCursor.setHostAndPort(server);
- remoteCursor.setCursorResponse(CursorResponse(incomingCursorResponse.getValue().getNSS(),
- incomingCursorResponse.getValue().getCursorId(),
- {}));
+ params.remotes.emplace_back(shardId,
+ server,
+ CursorResponse(incomingCursorResponse.getValue().getNSS(),
+ incomingCursorResponse.getValue().getCursorId(),
+ {}));
params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned();
params.tailableMode = tailableMode;
diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h
index b9756be44f7..75a4e76bf24 100644
--- a/src/mongo/s/query/store_possible_cursor.h
+++ b/src/mongo/s/query/store_possible_cursor.h
@@ -74,6 +74,6 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
const NamespaceString& requestedNss,
executor::TaskExecutor* executor,
ClusterCursorManager* cursorManager,
- TailableModeEnum tailableMode = TailableModeEnum::kNormal);
+ TailableMode tailableMode = TailableMode::kNormal);
} // namespace mongo
diff --git a/src/mongo/shell/bench.cpp b/src/mongo/shell/bench.cpp
index 27b75dbb06c..b096affd2cd 100644
--- a/src/mongo/shell/bench.cpp
+++ b/src/mongo/shell/bench.cpp
@@ -214,15 +214,12 @@ int runQueryWithReadCommands(DBClientBase* conn,
}
while (cursorResponse.getCursorId() != 0) {
- GetMoreRequest getMoreRequest(
- qr->nss(),
- cursorResponse.getCursorId(),
- qr->getBatchSize()
- ? boost::optional<std::int64_t>(static_cast<std::int64_t>(*qr->getBatchSize()))
- : boost::none,
- boost::none, // maxTimeMS
- boost::none, // term
- boost::none); // lastKnownCommittedOpTime
+ GetMoreRequest getMoreRequest(qr->nss(),
+ cursorResponse.getCursorId(),
+ qr->getBatchSize(),
+ boost::none, // maxTimeMS
+ boost::none, // term
+ boost::none); // lastKnownCommittedOpTime
BSONObj getMoreCommandResult;
uassert(ErrorCodes::CommandFailed,
str::stream() << "getMore command failed; reply was: " << getMoreCommandResult,
diff --git a/src/mongo/util/net/SConscript b/src/mongo/util/net/SConscript
index 133ee826998..b7dd19b3953 100644
--- a/src/mongo/util/net/SConscript
+++ b/src/mongo/util/net/SConscript
@@ -12,7 +12,6 @@ env.Library(
"hostandport.cpp",
"hostname_canonicalization.cpp",
"sockaddr.cpp",
- env.Idlc('hostandport.idl')[0],
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
diff --git a/src/mongo/util/net/hostandport.h b/src/mongo/util/net/hostandport.h
index da0af575bc3..540f57f3b46 100644
--- a/src/mongo/util/net/hostandport.h
+++ b/src/mongo/util/net/hostandport.h
@@ -54,19 +54,12 @@ class StatusWith;
*/
struct HostAndPort {
/**
- * Parses "text" to produce a HostAndPort. Returns either that or an error status describing
- * the parse failure.
+ * Parses "text" to produce a HostAndPort. Returns either that or an error
+ * status describing the parse failure.
*/
static StatusWith<HostAndPort> parse(StringData text);
/**
- * A version of 'parse' that throws a UserException if a parsing error is encountered.
- */
- static HostAndPort parseThrowing(StringData text) {
- return uassertStatusOK(parse(text));
- }
-
- /**
* Construct an empty/invalid HostAndPort.
*/
HostAndPort();
diff --git a/src/mongo/util/net/hostandport.idl b/src/mongo/util/net/hostandport.idl
deleted file mode 100644
index 6edda963f6e..00000000000
--- a/src/mongo/util/net/hostandport.idl
+++ /dev/null
@@ -1,38 +0,0 @@
-# Copyright (C) 2018 MongoDB Inc.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License, version 3,
-# as published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
-# As a special exception, the copyright holders give permission to link the
-# code of portions of this program with the OpenSSL library under certain
-# conditions as described in each individual source file and distribute
-# linked combinations including the program with the OpenSSL library. You
-# must comply with the GNU Affero General Public License in all respects for
-# all of the code used other than as permitted herein. If you modify file(s)
-# with this exception, you may extend this exception to your version of the
-# file(s), but you are not obligated to do so. If you do not wish to do so,
-# delete this exception statement from your version. If you delete this
-# exception statement from all source files in the program, then also delete
-# it in the license file.
-
-global:
- cpp_namespace: "mongo"
- cpp_includes:
- - "mongo/util/net/hostandport.h"
-types:
- HostAndPort:
- bson_serialization_type: string
- description: A string representing a host and port of a mongod or mongos process.
- cpp_type: HostAndPort
- serializer: HostAndPort::toString
- deserializer: HostAndPort::parseThrowing
-