summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/client/dbclientcursor.cpp2
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp2
-rw-r--r--src/mongo/db/matcher/expression_optimize_test.cpp2
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/cluster_aggregation_planner.cpp182
-rw-r--r--src/mongo/db/pipeline/cluster_aggregation_planner.h58
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_test.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_merge_cursors.cpp80
-rw-r--r--src/mongo/db/pipeline/document_source_merge_cursors.h32
-rw-r--r--src/mongo/db/pipeline/document_source_merge_cursors_test.cpp208
-rw-r--r--src/mongo/db/pipeline/document_source_sort.h2
-rw-r--r--src/mongo/db/pipeline/expression_context.h4
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp128
-rw-r--r--src/mongo/db/pipeline/pipeline.h38
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp2
-rw-r--r--src/mongo/db/pipeline/pipeline_optimizations.h71
-rw-r--r--src/mongo/db/query/SConscript3
-rw-r--r--src/mongo/db/query/cursor_response.h25
-rw-r--r--src/mongo/db/query/getmore_request.cpp4
-rw-r--r--src/mongo/db/query/getmore_request.h4
-rw-r--r--src/mongo/db/query/query_request.cpp12
-rw-r--r--src/mongo/db/query/query_request.h12
-rw-r--r--src/mongo/db/query/query_request_test.cpp4
-rw-r--r--src/mongo/db/query/tailable_mode.cpp8
-rw-r--r--src/mongo/db/query/tailable_mode.h9
-rw-r--r--src/mongo/db/query/tailable_mode.idl37
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/collection_cloner.cpp22
-rw-r--r--src/mongo/db/repl/collection_cloner.h2
-rw-r--r--src/mongo/dbtests/documentsourcetests.cpp12
-rw-r--r--src/mongo/dbtests/query_plan_executor.cpp6
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp76
-rw-r--r--src/mongo/s/commands/pipeline_s.cpp6
-rw-r--r--src/mongo/s/commands/strategy.cpp2
-rw-r--r--src/mongo/s/query/SConscript1
-rw-r--r--src/mongo/s/query/async_results_merger.cpp90
-rw-r--r--src/mongo/s/query/async_results_merger.h15
-rw-r--r--src/mongo/s/query/async_results_merger_params.idl89
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp658
-rw-r--r--src/mongo/s/query/cluster_client_cursor.h2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp41
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.h2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_mock.cpp3
-rw-r--r--src/mongo/s/query/cluster_client_cursor_mock.h2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_params.h39
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.cpp5
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.h2
-rw-r--r--src/mongo/s/query/cluster_find.cpp4
-rw-r--r--src/mongo/s/query/document_source_router_adapter.cpp4
-rw-r--r--src/mongo/s/query/document_source_router_adapter.h1
-rw-r--r--src/mongo/s/query/establish_cursors.cpp45
-rw-r--r--src/mongo/s/query/establish_cursors.h15
-rw-r--r--src/mongo/s/query/router_exec_stage.h8
-rw-r--r--src/mongo/s/query/router_stage_merge.cpp21
-rw-r--r--src/mongo/s/query/router_stage_merge.h4
-rw-r--r--src/mongo/s/query/router_stage_pipeline.cpp4
-rw-r--r--src/mongo/s/query/router_stage_pipeline.h2
-rw-r--r--src/mongo/s/query/router_stage_update_on_add_shard.cpp21
-rw-r--r--src/mongo/s/query/router_stage_update_on_add_shard.h5
-rw-r--r--src/mongo/s/query/store_possible_cursor.cpp15
-rw-r--r--src/mongo/s/query/store_possible_cursor.h2
-rw-r--r--src/mongo/shell/bench.cpp15
-rw-r--r--src/mongo/util/net/SConscript1
-rw-r--r--src/mongo/util/net/hostandport.h11
-rw-r--r--src/mongo/util/net/hostandport.idl38
66 files changed, 1376 insertions, 853 deletions
diff --git a/src/mongo/client/dbclientcursor.cpp b/src/mongo/client/dbclientcursor.cpp
index 399d6cb9143..23a9b12a792 100644
--- a/src/mongo/client/dbclientcursor.cpp
+++ b/src/mongo/client/dbclientcursor.cpp
@@ -137,7 +137,7 @@ Message DBClientCursor::_assembleInit() {
Message DBClientCursor::_assembleGetMore() {
invariant(cursorId);
if (_useFindCommand) {
- long long batchSize = nextBatchSize();
+ std::int64_t batchSize = nextBatchSize();
auto gmr = GetMoreRequest(ns,
cursorId,
boost::make_optional(batchSize != 0, batchSize),
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index bfaa74447cf..3529cce56e9 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -510,7 +510,7 @@ Status runAggregate(OperationContext* opCtx,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
opCtx->recoveryUnit()->getReadConcernLevel(),
cmdObj);
- if (expCtx->tailableMode == TailableMode::kTailableAndAwaitData) {
+ if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) {
cursorParams.setTailable(true);
cursorParams.setAwaitData(true);
}
diff --git a/src/mongo/db/matcher/expression_optimize_test.cpp b/src/mongo/db/matcher/expression_optimize_test.cpp
index 9091ad48e93..e2f82257a67 100644
--- a/src/mongo/db/matcher/expression_optimize_test.cpp
+++ b/src/mongo/db/matcher/expression_optimize_test.cpp
@@ -133,7 +133,7 @@ TEST(ExpressionOptimizeTest, IsValidText) {
TEST(ExpressionOptimizeTest, IsValidTextTailable) {
// Filter inside QueryRequest is not used.
auto qr = stdx::make_unique<QueryRequest>(nss);
- qr->setTailableMode(TailableMode::kTailable);
+ qr->setTailableMode(TailableModeEnum::kTailable);
ASSERT_OK(qr->validate());
// Invalid: TEXT and tailable.
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 3d4481ebbc4..88c56ab45b9 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -301,6 +301,7 @@ pipelineeEnv.Library(
'document_source_skip.cpp',
'document_source_sort.cpp',
'document_source_sort_by_count.cpp',
+ "cluster_aggregation_planner.cpp",
'document_source_tee_consumer.cpp',
'document_source_unwind.cpp',
'mongo_process_common.cpp',
diff --git a/src/mongo/db/pipeline/cluster_aggregation_planner.cpp b/src/mongo/db/pipeline/cluster_aggregation_planner.cpp
new file mode 100644
index 00000000000..3653b5b2638
--- /dev/null
+++ b/src/mongo/db/pipeline/cluster_aggregation_planner.cpp
@@ -0,0 +1,182 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/cluster_aggregation_planner.h"
+
+#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_merge_cursors.h"
+#include "mongo/db/pipeline/document_source_project.h"
+#include "mongo/db/pipeline/document_source_sort.h"
+#include "mongo/db/pipeline/document_source_unwind.h"
+
+namespace mongo {
+namespace cluster_aggregation_planner {
+
+namespace {
+/**
+ * Moves everything before a splittable stage to the shards. If there are no splittable stages,
+ * moves everything to the shards.
+ *
+ * It is not safe to call this optimization multiple times.
+ *
+ * NOTE: looks for SplittableDocumentSources and uses that API
+ */
+void findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) {
+ while (!mergePipe->getSources().empty()) {
+ boost::intrusive_ptr<DocumentSource> current = mergePipe->popFront();
+
+ // Check if this source is splittable.
+ SplittableDocumentSource* splittable =
+ dynamic_cast<SplittableDocumentSource*>(current.get());
+
+ if (!splittable) {
+ // Move the source from the merger _sources to the shard _sources.
+ shardPipe->pushBack(current);
+ } else {
+ // Split this source into 'merge' and 'shard' _sources.
+ boost::intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource();
+ auto mergeSources = splittable->getMergeSources();
+
+ // A source may not simultaneously be present on both sides of the split.
+ invariant(std::find(mergeSources.begin(), mergeSources.end(), shardSource) ==
+ mergeSources.end());
+
+ if (shardSource)
+ shardPipe->pushBack(shardSource);
+
+ // Add the stages in reverse order, so that they appear in the pipeline in the same
+ // order as they were returned by the stage.
+ for (auto it = mergeSources.rbegin(); it != mergeSources.rend(); ++it) {
+ mergePipe->addInitialSource(*it);
+ }
+
+ break;
+ }
+ }
+}
+
+/**
+ * If the final stage on shards is to unwind an array, move that stage to the merger. This cuts down
+ * on network traffic and allows us to take advantage of reduced copying in unwind.
+ */
+void moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe) {
+ while (!shardPipe->getSources().empty() &&
+ dynamic_cast<DocumentSourceUnwind*>(shardPipe->getSources().back().get())) {
+ mergePipe->addInitialSource(shardPipe->popBack());
+ }
+}
+
+/**
+ * Adds a stage to the end of 'shardPipe' explicitly requesting all fields that 'mergePipe' needs.
+ * This is only done if it heuristically determines that it is needed. This optimization can reduce
+ * the amount of network traffic and can also enable the shards to convert less source BSON into
+ * Documents.
+ */
+void limitFieldsSentFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe) {
+ auto depsMetadata = DocumentSourceMatch::isTextQuery(shardPipe->getInitialQuery())
+ ? DepsTracker::MetadataAvailable::kTextScore
+ : DepsTracker::MetadataAvailable::kNoMetadata;
+ DepsTracker mergeDeps(mergePipe->getDependencies(depsMetadata));
+ if (mergeDeps.needWholeDocument)
+ return; // the merge needs all fields, so nothing we can do.
+
+ // Empty project is "special" so if no fields are needed, we just ask for _id instead.
+ if (mergeDeps.fields.empty())
+ mergeDeps.fields.insert("_id");
+
+ // Remove metadata from dependencies since it automatically flows through projection and we
+ // don't want to project it in to the document.
+ mergeDeps.setNeedTextScore(false);
+
+ // HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of
+ // field dependencies. While this may not be 100% ideal in all cases, it is simple and
+ // avoids the worst cases by ensuring that:
+ // 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of
+ // dependencies. This situation can happen when a $sort is before the first $project or
+ // $group. Without the optimization, the shards would have to reify and transmit full
+ // objects even though only a subset of fields are needed.
+ // 2) Optimization IS NOT applied immediately following a $project or $group since it would
+ // add an unnecessary project (and therefore a deep-copy).
+ for (auto&& source : shardPipe->getSources()) {
+ DepsTracker dt(depsMetadata);
+ if (source->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS)
+ return;
+ }
+ // if we get here, add the project.
+ boost::intrusive_ptr<DocumentSource> project = DocumentSourceProject::createFromBson(
+ BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->getContext());
+ shardPipe->pushBack(project);
+}
+} // namespace
+
+void performSplitPipelineOptimizations(Pipeline* shardPipeline, Pipeline* mergingPipeline) {
+ // The order in which optimizations are applied can have significant impact on the
+ // efficiency of the final pipeline. Be Careful!
+ findSplitPoint(shardPipeline, mergingPipeline);
+ moveFinalUnwindFromShardsToMerger(shardPipeline, mergingPipeline);
+ limitFieldsSentFromShardsToMerger(shardPipeline, mergingPipeline);
+}
+
+boost::optional<BSONObj> popLeadingMergeSort(Pipeline* pipeline) {
+ // Remove a leading $sort iff it is a mergesort, since the ARM cannot handle blocking $sort.
+ auto frontSort = pipeline->popFrontWithNameAndCriteria(
+ DocumentSourceSort::kStageName, [](const DocumentSource* const source) {
+ return static_cast<const DocumentSourceSort* const>(source)->mergingPresorted();
+ });
+
+ if (frontSort) {
+ auto sortStage = static_cast<DocumentSourceSort*>(frontSort.get());
+ if (auto sortLimit = sortStage->getLimitSrc()) {
+ // There was a limit stage absorbed into the sort stage, so we need to preserve that.
+ pipeline->addInitialSource(sortLimit);
+ }
+ return sortStage
+ ->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging)
+ .toBson();
+ }
+ return boost::none;
+}
+
+void addMergeCursorsSource(Pipeline* mergePipeline,
+ std::vector<RemoteCursor> remoteCursors,
+ executor::TaskExecutor* executor) {
+ AsyncResultsMergerParams armParams;
+ if (auto sort = popLeadingMergeSort(mergePipeline)) {
+ armParams.setSort(std::move(*sort));
+ }
+ armParams.setRemotes(std::move(remoteCursors));
+ armParams.setTailableMode(mergePipeline->getContext()->tailableMode);
+ armParams.setNss(mergePipeline->getContext()->ns);
+ mergePipeline->addInitialSource(DocumentSourceMergeCursors::create(
+ executor, std::move(armParams), mergePipeline->getContext()));
+}
+
+} // namespace cluster_aggregation_planner
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/cluster_aggregation_planner.h b/src/mongo/db/pipeline/cluster_aggregation_planner.h
new file mode 100644
index 00000000000..f62f158f6ca
--- /dev/null
+++ b/src/mongo/db/pipeline/cluster_aggregation_planner.h
@@ -0,0 +1,58 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/pipeline.h"
+
+namespace mongo {
+namespace cluster_aggregation_planner {
+
+/**
+ * Performs optimizations with the aim of reducing computing time and network traffic when a
+ * pipeline has been split into two pieces. Modifies 'shardPipeline' and 'mergingPipeline' such that
+ * they may contain different stages, but still compute the same results when executed.
+ */
+void performSplitPipelineOptimizations(Pipeline* shardPipeline, Pipeline* mergingPipeline);
+
+/**
+ * Rips off an initial $sort stage that can be handled by cursor merging machinery. Returns the
+ * sort key pattern of such a $sort stage if there was one, and boost::none otherwise.
+ */
+boost::optional<BSONObj> popLeadingMergeSort(Pipeline* mergePipeline);
+
+/**
+ * Creates a new DocumentSourceMergeCursors from the provided 'remoteCursors' and adds it to the
+ * front of 'mergePipeline'.
+ */
+void addMergeCursorsSource(Pipeline* mergePipeline,
+ std::vector<RemoteCursor> remoteCursors,
+ executor::TaskExecutor*);
+
+} // namespace cluster_aggregation_planner
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index df7b31b330e..b227afeac5f 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -395,7 +395,7 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx,
list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
// A change stream is a tailable + awaitData cursor.
- expCtx->tailableMode = TailableMode::kTailableAndAwaitData;
+ expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData;
// Prevent $changeStream from running on an entire database (or cluster-wide) unless we are in
// test mode.
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index dfe4ece48ac..32efd87cdda 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -520,9 +520,8 @@ public:
Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
Pipeline* pipeline) final {
while (_removeLeadingQueryStages && !pipeline->getSources().empty()) {
- if (pipeline->popFrontWithCriteria("$match") ||
- pipeline->popFrontWithCriteria("$sort") ||
- pipeline->popFrontWithCriteria("$project")) {
+ if (pipeline->popFrontWithName("$match") || pipeline->popFrontWithName("$sort") ||
+ pipeline->popFrontWithName("$project")) {
continue;
}
break;
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
index 06a314e8c2e..3e34fd366a0 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
@@ -43,15 +43,20 @@ constexpr StringData DocumentSourceMergeCursors::kStageName;
DocumentSourceMergeCursors::DocumentSourceMergeCursors(
executor::TaskExecutor* executor,
- std::unique_ptr<ClusterClientCursorParams> cursorParams,
- const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSource(expCtx), _executor(executor), _armParams(std::move(cursorParams)) {}
+ AsyncResultsMergerParams armParams,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<BSONObj> ownedParamsSpec)
+ : DocumentSource(expCtx),
+ _armParamsObj(std::move(ownedParamsSpec)),
+ _executor(executor),
+ _armParams(std::move(armParams)) {}
DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() {
// We don't expect or support tailable cursors to be executing through this stage.
- invariant(pExpCtx->tailableMode == TailableMode::kNormal);
+ invariant(pExpCtx->tailableMode == TailableModeEnum::kNormal);
if (!_arm) {
- _arm.emplace(pExpCtx->opCtx, _executor, _armParams.get());
+ _arm.emplace(pExpCtx->opCtx, _executor, std::move(*_armParams));
+ _armParams = boost::none;
}
auto next = uassertStatusOK(_arm->blockingNext());
if (next.isEOF()) {
@@ -60,24 +65,19 @@ DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() {
return Document::fromBsonWithMetaData(*next.getResult());
}
-void DocumentSourceMergeCursors::serializeToArray(
- std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
+Value DocumentSourceMergeCursors::serialize(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
invariant(!_arm);
invariant(_armParams);
- std::vector<Value> cursors;
- for (auto&& remote : _armParams->remotes) {
- cursors.emplace_back(Document{{"host", remote.hostAndPort.toString()},
- {"ns", remote.cursorResponse.getNSS().toString()},
- {"id", remote.cursorResponse.getCursorId()}});
- }
- array.push_back(Value(Document{{kStageName, Value(std::move(cursors))}}));
- if (!_armParams->sort.isEmpty()) {
- array.push_back(Value(Document{{DocumentSourceSort::kStageName, Value(_armParams->sort)}}));
- }
+ return Value(Document{{kStageName, _armParams->toBSON()}});
}
Pipeline::SourceContainer::iterator DocumentSourceMergeCursors::doOptimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
+ // TODO SERVER-34009 3.6 and earlier mongos processes serialize $mergeCursors and $sort with
+ // $mergingPresorted as separate stages. This code is responsible for coalescing these two
+ // stages. New mongos processes serialize $mergeCursors with an absorbed sort as one stage. Once
+ // we no longer have to support the old format, this code can be deleted.
invariant(*itr == this);
invariant(!_arm);
invariant(_armParams);
@@ -92,9 +92,9 @@ Pipeline::SourceContainer::iterator DocumentSourceMergeCursors::doOptimizeAt(
return next;
}
- _armParams->sort =
+ _armParams->setSort(
nextSort->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging)
- .toBson();
+ .toBson());
if (auto sortLimit = nextSort->getLimitSrc()) {
// There was a limit stage absorbed into the sort stage, so we need to preserve that.
container->insert(std::next(next), sortLimit);
@@ -105,15 +105,31 @@ Pipeline::SourceContainer::iterator DocumentSourceMergeCursors::doOptimizeAt(
boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- uassert(
- 17026, "$mergeCursors stage expected array as argument", elem.type() == BSONType::Array);
+ if (elem.type() == BSONType::Object) {
+ // This is the modern serialization format. We de-serialize using the IDL.
+ auto ownedObj = elem.embeddedObject().getOwned();
+ auto armParams =
+ AsyncResultsMergerParams::parse(IDLParserErrorContext(kStageName), ownedObj);
+ return new DocumentSourceMergeCursors(
+ Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ std::move(armParams),
+ expCtx,
+ std::move(ownedObj));
+ }
+
+ // This is the old serialization format which can still be generated by mongos processes
+ // older than 4.0.
+ // TODO SERVER-34009 Remove support for this format.
+ uassert(17026,
+ "$mergeCursors stage expected either an array or an object as argument",
+ elem.type() == BSONType::Array);
const auto serializedRemotes = elem.Array();
uassert(50729,
"$mergeCursors stage expected array with at least one entry",
serializedRemotes.size() > 0);
boost::optional<NamespaceString> nss;
- std::vector<ClusterClientCursorParams::RemoteCursor> remotes;
+ std::vector<RemoteCursor> remotes;
for (auto&& cursor : serializedRemotes) {
BSONElement nsElem;
BSONElement hostElem;
@@ -164,28 +180,30 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson(
// any data in the initial batch.
// TODO SERVER-33323 We use a fake shard id because the AsyncResultsMerger won't use it for
// anything, and finding the real one is non-trivial.
+ RemoteCursor remoteCursor;
+ remoteCursor.setShardId(ShardId("fakeShardIdForMergeCursors"));
+ remoteCursor.setHostAndPort(std::move(host));
std::vector<BSONObj> emptyBatch;
- remotes.push_back({ShardId("fakeShardIdForMergeCursors"),
- std::move(host),
- CursorResponse{*nss, cursorId, emptyBatch}});
+ remoteCursor.setCursorResponse(CursorResponse{*nss, cursorId, emptyBatch});
+ remotes.push_back(std::move(remoteCursor));
}
invariant(nss); // We know there is at least one cursor in 'serializedRemotes', and we require
// each cursor to have a 'ns' field.
- auto params = stdx::make_unique<ClusterClientCursorParams>(*nss);
- params->remotes = std::move(remotes);
+ AsyncResultsMergerParams params;
+ params.setRemotes(std::move(remotes));
+ params.setNss(*nss);
return new DocumentSourceMergeCursors(
Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
std::move(params),
- expCtx);
+ expCtx,
+ elem.embeddedObject().getOwned());
}
boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::create(
- std::vector<ClusterClientCursorParams::RemoteCursor>&& remoteCursors,
executor::TaskExecutor* executor,
+ AsyncResultsMergerParams params,
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- auto params = stdx::make_unique<ClusterClientCursorParams>(expCtx->ns);
- params->remotes = std::move(remoteCursors);
return new DocumentSourceMergeCursors(executor, std::move(params), expCtx);
}
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h
index cb74f4bd9ef..611d09bcd34 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.h
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.h
@@ -55,11 +55,11 @@ public:
BSONElement, const boost::intrusive_ptr<ExpressionContext>&);
/**
- * Creates a new DocumentSourceMergeCursors from the given 'remoteCursors'.
+ * Creates a new DocumentSourceMergeCursors from the given parameters.
*/
static boost::intrusive_ptr<DocumentSource> create(
- std::vector<ClusterClientCursorParams::RemoteCursor>&& remoteCursors,
executor::TaskExecutor*,
+ AsyncResultsMergerParams,
const boost::intrusive_ptr<ExpressionContext>&);
const char* getSourceName() const final {
@@ -77,9 +77,7 @@ public:
/**
* Serializes this stage to be sent to perform the merging on a different host.
*/
- void serializeToArray(
- std::vector<Value>& array,
- boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
StageConstraints constraints(StreamType::kStreaming,
@@ -102,21 +100,23 @@ protected:
private:
DocumentSourceMergeCursors(executor::TaskExecutor*,
- std::unique_ptr<ClusterClientCursorParams>,
- const boost::intrusive_ptr<ExpressionContext>&);
+ AsyncResultsMergerParams,
+ const boost::intrusive_ptr<ExpressionContext>&,
+ boost::optional<BSONObj> ownedParamsSpec = boost::none);
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
- MONGO_UNREACHABLE; // Should call serializeToArray instead.
- }
+ // When we have parsed the params out of a BSONObj, the object needs to stay around while the
+ // params are in use. We store them here.
+ boost::optional<BSONObj> _armParamsObj;
executor::TaskExecutor* _executor;
- // '_arm' is not populated until the first call to getNext(). If getNext() is never called we
- // will not create an AsyncResultsMerger. If we did so the destruction of this stage would cause
- // the cursors within the ARM to be killed prematurely. For example, if this stage is parsed on
- // mongos then forwarded to the shards, it should not kill the cursors when it goes out of scope
- // on mongos.
- std::unique_ptr<ClusterClientCursorParams> _armParams;
+ // '_armParams' is populated until the first call to getNext(). Upon the first call to getNext()
+ // '_arm' will be populated using '_armParams', and '_armParams' will become boost::none. So if
+ // getNext() is never called we will never populate '_arm'. If we did so the destruction of this
+ // stage would cause the cursors within the ARM to be killed prematurely. For example, if this
+ // stage is parsed on mongos then forwarded to the shards, it should not kill the cursors when
+ // it goes out of scope on mongos.
+ boost::optional<AsyncResultsMergerParams> _armParams;
boost::optional<AsyncResultsMerger> _arm;
};
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
index 4809ac54b50..3614324d27d 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
@@ -111,9 +111,7 @@ private:
};
TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectNonArray) {
- auto spec = BSON("$mergeCursors" << BSON(
- "cursors" << BSON_ARRAY(BSON("ns" << kTestNss.ns() << "id" << 0LL << "host"
- << kTestHost.toString()))));
+ auto spec = BSON("$mergeCursors" << 2);
ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
AssertionException,
17026);
@@ -213,16 +211,66 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToParseTheSerializedVersionOf
ASSERT(DocumentSourceMergeCursors::createFromBson(newSpec.firstElement(), getExpCtx()));
}
+RemoteCursor makeRemoteCursor(ShardId shardId, HostAndPort host, CursorResponse response) {
+ RemoteCursor remoteCursor;
+ remoteCursor.setShardId(std::move(shardId));
+ remoteCursor.setHostAndPort(std::move(host));
+ remoteCursor.setCursorResponse(std::move(response));
+ return remoteCursor;
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToParseSerializedARMParams) {
+ AsyncResultsMergerParams params;
+ params.setSort(BSON("y" << 1 << "z" << 1));
+ params.setNss(kTestNss);
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(makeRemoteCursor(
+ kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, kExhaustedCursorID, {})));
+ params.setRemotes(std::move(cursors));
+ auto spec = BSON("$mergeCursors" << params.toBSON());
+ auto mergeCursors =
+ DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx());
+ std::vector<Value> serializationArray;
+ mergeCursors->serializeToArray(serializationArray);
+ ASSERT_EQ(serializationArray.size(), 1UL);
+
+ // Make sure the serialized version can be parsed into an identical AsyncResultsMergerParams.
+ auto newSpec = serializationArray[0].getDocument().toBson();
+ ASSERT(newSpec["$mergeCursors"].type() == BSONType::Object);
+ auto newParams = AsyncResultsMergerParams::parse(IDLParserErrorContext("$mergeCursors test"),
+ newSpec["$mergeCursors"].Obj());
+ ASSERT_TRUE(params.getSort());
+ ASSERT_BSONOBJ_EQ(*params.getSort(), *newParams.getSort());
+ ASSERT_EQ(params.getCompareWholeSortKey(), newParams.getCompareWholeSortKey());
+ ASSERT(params.getTailableMode() == newParams.getTailableMode());
+ ASSERT(params.getBatchSize() == newParams.getBatchSize());
+ ASSERT_EQ(params.getNss(), newParams.getNss());
+ ASSERT_EQ(params.getAllowPartialResults(), newParams.getAllowPartialResults());
+ ASSERT_EQ(newParams.getRemotes().size(), 1UL);
+ ASSERT(newParams.getRemotes()[0].getShardId() == kTestShardIds[0].toString());
+ ASSERT(newParams.getRemotes()[0].getHostAndPort() == kTestShardHosts[0]);
+ ASSERT_EQ(newParams.getRemotes()[0].getCursorResponse().getNSS(), kTestNss);
+ ASSERT_EQ(newParams.getRemotes()[0].getCursorResponse().getCursorId(), kExhaustedCursorID);
+ ASSERT(newParams.getRemotes()[0].getCursorResponse().getBatch().empty());
+
+ // Test that the $mergeCursors stage will accept the serialized format of
+ // AsyncResultsMergerParams.
+ ASSERT(DocumentSourceMergeCursors::createFromBson(newSpec.firstElement(), getExpCtx()));
+}
+
TEST_F(DocumentSourceMergeCursorsTest, ShouldReportEOFWithNoCursors) {
auto expCtx = getExpCtx();
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(
- kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, kExhaustedCursorID, {}));
- cursors.emplace_back(
- kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, kExhaustedCursorID, {}));
+ AsyncResultsMergerParams armParams;
+ armParams.setNss(kTestNss);
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(makeRemoteCursor(
+ kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, kExhaustedCursorID, {})));
+ cursors.emplace_back(makeRemoteCursor(
+ kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, kExhaustedCursorID, {})));
+ armParams.setRemotes(std::move(cursors));
auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
auto mergeCursorsStage =
- DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx);
+ DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx);
ASSERT_TRUE(mergeCursorsStage->getNext().isEOF());
}
@@ -236,12 +284,17 @@ BSONObj cursorResponseObj(const NamespaceString& nss,
TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToIterateCursorsUntilEOF) {
auto expCtx = getExpCtx();
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
- cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
+ AsyncResultsMergerParams armParams;
+ armParams.setNss(kTestNss);
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})));
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})));
+ armParams.setRemotes(std::move(cursors));
auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
pipeline->addInitialSource(
- DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx));
+ DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
// Iterate the $mergeCursors stage asynchronously on a different thread, since it will block
// waiting for network responses, which we will manually schedule below.
@@ -280,12 +333,17 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToIterateCursorsUntilEOF) {
TEST_F(DocumentSourceMergeCursorsTest, ShouldNotKillCursorsIfNeverIterated) {
auto expCtx = getExpCtx();
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
- cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
+ AsyncResultsMergerParams armParams;
+ armParams.setNss(kTestNss);
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})));
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})));
+ armParams.setRemotes(std::move(cursors));
auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
pipeline->addInitialSource(
- DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx));
+ DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
pipeline.reset(); // Delete the pipeline before using it.
@@ -295,11 +353,15 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldNotKillCursorsIfNeverIterated) {
TEST_F(DocumentSourceMergeCursorsTest, ShouldKillCursorIfPartiallyIterated) {
auto expCtx = getExpCtx();
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ AsyncResultsMergerParams armParams;
+ armParams.setNss(kTestNss);
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})));
+ armParams.setRemotes(std::move(cursors));
auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
pipeline->addInitialSource(
- DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx));
+ DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
// Iterate the pipeline asynchronously on a different thread, since it will block waiting for
// network responses, which we will manually schedule below.
@@ -344,12 +406,16 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldOptimizeWithASortToEnsureCorrectOrd
auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx));
// Make a $mergeCursors stage and add it to the front of the pipeline.
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
- cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
- auto mergeCursorsStage =
- DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx);
- pipeline->addInitialSource(std::move(mergeCursorsStage));
+ AsyncResultsMergerParams armParams;
+ armParams.setNss(kTestNss);
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})));
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})));
+ armParams.setRemotes(std::move(cursors));
+ pipeline->addInitialSource(
+ DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
// After optimization we should only have a $mergeCursors stage.
pipeline->optimizePipeline();
@@ -364,6 +430,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldOptimizeWithASortToEnsureCorrectOrd
ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 3}}));
ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 4}}));
ASSERT_FALSE(static_cast<bool>(pipeline->getNext()));
+ std::cout << "Finished";
});
onCommand([&](const auto& request) {
@@ -382,36 +449,35 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldOptimizeWithASortToEnsureCorrectOrd
future.timed_get(kFutureTimeout);
}
-TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLeadingSort) {
+TEST_F(DocumentSourceMergeCursorsTest, ShouldEnforceSortSpecifiedViaARMParams) {
auto expCtx = getExpCtx();
+ auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
- // Make a pipeline with a single $sort stage that is merging pre-sorted results.
- const bool mergingPresorted = true;
- const long long limit = 3;
- auto sortStage = DocumentSourceSort::create(
- expCtx, BSON("x" << 1), limit, DocumentSourceSort::kMaxMemoryUsageBytes, mergingPresorted);
- auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx));
-
- // Make a $mergeCursors stage and add it to the front of the pipeline.
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
- cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
- auto mergeCursorsStage =
- DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx);
- pipeline->addInitialSource(std::move(mergeCursorsStage));
+ // Make a $mergeCursors stage with a sort on "x" and add it to the front of the pipeline.
+ AsyncResultsMergerParams armParams;
+ armParams.setNss(kTestNss);
+ armParams.setSort(BSON("x" << 1));
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})));
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})));
+ armParams.setRemotes(std::move(cursors));
+ pipeline->addInitialSource(
+ DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
- // After optimization, we should still have a $limit stage.
+ // After optimization we should only have a $mergeCursors stage.
pipeline->optimizePipeline();
- ASSERT_EQ(pipeline->getSources().size(), 2UL);
+ ASSERT_EQ(pipeline->getSources().size(), 1UL);
ASSERT_TRUE(dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
- ASSERT_TRUE(dynamic_cast<DocumentSourceLimit*>(pipeline->getSources().back().get()));
// Iterate the pipeline asynchronously on a different thread, since it will block waiting for
// network responses, which we will manually schedule below.
- auto future = launchAsync([&]() {
- for (int i = 1; i <= limit; ++i) {
- ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", i}}));
- }
+ auto future = launchAsync([&pipeline]() {
+ ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 1}}));
+ ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 2}}));
+ ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 3}}));
+ ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 4}}));
ASSERT_FALSE(static_cast<bool>(pipeline->getNext()));
});
@@ -431,7 +497,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLea
future.timed_get(kFutureTimeout);
}
-TEST_F(DocumentSourceMergeCursorsTest, ShouldSerializeSortIfAbsorbedViaOptimize) {
+TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLeadingSort) {
auto expCtx = getExpCtx();
// Make a pipeline with a single $sort stage that is merging pre-sorted results.
@@ -442,12 +508,16 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldSerializeSortIfAbsorbedViaOptimize)
auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx));
// Make a $mergeCursors stage and add it to the front of the pipeline.
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
- cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
- auto mergeCursorsStage =
- DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx);
- pipeline->addInitialSource(std::move(mergeCursorsStage));
+ AsyncResultsMergerParams armParams;
+ armParams.setNss(kTestNss);
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})));
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})));
+ armParams.setRemotes(std::move(cursors));
+ pipeline->addInitialSource(
+ DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
// After optimization, we should still have a $limit stage.
pipeline->optimizePipeline();
@@ -455,11 +525,29 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldSerializeSortIfAbsorbedViaOptimize)
ASSERT_TRUE(dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
ASSERT_TRUE(dynamic_cast<DocumentSourceLimit*>(pipeline->getSources().back().get()));
- auto serialized = pipeline->serialize();
- ASSERT_EQ(serialized.size(), 3UL);
- ASSERT_FALSE(serialized[0]["$mergeCursors"].missing());
- ASSERT_FALSE(serialized[1]["$sort"].missing());
- ASSERT_FALSE(serialized[2]["$limit"].missing());
+ // Iterate the pipeline asynchronously on a different thread, since it will block waiting for
+ // network responses, which we will manually schedule below.
+ auto future = launchAsync([&]() {
+ for (int i = 1; i <= limit; ++i) {
+ ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", i}}));
+ }
+ ASSERT_FALSE(static_cast<bool>(pipeline->getNext()));
+ });
+
+ onCommand([&](const auto& request) {
+ return cursorResponseObj(expCtx->ns,
+ kExhaustedCursorID,
+ {BSON("x" << 1 << "$sortKey" << BSON("" << 1)),
+ BSON("x" << 3 << "$sortKey" << BSON("" << 3))});
+ });
+ onCommand([&](const auto& request) {
+ return cursorResponseObj(expCtx->ns,
+ kExhaustedCursorID,
+ {BSON("x" << 2 << "$sortKey" << BSON("" << 2)),
+ BSON("x" << 4 << "$sortKey" << BSON("" << 4))});
+ });
+
+ future.timed_get(kFutureTimeout);
}
} // namespace
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index d2ad1cd9ec4..c5989718d5e 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -262,7 +262,7 @@ private:
uint64_t _maxMemoryUsageBytes;
bool _done;
- bool _mergingPresorted;
+ bool _mergingPresorted; // TODO SERVER-34009 Remove this flag.
std::unique_ptr<MySorter> _sorter;
std::unique_ptr<MySorter::Iterator> _output;
};
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 6847d43e8a4..b5e490e91d7 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -161,7 +161,7 @@ public:
* query.
*/
bool isTailableAwaitData() const {
- return tailableMode == TailableMode::kTailableAndAwaitData;
+ return tailableMode == TailableModeEnum::kTailableAndAwaitData;
}
// The explain verbosity requested by the user, or boost::none if no explain was requested.
@@ -198,7 +198,7 @@ public:
Variables variables;
VariablesParseState variablesParseState;
- TailableMode tailableMode = TailableMode::kNormal;
+ TailableModeEnum tailableMode = TailableModeEnum::kNormal;
// Tracks the depth of nested aggregation sub-pipelines. Used to enforce depth limits.
size_t subPipelineDepth = 0;
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 2f6fce9e526..335ca4f5655 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -28,9 +28,7 @@
#include "mongo/platform/basic.h"
-// This file defines functions from both of these headers
#include "mongo/db/pipeline/pipeline.h"
-#include "mongo/db/pipeline/pipeline_optimizations.h"
#include <algorithm>
@@ -40,6 +38,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/accumulator.h"
+#include "mongo/db/pipeline/cluster_aggregation_planner.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_geo_near.h"
@@ -47,6 +46,7 @@
#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/document_source_project.h"
+#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/document_source_unwind.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
@@ -329,13 +329,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::splitForSharded() {
// Keep a copy of the original source list in case we need to reset the pipeline from split to
// unsplit later.
shardPipeline->_unsplitSources.emplace(_sources);
-
- // The order in which optimizations are applied can have significant impact on the
- // efficiency of the final pipeline. Be Careful!
- Optimizations::Sharded::findSplitPoint(shardPipeline.get(), this);
- Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(shardPipeline.get(), this);
- Optimizations::Sharded::limitFieldsSentFromShardsToMerger(shardPipeline.get(), this);
-
+ cluster_aggregation_planner::performSplitPipelineOptimizations(shardPipeline.get(), this);
shardPipeline->_splitState = SplitState::kSplitForShards;
_splitState = SplitState::kSplitForMerge;
@@ -366,87 +360,6 @@ void Pipeline::unsplitFromSharded(
stitch();
}
-void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) {
- while (!mergePipe->_sources.empty()) {
- intrusive_ptr<DocumentSource> current = mergePipe->_sources.front();
- mergePipe->_sources.pop_front();
-
- // Check if this source is splittable.
- SplittableDocumentSource* splittable =
- dynamic_cast<SplittableDocumentSource*>(current.get());
-
- if (!splittable) {
- // Move the source from the merger _sources to the shard _sources.
- shardPipe->_sources.push_back(current);
- } else {
- // Split this source into 'merge' and 'shard' _sources.
- intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource();
- auto mergeSources = splittable->getMergeSources();
-
- // A source may not simultaneously be present on both sides of the split.
- invariant(std::find(mergeSources.begin(), mergeSources.end(), shardSource) ==
- mergeSources.end());
-
- if (shardSource)
- shardPipe->_sources.push_back(shardSource);
-
- // Add the stages in reverse order, so that they appear in the pipeline in the same
- // order as they were returned by the stage.
- for (auto it = mergeSources.rbegin(); it != mergeSources.rend(); ++it) {
- mergePipe->_sources.push_front(*it);
- }
-
- break;
- }
- }
-}
-
-void Pipeline::Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe,
- Pipeline* mergePipe) {
- while (!shardPipe->_sources.empty() &&
- dynamic_cast<DocumentSourceUnwind*>(shardPipe->_sources.back().get())) {
- mergePipe->_sources.push_front(shardPipe->_sources.back());
- shardPipe->_sources.pop_back();
- }
-}
-
-void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipeline* shardPipe,
- Pipeline* mergePipe) {
- auto depsMetadata = DocumentSourceMatch::isTextQuery(shardPipe->getInitialQuery())
- ? DepsTracker::MetadataAvailable::kTextScore
- : DepsTracker::MetadataAvailable::kNoMetadata;
- DepsTracker mergeDeps(mergePipe->getDependencies(depsMetadata));
- if (mergeDeps.needWholeDocument)
- return; // the merge needs all fields, so nothing we can do.
-
- // Empty project is "special" so if no fields are needed, we just ask for _id instead.
- if (mergeDeps.fields.empty())
- mergeDeps.fields.insert("_id");
-
- // Remove metadata from dependencies since it automatically flows through projection and we
- // don't want to project it in to the document.
- mergeDeps.setNeedTextScore(false);
-
- // HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of
- // field dependencies. While this may not be 100% ideal in all cases, it is simple and
- // avoids the worst cases by ensuring that:
- // 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of
- // dependencies. This situation can happen when a $sort is before the first $project or
- // $group. Without the optimization, the shards would have to reify and transmit full
- // objects even though only a subset of fields are needed.
- // 2) Optimization IS NOT applied immediately following a $project or $group since it would
- // add an unnecessary project (and therefore a deep-copy).
- for (auto&& source : shardPipe->_sources) {
- DepsTracker dt(depsMetadata);
- if (source->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS)
- return;
- }
- // if we get here, add the project.
- boost::intrusive_ptr<DocumentSource> project = DocumentSourceProject::createFromBson(
- BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->pCtx);
- shardPipe->_sources.push_back(project);
-}
-
BSONObj Pipeline::getInitialQuery() const {
if (_sources.empty())
return BSONObj();
@@ -699,7 +612,35 @@ Status Pipeline::_pipelineCanRunOnMongoS() const {
return Status::OK();
}
-boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithCriteria(
+void Pipeline::pushBack(boost::intrusive_ptr<DocumentSource> newStage) {
+ newStage->setSource(_sources.back().get());
+ _sources.push_back(std::move(newStage));
+}
+
+boost::intrusive_ptr<DocumentSource> Pipeline::popBack() {
+ if (_sources.empty()) {
+ return nullptr;
+ }
+ auto targetStage = _sources.back();
+ _sources.pop_back();
+ return targetStage;
+}
+
+boost::intrusive_ptr<DocumentSource> Pipeline::popFront() {
+ if (_sources.empty()) {
+ return nullptr;
+ }
+ auto targetStage = _sources.front();
+ _sources.pop_front();
+ stitch();
+ return targetStage;
+}
+
+boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithName(StringData targetStageName) {
+ return popFrontWithNameAndCriteria(targetStageName, nullptr);
+}
+
+boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithNameAndCriteria(
StringData targetStageName, stdx::function<bool(const DocumentSource* const)> predicate) {
if (_sources.empty() || _sources.front()->getSourceName() != targetStageName) {
return nullptr;
@@ -710,8 +651,7 @@ boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithCriteria(
return nullptr;
}
- _sources.pop_front();
- stitch();
- return targetStage;
+ return popFront();
}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index bcb7ee64521..04b5769792d 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -39,6 +39,8 @@
#include "mongo/db/pipeline/value.h"
#include "mongo/db/query/explain_options.h"
#include "mongo/db/query/query_knobs.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/s/query/async_results_merger_params_gen.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/intrusive_counter.h"
#include "mongo/util/timer.h"
@@ -268,13 +270,34 @@ public:
}
/**
+ * Removes and returns the first stage of the pipeline. Returns nullptr if the pipeline is
+ * empty.
+ */
+ boost::intrusive_ptr<DocumentSource> popFront();
+
+ /**
+ * Removes and returns the last stage of the pipeline. Returns nullptr if the pipeline is empty.
+ */
+ boost::intrusive_ptr<DocumentSource> popBack();
+
+ /**
+ * Adds the given stage to the end of the pipeline.
+ */
+ void pushBack(boost::intrusive_ptr<DocumentSource>);
+
+ /**
+ * Removes and returns the first stage of the pipeline if its name is 'targetStageName'.
+ * Returns nullptr if there is no first stage with that name.
+ */
+ boost::intrusive_ptr<DocumentSource> popFrontWithName(StringData targetStageName);
+
+ /**
* Removes and returns the first stage of the pipeline if its name is 'targetStageName' and the
* given 'predicate' function, if present, returns 'true' when called with a pointer to the
* stage. Returns nullptr if there is no first stage which meets these criteria.
*/
- boost::intrusive_ptr<DocumentSource> popFrontWithCriteria(
- StringData targetStageName,
- stdx::function<bool(const DocumentSource* const)> predicate = nullptr);
+ boost::intrusive_ptr<DocumentSource> popFrontWithNameAndCriteria(
+ StringData targetStageName, stdx::function<bool(const DocumentSource* const)> predicate);
/**
* PipelineD is a "sister" class that has additional functionality for the Pipeline. It exists
@@ -286,15 +309,6 @@ public:
friend class PipelineD;
private:
- class Optimizations {
- public:
- // This contains static functions that optimize pipelines in various ways.
- // This is a class rather than a namespace so that it can be a friend of Pipeline.
- // It is defined in pipeline_optimizations.h.
- class Sharded;
- };
-
- friend class Optimizations::Sharded;
friend class PipelineDeleter;
/**
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 244946bd877..043452c3e0f 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -386,7 +386,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
plannerOpts |= QueryPlannerParams::NO_UNCOVERED_PROJECTIONS;
}
- if (expCtx->needsMerge && expCtx->tailableMode == TailableMode::kTailableAndAwaitData) {
+ if (expCtx->needsMerge && expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) {
plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
}
diff --git a/src/mongo/db/pipeline/pipeline_optimizations.h b/src/mongo/db/pipeline/pipeline_optimizations.h
deleted file mode 100644
index af85fdff4b0..00000000000
--- a/src/mongo/db/pipeline/pipeline_optimizations.h
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Copyright 2013 (c) 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-/**
- * This file declares optimizations available on Pipelines. For now they should be considered part
- * of Pipeline's implementation rather than it's interface.
- */
-
-#pragma once
-
-#include "mongo/db/pipeline/pipeline.h"
-
-namespace mongo {
-/**
- * This class holds optimizations applied to a shard Pipeline and a merger Pipeline.
- *
- * Each function has the same signature and takes two Pipelines, both as an in/out parameters.
- */
-class Pipeline::Optimizations::Sharded {
-public:
- /**
- * Moves everything before a splittable stage to the shards. If there
- * are no splittable stages, moves everything to the shards.
- *
- * It is not safe to call this optimization multiple times.
- *
- * NOTE: looks for SplittableDocumentSources and uses that API
- */
- static void findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe);
-
- /**
- * If the final stage on shards is to unwind an array, move that stage to the merger. This
- * cuts down on network traffic and allows us to take advantage of reduced copying in
- * unwind.
- */
- static void moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe);
-
- /**
- * Adds a stage to the end of shardPipe explicitly requesting all fields that mergePipe
- * needs. This is only done if it heuristically determines that it is needed. This
- * optimization can reduce the amount of network traffic and can also enable the shards to
- * convert less source BSON into Documents.
- */
- static void limitFieldsSentFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe);
-};
-} // namespace mongo
diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript
index 81c4ac22cf9..3638d5ba00b 100644
--- a/src/mongo/db/query/SConscript
+++ b/src/mongo/db/query/SConscript
@@ -187,7 +187,8 @@ env.Library(
target="query_request",
source=[
"query_request.cpp",
- "tailable_mode.cpp"
+ "tailable_mode.cpp",
+ env.Idlc("tailable_mode.idl")[0],
],
LIBDEPS=[
"$BUILD_DIR/mongo/base",
diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h
index d3852f2ee21..b2dd828f4c2 100644
--- a/src/mongo/db/query/cursor_response.h
+++ b/src/mongo/db/query/cursor_response.h
@@ -141,6 +141,23 @@ public:
};
/**
+ * Constructs a CursorResponse from the command BSON response.
+ */
+ static StatusWith<CursorResponse> parseFromBSON(const BSONObj& cmdResponse);
+
+ /**
+ * A throwing version of 'parseFromBSON'.
+ */
+ static CursorResponse parseFromBSONThrowing(const BSONObj& cmdResponse) {
+ return uassertStatusOK(parseFromBSON(cmdResponse));
+ }
+
+ /**
+ * Constructs an empty cursor response.
+ */
+ CursorResponse() = default;
+
+ /**
* Constructs from values for each of the fields.
*/
CursorResponse(NamespaceString nss,
@@ -186,15 +203,13 @@ public:
}
/**
- * Constructs a CursorResponse from the command BSON response.
- */
- static StatusWith<CursorResponse> parseFromBSON(const BSONObj& cmdResponse);
-
- /**
* Converts this response to its raw BSON representation.
*/
BSONObj toBSON(ResponseType responseType) const;
void addToBSON(ResponseType responseType, BSONObjBuilder* builder) const;
+ BSONObj toBSONAsInitialResponse() const {
+ return toBSON(ResponseType::InitialResponse);
+ }
private:
NamespaceString _nss;
diff --git a/src/mongo/db/query/getmore_request.cpp b/src/mongo/db/query/getmore_request.cpp
index a6b29ea520b..ea2515123f6 100644
--- a/src/mongo/db/query/getmore_request.cpp
+++ b/src/mongo/db/query/getmore_request.cpp
@@ -59,7 +59,7 @@ GetMoreRequest::GetMoreRequest() : cursorid(0), batchSize(0) {}
GetMoreRequest::GetMoreRequest(NamespaceString namespaceString,
CursorId id,
- boost::optional<long long> sizeOfBatch,
+ boost::optional<std::int64_t> sizeOfBatch,
boost::optional<Milliseconds> awaitDataTimeout,
boost::optional<long long> term,
boost::optional<repl::OpTime> lastKnownCommittedOpTime)
@@ -108,7 +108,7 @@ StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbna
boost::optional<NamespaceString> nss;
// Optional fields.
- boost::optional<long long> batchSize;
+ boost::optional<std::int64_t> batchSize;
boost::optional<Milliseconds> awaitDataTimeout;
boost::optional<long long> term;
boost::optional<repl::OpTime> lastKnownCommittedOpTime;
diff --git a/src/mongo/db/query/getmore_request.h b/src/mongo/db/query/getmore_request.h
index 8fa2d0fc5dd..4da4839abbd 100644
--- a/src/mongo/db/query/getmore_request.h
+++ b/src/mongo/db/query/getmore_request.h
@@ -52,7 +52,7 @@ struct GetMoreRequest {
*/
GetMoreRequest(NamespaceString namespaceString,
CursorId id,
- boost::optional<long long> sizeOfBatch,
+ boost::optional<std::int64_t> sizeOfBatch,
boost::optional<Milliseconds> awaitDataTimeout,
boost::optional<long long> term,
boost::optional<repl::OpTime> lastKnownCommittedOpTime);
@@ -76,7 +76,7 @@ struct GetMoreRequest {
// The batch size is optional. If not provided, we will put as many documents into the batch
// as fit within the byte limit.
- const boost::optional<long long> batchSize;
+ const boost::optional<std::int64_t> batchSize;
// The number of milliseconds for which a getMore on a tailable, awaitData query should block.
const boost::optional<Milliseconds> awaitDataTimeout;
diff --git a/src/mongo/db/query/query_request.cpp b/src/mongo/db/query/query_request.cpp
index 6268785c6a6..56ffe5dbb42 100644
--- a/src/mongo/db/query/query_request.cpp
+++ b/src/mongo/db/query/query_request.cpp
@@ -495,16 +495,16 @@ void QueryRequest::asFindCommand(BSONObjBuilder* cmdBuilder) const {
}
switch (_tailableMode) {
- case TailableMode::kTailable: {
+ case TailableModeEnum::kTailable: {
cmdBuilder->append(kTailableField, true);
break;
}
- case TailableMode::kTailableAndAwaitData: {
+ case TailableModeEnum::kTailableAndAwaitData: {
cmdBuilder->append(kTailableField, true);
cmdBuilder->append(kAwaitDataField, true);
break;
}
- case TailableMode::kNormal: {
+ case TailableModeEnum::kNormal: {
break;
}
}
@@ -623,7 +623,7 @@ Status QueryRequest::validate() const {
<< _maxTimeMS);
}
- if (_tailableMode != TailableMode::kNormal) {
+ if (_tailableMode != TailableModeEnum::kNormal) {
// Tailable cursors cannot have any sort other than {$natural: 1}.
const BSONObj expectedSort = BSON(kNaturalSortField << 1);
if (!_sort.isEmpty() &&
@@ -911,9 +911,9 @@ Status QueryRequest::initFullQuery(const BSONObj& top) {
int QueryRequest::getOptions() const {
int options = 0;
- if (_tailableMode == TailableMode::kTailable) {
+ if (_tailableMode == TailableModeEnum::kTailable) {
options |= QueryOption_CursorTailable;
- } else if (_tailableMode == TailableMode::kTailableAndAwaitData) {
+ } else if (_tailableMode == TailableModeEnum::kTailableAndAwaitData) {
options |= QueryOption_CursorTailable;
options |= QueryOption_AwaitData;
}
diff --git a/src/mongo/db/query/query_request.h b/src/mongo/db/query/query_request.h
index 7a89e466faf..29abf54aaca 100644
--- a/src/mongo/db/query/query_request.h
+++ b/src/mongo/db/query/query_request.h
@@ -319,19 +319,19 @@ public:
}
bool isTailable() const {
- return _tailableMode == TailableMode::kTailable ||
- _tailableMode == TailableMode::kTailableAndAwaitData;
+ return _tailableMode == TailableModeEnum::kTailable ||
+ _tailableMode == TailableModeEnum::kTailableAndAwaitData;
}
bool isTailableAndAwaitData() const {
- return _tailableMode == TailableMode::kTailableAndAwaitData;
+ return _tailableMode == TailableModeEnum::kTailableAndAwaitData;
}
- void setTailableMode(TailableMode tailableMode) {
+ void setTailableMode(TailableModeEnum tailableMode) {
_tailableMode = tailableMode;
}
- TailableMode getTailableMode() const {
+ TailableModeEnum getTailableMode() const {
return _tailableMode;
}
@@ -498,7 +498,7 @@ private:
bool _hasReadPref = false;
// Options that can be specified in the OP_QUERY 'flags' header.
- TailableMode _tailableMode = TailableMode::kNormal;
+ TailableModeEnum _tailableMode = TailableModeEnum::kNormal;
bool _slaveOk = false;
bool _oplogReplay = false;
bool _noCursorTimeout = false;
diff --git a/src/mongo/db/query/query_request_test.cpp b/src/mongo/db/query/query_request_test.cpp
index edcf58cecb1..9992fdb155e 100644
--- a/src/mongo/db/query/query_request_test.cpp
+++ b/src/mongo/db/query/query_request_test.cpp
@@ -1165,7 +1165,7 @@ TEST(QueryRequestTest, ConvertToAggregationWithShowRecordIdFails) {
TEST(QueryRequestTest, ConvertToAggregationWithTailableFails) {
QueryRequest qr(testns);
- qr.setTailableMode(TailableMode::kTailable);
+ qr.setTailableMode(TailableModeEnum::kTailable);
ASSERT_NOT_OK(qr.asAggregationCommand());
}
@@ -1183,7 +1183,7 @@ TEST(QueryRequestTest, ConvertToAggregationWithNoCursorTimeoutFails) {
TEST(QueryRequestTest, ConvertToAggregationWithAwaitDataFails) {
QueryRequest qr(testns);
- qr.setTailableMode(TailableMode::kTailableAndAwaitData);
+ qr.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
ASSERT_NOT_OK(qr.asAggregationCommand());
}
diff --git a/src/mongo/db/query/tailable_mode.cpp b/src/mongo/db/query/tailable_mode.cpp
index b19a1988672..f09a7b59c41 100644
--- a/src/mongo/db/query/tailable_mode.cpp
+++ b/src/mongo/db/query/tailable_mode.cpp
@@ -32,17 +32,17 @@
namespace mongo {
-StatusWith<TailableMode> tailableModeFromBools(bool isTailable, bool isAwaitData) {
+StatusWith<TailableModeEnum> tailableModeFromBools(bool isTailable, bool isAwaitData) {
if (isTailable) {
if (isAwaitData) {
- return TailableMode::kTailableAndAwaitData;
+ return TailableModeEnum::kTailableAndAwaitData;
}
- return TailableMode::kTailable;
+ return TailableModeEnum::kTailable;
} else if (isAwaitData) {
return {ErrorCodes::FailedToParse,
"Cannot set 'awaitData' without also setting 'tailable'"};
}
- return TailableMode::kNormal;
+ return TailableModeEnum::kNormal;
}
} // namespace mongo
diff --git a/src/mongo/db/query/tailable_mode.h b/src/mongo/db/query/tailable_mode.h
index 92c0fe9292e..531191c6f76 100644
--- a/src/mongo/db/query/tailable_mode.h
+++ b/src/mongo/db/query/tailable_mode.h
@@ -30,19 +30,14 @@
#include "mongo/base/error_codes.h"
#include "mongo/base/status_with.h"
+#include "mongo/db/query/tailable_mode_gen.h"
namespace mongo {
-enum class TailableMode {
- kNormal,
- kTailable,
- kTailableAndAwaitData,
-};
-
/**
* Returns a TailableMode from two booleans, returning ErrorCodes::FailedToParse if awaitData is
* set without tailable.
*/
-StatusWith<TailableMode> tailableModeFromBools(bool isTailable, bool isAwaitData);
+StatusWith<TailableModeEnum> tailableModeFromBools(bool isTailable, bool isAwaitData);
} // namespace mongo
diff --git a/src/mongo/db/query/tailable_mode.idl b/src/mongo/db/query/tailable_mode.idl
new file mode 100644
index 00000000000..33d5b6989dd
--- /dev/null
+++ b/src/mongo/db/query/tailable_mode.idl
@@ -0,0 +1,37 @@
+# Copyright (C) 2018 MongoDB Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the GNU Affero General Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+
+global:
+ cpp_namespace: "mongo"
+
+enums:
+ TailableMode:
+ description: "Describes the tailablility of a cursor."
+ type: string
+ values:
+ kNormal: "normal"
+ kTailable: "tailable"
+ kTailableAndAwaitData: "tailableAndAwaitData"
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index bcb0da9d789..fb0890b87a5 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1241,7 +1241,6 @@ env.Library(
'$BUILD_DIR/mongo/executor/task_executor_interface',
'$BUILD_DIR/mongo/rpc/command_status',
'$BUILD_DIR/mongo/s/query/async_results_merger',
- '$BUILD_DIR/mongo/s/query/cluster_client_cursor',
'$BUILD_DIR/mongo/util/progress_meter',
],
)
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 773f54a0b8b..8aee68a4c58 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -43,7 +43,6 @@
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/server_parameters.h"
#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/destructor_guard.h"
#include "mongo/util/fail_point_service.h"
@@ -613,20 +612,25 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa
<< " cursors established.";
// Initialize the 'AsyncResultsMerger'(ARM).
- std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors;
+ std::vector<RemoteCursor> remoteCursors;
for (auto&& cursorResponse : cursorResponses) {
// A placeholder 'ShardId' is used until the ARM is made less sharding specific.
- remoteCursors.emplace_back(
- ShardId("CollectionClonerSyncSource"), _source, std::move(cursorResponse));
+ remoteCursors.emplace_back();
+ auto& newCursor = remoteCursors.back();
+ newCursor.setShardId("CollectionClonerSyncSource");
+ newCursor.setHostAndPort(_source);
+ newCursor.setCursorResponse(std::move(cursorResponse));
}
- _clusterClientCursorParams = stdx::make_unique<ClusterClientCursorParams>(_sourceNss);
- _clusterClientCursorParams->remotes = std::move(remoteCursors);
- if (_collectionCloningBatchSize > 0)
- _clusterClientCursorParams->batchSize = _collectionCloningBatchSize;
+ AsyncResultsMergerParams armParams;
+ armParams.setNss(_sourceNss);
+ armParams.setRemotes(std::move(remoteCursors));
+ if (_collectionCloningBatchSize > 0) {
+ armParams.setBatchSize(_collectionCloningBatchSize);
+ }
Client::initThreadIfNotAlready();
_arm = stdx::make_unique<AsyncResultsMerger>(
- cc().getOperationContext(), _executor, _clusterClientCursorParams.get());
+ cc().getOperationContext(), _executor, std::move(armParams));
// This completion guard invokes _finishCallback on destruction.
auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); };
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index e79258e3734..d97c5c0048f 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -301,8 +301,6 @@ private:
const int _maxNumClonerCursors;
// (M) Component responsible for fetching the documents from the collection cloner cursor(s).
std::unique_ptr<AsyncResultsMerger> _arm;
- // (R) The cursor parameters used by the 'AsyncResultsMerger'.
- std::unique_ptr<ClusterClientCursorParams> _clusterClientCursorParams;
// (M) The event handle for the 'kill' event of the 'AsyncResultsMerger'.
executor::TaskExecutor::EventHandle _killArmHandle;
diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp
index ebfaccbde5c..5a834ed8aa5 100644
--- a/src/mongo/dbtests/documentsourcetests.cpp
+++ b/src/mongo/dbtests/documentsourcetests.cpp
@@ -369,7 +369,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterTimeout)
opCtx(), collScanParams, workingSet.get(), matchExpression.get());
auto queryRequest = stdx::make_unique<QueryRequest>(nss);
queryRequest->setFilter(filter);
- queryRequest->setTailableMode(TailableMode::kTailableAndAwaitData);
+ queryRequest->setTailableMode(TailableModeEnum::kTailableAndAwaitData);
auto canonicalQuery = unittest::assertGet(
CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr));
auto planExecutor =
@@ -381,7 +381,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterTimeout)
PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT));
// Make a DocumentSourceCursor.
- ctx()->tailableMode = TailableMode::kTailableAndAwaitData;
+ ctx()->tailableMode = TailableModeEnum::kTailableAndAwaitData;
// DocumentSourceCursor expects a PlanExecutor that has had its state saved.
planExecutor->saveState();
auto cursor =
@@ -419,7 +419,7 @@ TEST_F(DocumentSourceCursorTest, NonAwaitDataCursorShouldErrorAfterTimeout) {
PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT));
// Make a DocumentSourceCursor.
- ctx()->tailableMode = TailableMode::kNormal;
+ ctx()->tailableMode = TailableModeEnum::kNormal;
// DocumentSourceCursor expects a PlanExecutor that has had its state saved.
planExecutor->saveState();
auto cursor =
@@ -449,7 +449,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterBeingKil
opCtx(), collScanParams, workingSet.get(), matchExpression.get());
auto queryRequest = stdx::make_unique<QueryRequest>(nss);
queryRequest->setFilter(filter);
- queryRequest->setTailableMode(TailableMode::kTailableAndAwaitData);
+ queryRequest->setTailableMode(TailableModeEnum::kTailableAndAwaitData);
auto canonicalQuery = unittest::assertGet(
CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr));
auto planExecutor =
@@ -461,7 +461,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterBeingKil
PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED));
// Make a DocumentSourceCursor.
- ctx()->tailableMode = TailableMode::kTailableAndAwaitData;
+ ctx()->tailableMode = TailableModeEnum::kTailableAndAwaitData;
// DocumentSourceCursor expects a PlanExecutor that has had its state saved.
planExecutor->saveState();
auto cursor =
@@ -498,7 +498,7 @@ TEST_F(DocumentSourceCursorTest, NormalCursorShouldErrorAfterBeingKilled) {
PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED));
// Make a DocumentSourceCursor.
- ctx()->tailableMode = TailableMode::kNormal;
+ ctx()->tailableMode = TailableModeEnum::kNormal;
// DocumentSourceCursor expects a PlanExecutor that has had its state saved.
planExecutor->saveState();
auto cursor =
diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp
index 8f8025e4b9d..51a5f7723cd 100644
--- a/src/mongo/dbtests/query_plan_executor.cpp
+++ b/src/mongo/dbtests/query_plan_executor.cpp
@@ -99,7 +99,7 @@ public:
Collection* coll,
BSONObj& filterObj,
PlanExecutor::YieldPolicy yieldPolicy = PlanExecutor::YieldPolicy::YIELD_MANUAL,
- TailableMode tailableMode = TailableMode::kNormal) {
+ TailableModeEnum tailableMode = TailableModeEnum::kNormal) {
CollectionScanParams csparams;
csparams.collection = coll;
csparams.direction = CollectionScanParams::FORWARD;
@@ -307,7 +307,7 @@ TEST_F(PlanExecutorTest, ShouldReportErrorIfKilledDuringYieldButIsTailableAndAwa
auto exec = makeCollScanExec(coll,
filterObj,
PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT,
- TailableMode::kTailableAndAwaitData);
+ TailableModeEnum::kTailableAndAwaitData);
BSONObj resultObj;
ASSERT_EQ(PlanExecutor::DEAD, exec->getNext(&resultObj, nullptr));
@@ -323,7 +323,7 @@ TEST_F(PlanExecutorTest, ShouldNotSwallowExceedsTimeLimitDuringYieldButIsTailabl
Collection* coll = ctx.getCollection();
auto exec = makeCollScanExec(
- coll, filterObj, PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT, TailableMode::kTailable);
+ coll, filterObj, PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT, TailableModeEnum::kTailable);
BSONObj resultObj;
ASSERT_EQ(PlanExecutor::DEAD, exec->getNext(&resultObj, nullptr));
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 242cbc3c10c..c0643205001 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/curop.h"
#include "mongo/db/logical_clock.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/pipeline/cluster_aggregation_planner.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/document_source_out.h"
@@ -263,25 +264,14 @@ BSONObj createCommandForMergingShard(
return appendAllowImplicitCreate(mergeCmd.freeze().toBson(), true);
}
-/**
- * Verifies that the shardIds are the same as they were atClusterTime using versioned table.
- * TODO: SERVER-33767
- */
-bool verifyTargetedShardsAtClusterTime(OperationContext* opCtx,
- const std::set<ShardId>& shardIds,
- LogicalTime atClusterTime) {
- return true;
-}
-
-std::vector<ClusterClientCursorParams::RemoteCursor> establishShardCursors(
- OperationContext* opCtx,
- const NamespaceString& nss,
- const LiteParsedPipeline& litePipe,
- CachedCollectionRoutingInfo* routingInfo,
- const BSONObj& cmdObj,
- const ReadPreferenceSetting& readPref,
- const BSONObj& shardQuery,
- const BSONObj& collation) {
+std::vector<RemoteCursor> establishShardCursors(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const LiteParsedPipeline& litePipe,
+ CachedCollectionRoutingInfo* routingInfo,
+ const BSONObj& cmdObj,
+ const ReadPreferenceSetting& readPref,
+ const BSONObj& shardQuery,
+ const BSONObj& collation) {
LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
bool mustRunOnAll = mustRunOnAllShards(nss, *routingInfo, litePipe);
@@ -351,7 +341,7 @@ struct DispatchShardPipelineResults {
// Populated if this *is not* an explain, this vector represents the cursors on the remote
// shards.
- std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors;
+ std::vector<RemoteCursor> remoteCursors;
// Populated if this *is* an explain, this vector represents the results from each shard.
std::vector<AsyncRequestsSender::Response> remoteExplainOutput;
@@ -389,7 +379,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
// pipeline is already split and we now only need to target a single shard, reassemble the
// original pipeline.
// - After exhausting 10 attempts to establish the cursors, we give up and throw.
- auto cursors = std::vector<ClusterClientCursorParams::RemoteCursor>();
+ auto cursors = std::vector<RemoteCursor>();
auto shardResults = std::vector<AsyncRequestsSender::Response>();
auto opCtx = expCtx->opCtx;
@@ -547,7 +537,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
BSONObj cmdToRunOnNewShards,
const LiteParsedPipeline& liteParsedPipeline,
std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging,
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors) {
+ std::vector<RemoteCursor> cursors) {
ClusterClientCursorParams params(requestedNss, ReadPreferenceSetting::get(opCtx));
@@ -555,7 +545,6 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
params.tailableMode = pipelineForMerging->getContext()->tailableMode;
params.mergePipeline = std::move(pipelineForMerging);
params.remotes = std::move(cursors);
-
// A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch
// size we pass here is used for getMores, so do not specify a batch size if the initial request
// had a batch size of 0.
@@ -565,12 +554,19 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
if (liteParsedPipeline.hasChangeStream()) {
// For change streams, we need to set up a custom stage to establish cursors on new shards
- // when they are added.
- params.createCustomCursorSource = [cmdToRunOnNewShards](OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams* params) {
+ // when they are added. Be careful to extract the targeted shard IDs before the remote
+ // cursors are transferred from the ClusterClientCursorParams to the AsyncResultsMerger.
+ std::vector<ShardId> shardIds;
+ for (const auto& remote : params.remotes) {
+ shardIds.emplace_back(remote.getShardId().toString());
+ }
+
+ params.createCustomCursorSource = [cmdToRunOnNewShards,
+ shardIds](OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ ClusterClientCursorParams* params) {
return stdx::make_unique<RouterStageUpdateOnAddShard>(
- opCtx, executor, params, cmdToRunOnNewShards);
+ opCtx, executor, params, std::move(shardIds), cmdToRunOnNewShards);
};
}
auto ccc = ClusterClientCursorImpl::make(
@@ -619,7 +615,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
ccc->detachFromOperationContext();
- int nShards = ccc->getRemotes().size();
+ int nShards = ccc->getNumRemotes();
CursorId clusterCursorId = 0;
if (cursorState == ClusterCursorManager::CursorState::NotExhausted) {
@@ -685,7 +681,8 @@ ShardId pickMergingShard(OperationContext* opCtx,
return dispatchResults.needsPrimaryShardMerge
? primaryShard
: dispatchResults.remoteCursors[prng.nextInt32(dispatchResults.remoteCursors.size())]
- .shardId;
+ .getShardId()
+ .toString();
}
} // namespace
@@ -838,15 +835,16 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
auto executorPool = Grid::get(opCtx)->getExecutorPool();
const BSONObj reply = uassertStatusOK(storePossibleCursor(
opCtx,
- remoteCursor.shardId,
- remoteCursor.hostAndPort,
- remoteCursor.cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse),
+ remoteCursor.getShardId().toString(),
+ remoteCursor.getHostAndPort(),
+ remoteCursor.getCursorResponse().toBSON(CursorResponse::ResponseType::InitialResponse),
namespaces.requestedNss,
executorPool->getArbitraryExecutor(),
Grid::get(opCtx)->getCursorManager(),
mergeCtx->tailableMode));
- return appendCursorResponseToCommandResult(remoteCursor.shardId, reply, result);
+ return appendCursorResponseToCommandResult(
+ remoteCursor.getShardId().toString(), reply, result);
}
// If we reach here, we have a merge pipeline to dispatch.
@@ -882,10 +880,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
ShardId mergingShardId =
pickMergingShard(opCtx, dispatchResults, executionNsRoutingInfo.db().primaryId());
- mergingPipeline->addInitialSource(DocumentSourceMergeCursors::create(
+ cluster_aggregation_planner::addMergeCursorsSource(
+ mergingPipeline.get(),
std::move(dispatchResults.remoteCursors),
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- mergeCtx));
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor());
auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, mergingPipeline);
auto mergeResponse =
@@ -980,8 +978,8 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
namespaces.requestedNss,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
Grid::get(opCtx)->getCursorManager(),
- liteParsedPipeline.hasChangeStream() ? TailableMode::kTailableAndAwaitData
- : TailableMode::kNormal));
+ liteParsedPipeline.hasChangeStream() ? TailableModeEnum::kTailableAndAwaitData
+ : TailableModeEnum::kNormal));
}
// First append the properly constructed writeConcernError. It will then be skipped
diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/s/commands/pipeline_s.cpp
index 060c5533877..4d91ddfff08 100644
--- a/src/mongo/s/commands/pipeline_s.cpp
+++ b/src/mongo/s/commands/pipeline_s.cpp
@@ -114,7 +114,7 @@ boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument(
cmdBuilder.append(repl::ReadConcernArgs::kReadConcernFieldName, *readConcern);
}
- auto shardResult = std::vector<ClusterClientCursorParams::RemoteCursor>();
+ auto shardResult = std::vector<RemoteCursor>();
auto findCmd = cmdBuilder.obj();
size_t numAttempts = 0;
while (++numAttempts <= kMaxNumStaleVersionRetries) {
@@ -164,13 +164,13 @@ boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument(
invariant(shardResult.size() == 1u);
- auto& cursor = shardResult.front().cursorResponse;
+ auto& cursor = shardResult.front().getCursorResponse();
auto& batch = cursor.getBatch();
// We should have at most 1 result, and the cursor should be exhausted.
uassert(ErrorCodes::InternalError,
str::stream() << "Shard cursor was unexpectedly open after lookup: "
- << shardResult.front().hostAndPort
+ << shardResult.front().getHostAndPort()
<< ", id: "
<< cursor.getCursorId(),
cursor.getCursorId() == 0);
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 6e76510032a..769b4968c34 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -586,7 +586,7 @@ DbResponse Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss
}
uassertStatusOK(statusGetDb);
- boost::optional<long long> batchSize;
+ boost::optional<std::int64_t> batchSize;
if (ntoreturn) {
batchSize = ntoreturn;
}
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index d9148577f5a..aa9a1713255 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -86,6 +86,7 @@ env.Library(
source=[
"async_results_merger.cpp",
"establish_cursors.cpp",
+ env.Idlc('async_results_merger_params.idl')[0],
],
LIBDEPS=[
"$BUILD_DIR/mongo/db/query/command_request_response",
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 59d9a133d72..45403399e8c 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -82,20 +82,27 @@ int compareSortKeys(BSONObj leftSortKey, BSONObj rightSortKey, BSONObj sortKeyPa
AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
executor::TaskExecutor* executor,
- ClusterClientCursorParams* params)
+ AsyncResultsMergerParams params)
: _opCtx(opCtx),
_executor(executor),
- _params(params),
- _mergeQueue(MergingComparator(_remotes, _params->sort, _params->compareWholeSortKey)) {
+ // This strange initialization is to work around the fact that the IDL does not currently
+ // support a default value for an enum. The default tailable mode should be 'kNormal', but
+ // since that is not supported we treat boost::none (unspecified) to mean 'kNormal'.
+ _tailableMode(params.getTailableMode() ? *params.getTailableMode()
+ : TailableModeEnum::kNormal),
+ _params(std::move(params)),
+ _mergeQueue(MergingComparator(_remotes,
+ _params.getSort() ? *_params.getSort() : BSONObj(),
+ _params.getCompareWholeSortKey())) {
size_t remoteIndex = 0;
- for (const auto& remote : _params->remotes) {
- _remotes.emplace_back(remote.hostAndPort,
- remote.cursorResponse.getNSS(),
- remote.cursorResponse.getCursorId());
+ for (const auto& remote : _params.getRemotes()) {
+ _remotes.emplace_back(remote.getHostAndPort(),
+ remote.getCursorResponse().getNSS(),
+ remote.getCursorResponse().getCursorId());
// We don't check the return value of _addBatchToBuffer here; if there was an error,
// it will be stored in the remote and the first call to ready() will return true.
- _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.cursorResponse);
+ _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.getCursorResponse());
++remoteIndex;
}
}
@@ -123,7 +130,7 @@ bool AsyncResultsMerger::_remotesExhausted(WithLock) {
Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_params->tailableMode != TailableMode::kTailableAndAwaitData) {
+ if (_tailableMode != TailableModeEnum::kTailableAndAwaitData) {
return Status(ErrorCodes::BadValue,
"maxTimeMS can only be used with getMore for tailable, awaitData cursors");
}
@@ -133,9 +140,9 @@ Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
// recent optimes, which allows us to return sorted $changeStream results even if some shards
// are yet to provide a batch of data. If the timeout specified by the client is greater than
// 1000ms, then it will be enforced elsewhere.
- _awaitDataTimeout = (!_params->sort.isEmpty() && _remotes.size() > 1u
- ? std::min(awaitDataTimeout, Milliseconds{1000})
- : awaitDataTimeout);
+ _awaitDataTimeout =
+ (_params.getSort() && _remotes.size() > 1u ? std::min(awaitDataTimeout, Milliseconds{1000})
+ : awaitDataTimeout);
return Status::OK();
}
@@ -161,13 +168,12 @@ void AsyncResultsMerger::reattachToOperationContext(OperationContext* opCtx) {
_opCtx = opCtx;
}
-void AsyncResultsMerger::addNewShardCursors(
- const std::vector<ClusterClientCursorParams::RemoteCursor>& newCursors) {
+void AsyncResultsMerger::addNewShardCursors(std::vector<RemoteCursor>&& newCursors) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
for (auto&& remote : newCursors) {
- _remotes.emplace_back(remote.hostAndPort,
- remote.cursorResponse.getNSS(),
- remote.cursorResponse.getCursorId());
+ _remotes.emplace_back(remote.getHostAndPort(),
+ remote.getCursorResponse().getNSS(),
+ remote.getCursorResponse().getCursorId());
}
}
@@ -190,16 +196,15 @@ bool AsyncResultsMerger::_ready(WithLock lk) {
}
}
- const bool hasSort = !_params->sort.isEmpty();
- return hasSort ? _readySorted(lk) : _readyUnsorted(lk);
+ return _params.getSort() ? _readySorted(lk) : _readyUnsorted(lk);
}
bool AsyncResultsMerger::_readySorted(WithLock lk) {
- if (_params->tailableMode == TailableMode::kTailableAndAwaitData) {
+ if (_tailableMode == TailableModeEnum::kTailableAndAwaitData) {
return _readySortedTailable(lk);
}
// Tailable non-awaitData cursors cannot have a sort.
- invariant(_params->tailableMode == TailableMode::kNormal);
+ invariant(_tailableMode == TailableModeEnum::kNormal);
for (const auto& remote : _remotes) {
if (!remote.hasNext() && !remote.exhausted()) {
@@ -218,13 +223,14 @@ bool AsyncResultsMerger::_readySortedTailable(WithLock) {
auto smallestRemote = _mergeQueue.top();
auto smallestResult = _remotes[smallestRemote].docBuffer.front();
auto keyWeWantToReturn =
- extractSortKey(*smallestResult.getResult(), _params->compareWholeSortKey);
+ extractSortKey(*smallestResult.getResult(), _params.getCompareWholeSortKey());
for (const auto& remote : _remotes) {
if (!remote.promisedMinSortKey) {
// In order to merge sorted tailable cursors, we need this value to be populated.
return false;
}
- if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, _params->sort) > 0) {
+ if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, *_params.getSort()) >
+ 0) {
// The key we want to return is not guaranteed to be smaller than future results from
// this remote, so we can't yet return it.
return false;
@@ -264,13 +270,12 @@ StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() {
return {ClusterQueryResult()};
}
- const bool hasSort = !_params->sort.isEmpty();
- return hasSort ? _nextReadySorted(lk) : _nextReadyUnsorted(lk);
+ return _params.getSort() ? _nextReadySorted(lk) : _nextReadyUnsorted(lk);
}
ClusterQueryResult AsyncResultsMerger::_nextReadySorted(WithLock) {
// Tailable non-awaitData cursors cannot have a sort.
- invariant(_params->tailableMode != TailableMode::kTailable);
+ invariant(_tailableMode != TailableModeEnum::kTailable);
if (_mergeQueue.empty()) {
return {};
@@ -304,7 +309,7 @@ ClusterQueryResult AsyncResultsMerger::_nextReadyUnsorted(WithLock) {
ClusterQueryResult front = _remotes[_gettingFromRemote].docBuffer.front();
_remotes[_gettingFromRemote].docBuffer.pop();
- if (_params->tailableMode == TailableMode::kTailable &&
+ if (_tailableMode == TailableModeEnum::kTailable &&
!_remotes[_gettingFromRemote].hasNext()) {
// The cursor is tailable and we're about to return the last buffered result. This
// means that the next value returned should be boost::none to indicate the end of
@@ -334,9 +339,9 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
// request to fetch the remaining docs only. If the remote node has a plan with OR for top k and
// a full sort as is the case for the OP_QUERY find then this optimization will prevent
// switching to the full sort plan branch.
- auto adjustedBatchSize = _params->batchSize;
- if (_params->batchSize && *_params->batchSize > remote.fetchedCount) {
- adjustedBatchSize = *_params->batchSize - remote.fetchedCount;
+ auto adjustedBatchSize = _params.getBatchSize();
+ if (_params.getBatchSize() && *_params.getBatchSize() > remote.fetchedCount) {
+ adjustedBatchSize = *_params.getBatchSize() - remote.fetchedCount;
}
BSONObj cmdObj = GetMoreRequest(remote.cursorNss,
@@ -348,7 +353,7 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
.toBSON();
executor::RemoteCommandRequest request(
- remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, _opCtx);
+ remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, _opCtx);
auto callbackStatus =
_executor->scheduleRemoteCommand(request, [this, remoteIndex](auto const& cbData) {
@@ -448,7 +453,8 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote,
remote->cursorId = response.getCursorId();
if (response.getLastOplogTimestamp() && !response.getLastOplogTimestamp()->isNull()) {
// We only expect to see this for change streams.
- invariant(SimpleBSONObjComparator::kInstance.evaluate(_params->sort ==
+ invariant(_params.getSort());
+ invariant(SimpleBSONObjComparator::kInstance.evaluate(*_params.getSort() ==
change_stream_constants::kSortSpec));
auto newLatestTimestamp = *response.getLastOplogTimestamp();
@@ -475,7 +481,7 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote,
auto maxSortKeyFromResponse =
(response.getBatch().empty()
? BSONObj()
- : extractSortKey(response.getBatch().back(), _params->compareWholeSortKey));
+ : extractSortKey(response.getBatch().back(), _params.getCompareWholeSortKey()));
remote->promisedMinSortKey =
(compareSortKeys(
@@ -525,7 +531,7 @@ void AsyncResultsMerger::_cleanUpFailedBatch(WithLock lk, Status status, size_t
remote.status = std::move(status);
// Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We
// remove the unreachable host entirely from consideration by marking it as exhausted.
- if (_params->isAllowPartialResults) {
+ if (_params.getAllowPartialResults()) {
remote.status = Status::OK();
// Clear the results buffer and cursor id.
@@ -565,7 +571,7 @@ void AsyncResultsMerger::_processBatchResults(WithLock lk,
// through to the client as-is.
// (Note: tailable cursors are only valid on unsharded collections, so the end of the batch from
// one shard means the end of the overall batch).
- if (_params->tailableMode == TailableMode::kTailable && !remote.hasNext()) {
+ if (_tailableMode == TailableModeEnum::kTailable && !remote.hasNext()) {
invariant(_remotes.size() == 1);
_eofNext = true;
} else if (!remote.hasNext() && !remote.exhausted() && _lifecycleState == kAlive) {
@@ -582,7 +588,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
updateRemoteMetadata(&remote, response);
for (const auto& obj : response.getBatch()) {
// If there's a sort, we're expecting the remote node to have given us back a sort key.
- if (!_params->sort.isEmpty()) {
+ if (_params.getSort()) {
auto key = obj[AsyncResultsMerger::kSortKeyField];
if (!key) {
remote.status =
@@ -591,7 +597,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
<< "' in document: "
<< obj);
return false;
- } else if (!_params->compareWholeSortKey && key.type() != BSONType::Object) {
+ } else if (!_params.getCompareWholeSortKey() && key.type() != BSONType::Object) {
remote.status =
Status(ErrorCodes::InternalError,
str::stream() << "Field '" << AsyncResultsMerger::kSortKeyField
@@ -606,9 +612,9 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
++remote.fetchedCount;
}
- // If we're doing a sorted merge, then we have to make sure to put this remote onto the
- // merge queue.
- if (!_params->sort.isEmpty() && !response.getBatch().empty()) {
+ // If we're doing a sorted merge, then we have to make sure to put this remote onto the merge
+ // queue.
+ if (_params.getSort() && !response.getBatch().empty()) {
_mergeQueue.push(remoteIndex);
}
return true;
@@ -638,10 +644,10 @@ void AsyncResultsMerger::_scheduleKillCursors(WithLock, OperationContext* opCtx)
for (const auto& remote : _remotes) {
if (remote.status.isOK() && remote.cursorId && !remote.exhausted()) {
- BSONObj cmdObj = KillCursorsRequest(_params->nsString, {remote.cursorId}).toBSON();
+ BSONObj cmdObj = KillCursorsRequest(_params.getNss(), {remote.cursorId}).toBSON();
executor::RemoteCommandRequest request(
- remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, opCtx);
+ remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, opCtx);
// Send kill request; discard callback handle, if any, or failure report, if not.
_executor->scheduleRemoteCommand(request, [](auto const&) {}).getStatus().ignore();
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index e2551a3c7dd..1653374b5bc 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -37,7 +37,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/cursor_id.h"
#include "mongo/executor/task_executor.h"
-#include "mongo/s/query/cluster_client_cursor_params.h"
+#include "mongo/s/query/async_results_merger_params_gen.h"
#include "mongo/s/query/cluster_query_result.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/with_lock.h"
@@ -98,7 +98,7 @@ public:
*/
AsyncResultsMerger(OperationContext* opCtx,
executor::TaskExecutor* executor,
- ClusterClientCursorParams* params);
+ AsyncResultsMergerParams params);
/**
* In order to be destroyed, either the ARM must have been kill()'ed or all cursors must have
@@ -195,7 +195,11 @@ public:
* Adds the specified shard cursors to the set of cursors to be merged. The results from the
* new cursors will be returned as normal through nextReady().
*/
- void addNewShardCursors(const std::vector<ClusterClientCursorParams::RemoteCursor>& newCursors);
+ void addNewShardCursors(std::vector<RemoteCursor>&& newCursors);
+
+ std::size_t getNumRemotes() const {
+ return _remotes.size();
+ }
/**
* Starts shutting down this ARM by canceling all pending requests and scheduling killCursors
@@ -293,7 +297,7 @@ private:
private:
const std::vector<RemoteCursorData>& _remotes;
- const BSONObj& _sort;
+ const BSONObj _sort;
// When '_compareWholeSortKey' is true, $sortKey is a scalar value, rather than an object.
// We extract the sort key {$sortKey: <value>}. The sort key pattern '_sort' is verified to
@@ -401,7 +405,8 @@ private:
OperationContext* _opCtx;
executor::TaskExecutor* _executor;
- ClusterClientCursorParams* _params;
+ TailableModeEnum _tailableMode;
+ AsyncResultsMergerParams _params;
// Must be acquired before accessing any data members (other than _params, which is read-only).
stdx::mutex _mutex;
diff --git a/src/mongo/s/query/async_results_merger_params.idl b/src/mongo/s/query/async_results_merger_params.idl
new file mode 100644
index 00000000000..dafc9b53c1c
--- /dev/null
+++ b/src/mongo/s/query/async_results_merger_params.idl
@@ -0,0 +1,89 @@
+# Copyright (C) 2018 MongoDB Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the GNU Affero General Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+
+global:
+ cpp_namespace: "mongo"
+ cpp_includes:
+ - "mongo/s/shard_id.h"
+ - "mongo/util/net/hostandport.h"
+ - "mongo/db/query/cursor_response.h"
+
+imports:
+ - "mongo/db/query/tailable_mode.idl"
+ - "mongo/idl/basic_types.idl"
+ - "mongo/util/net/hostandport.idl"
+
+types:
+ CursorResponse:
+ bson_serialization_type: object
+ description: The first batch returned after establishing cursors on a shard.
+ cpp_type: CursorResponse
+ serializer: CursorResponse::toBSONAsInitialResponse
+ deserializer: CursorResponse::parseFromBSONThrowing
+
+structs:
+ RemoteCursor:
+ description: A description of a cursor opened on a remote server.
+ fields:
+ shardId:
+ type: string
+ description: The shardId of the shard on which the cursor resides.
+ hostAndPort:
+ type: HostAndPort
+ description: The exact host (within the shard) on which the cursor resides.
+ cursorResponse:
+ type: CursorResponse
+ description: The response after establishing a cursor on the remote shard, including
+ the first batch.
+
+ AsyncResultsMergerParams:
+ description: The parameters needed to establish an AsyncResultsMerger.
+ fields:
+ sort:
+ type: object
+ description: The sort requested on the merging operation. Empty if there is no sort.
+ optional: true
+ compareWholeSortKey:
+ type: bool
+ default: false
+ description: >-
+ When 'compareWholeSortKey' is true, $sortKey is a scalar value, rather than an
+ object. We extract the sort key {$sortKey: <value>}. The sort key pattern is
+ verified to be {$sortKey: 1}.
+ remotes: array<RemoteCursor>
+ tailableMode:
+ type: TailableMode
+ optional: true
+ description: If set, the tailability mode of this cursor.
+ batchSize:
+ type: safeInt64
+ optional: true
+ description: The batch size for this cursor.
+ nss: namespacestring
+ allowPartialResults:
+ type: bool
+ default: false
+ description: If set, error responses are ignored.
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 4c0e32cba51..6fd81715e90 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -62,9 +62,11 @@ const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShard1Host",
HostAndPort("FakeShard2Host", 12345),
HostAndPort("FakeShard3Host", 12345)};
+const NamespaceString kTestNss("testdb.testcoll");
+
class AsyncResultsMergerTest : public ShardingTestFixture {
public:
- AsyncResultsMergerTest() : _nss("testdb.testcoll") {}
+ AsyncResultsMergerTest() {}
void setUp() override {
ShardingTestFixture::setUp();
@@ -95,42 +97,48 @@ public:
void tearDown() override {
ShardingTestFixture::tearDown();
- // Reset _params only after shutting down the network interface (through
- // ShardingTestFixture::tearDown()), because shutting down the network interface will
- // deliver blackholed responses to the AsyncResultsMerger, and the AsyncResultsMerger's
- // callback may still access _params.
- _params.reset();
}
protected:
/**
* Constructs an ARM with the given vector of existing cursors.
*
- * If 'findCmd' is not set, the default ClusterClientCursorParams are used.
- * Otherwise, the 'findCmd' is used to construct the ClusterClientCursorParams.
+ * If 'findCmd' is not set, the default AsyncResultsMergerParams are used.
+ * Otherwise, the 'findCmd' is used to construct the AsyncResultsMergerParams.
*
* 'findCmd' should not have a 'batchSize', since the find's batchSize is used just in the
* initial find. The getMore 'batchSize' can be passed in through 'getMoreBatchSize.'
*/
- void makeCursorFromExistingCursors(
- std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors,
+ std::unique_ptr<AsyncResultsMerger> makeARMFromExistingCursors(
+ std::vector<RemoteCursor> remoteCursors,
boost::optional<BSONObj> findCmd = boost::none,
- boost::optional<long long> getMoreBatchSize = boost::none) {
- _params = stdx::make_unique<ClusterClientCursorParams>(_nss);
- _params->remotes = std::move(remoteCursors);
+ boost::optional<std::int64_t> getMoreBatchSize = boost::none) {
+ AsyncResultsMergerParams params;
+ params.setNss(kTestNss);
+ params.setRemotes(std::move(remoteCursors));
+
if (findCmd) {
const auto qr = unittest::assertGet(
- QueryRequest::makeFromFindCommand(_nss, *findCmd, false /* isExplain */));
- _params->sort = qr->getSort();
- _params->limit = qr->getLimit();
- _params->batchSize = getMoreBatchSize ? getMoreBatchSize : qr->getBatchSize();
- _params->skip = qr->getSkip();
- _params->tailableMode = qr->getTailableMode();
- _params->isAllowPartialResults = qr->isAllowPartialResults();
+ QueryRequest::makeFromFindCommand(kTestNss, *findCmd, false /* isExplain */));
+ if (!qr->getSort().isEmpty()) {
+ params.setSort(qr->getSort().getOwned());
+ }
+
+ if (getMoreBatchSize) {
+ params.setBatchSize(getMoreBatchSize);
+ } else {
+ params.setBatchSize(qr->getBatchSize()
+ ? boost::optional<std::int64_t>(
+ static_cast<std::int64_t>(*qr->getBatchSize()))
+ : boost::none);
+ }
+ params.setTailableMode(qr->getTailableMode());
+ params.setAllowPartialResults(qr->isAllowPartialResults());
}
- arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), _params.get());
+ return stdx::make_unique<AsyncResultsMerger>(
+ operationContext(), executor(), std::move(params));
}
/**
@@ -220,11 +228,6 @@ protected:
net->blackHole(net->getNextReadyRequest());
net->exitNetwork();
}
-
- const NamespaceString _nss;
- std::unique_ptr<ClusterClientCursorParams> _params;
-
- std::unique_ptr<AsyncResultsMerger> arm;
};
void assertKillCusorsCmdHasCursorId(const BSONObj& killCmd, CursorId cursorId) {
@@ -240,10 +243,19 @@ void assertKillCusorsCmdHasCursorId(const BSONObj& killCmd, CursorId cursorId) {
ASSERT_EQ(numCursors, 1u);
}
+RemoteCursor makeRemoteCursor(ShardId shardId, HostAndPort host, CursorResponse response) {
+ RemoteCursor remoteCursor;
+ remoteCursor.setShardId(std::move(shardId));
+ remoteCursor.setHostAndPort(std::move(host));
+ remoteCursor.setCursorResponse(std::move(response));
+ return remoteCursor;
+}
+
TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) {
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {}));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -259,7 +271,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) {
// Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
- responses.emplace_back(_nss, CursorId(0), batch);
+ responses.emplace_back(kTestNss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -285,9 +297,10 @@ TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) {
TEST_F(AsyncResultsMergerTest, SingleShardSorted) {
BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}");
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {}));
- makeCursorFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -303,7 +316,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardSorted) {
// Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{$sortKey: {'': 5}}"), fromjson("{$sortKey: {'': 6}}")};
- responses.emplace_back(_nss, CursorId(0), batch);
+ responses.emplace_back(kTestNss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -328,10 +341,12 @@ TEST_F(AsyncResultsMergerTest, SingleShardSorted) {
}
TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) {
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {}));
- cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 6, {}));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 6, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -348,7 +363,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {
fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
- responses.emplace_back(_nss, CursorId(0), batch1);
+ responses.emplace_back(kTestNss, CursorId(0), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -376,7 +391,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) {
responses.clear();
std::vector<BSONObj> batch2 = {
fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
- responses.emplace_back(_nss, CursorId(0), batch2);
+ responses.emplace_back(kTestNss, CursorId(0), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -392,18 +407,20 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) {
ASSERT_TRUE(arm->ready());
ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult());
- // After returning all the buffered results, the ARM returns EOF immediately because both
- // shards cursors were exhausted.
+ // After returning all the buffered results, the ARM returns EOF immediately because both shards
+ // cursors were exhausted.
ASSERT_TRUE(arm->ready());
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, MultiShardSorted) {
BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}");
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {}));
- cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 6, {}));
- makeCursorFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 6, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -420,7 +437,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5}}"),
fromjson("{$sortKey: {'': 6}}")};
- responses.emplace_back(_nss, CursorId(0), batch1);
+ responses.emplace_back(kTestNss, CursorId(0), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -434,7 +451,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) {
responses.clear();
std::vector<BSONObj> batch2 = {fromjson("{$sortKey: {'': 3}}"),
fromjson("{$sortKey: {'': 9}}")};
- responses.emplace_back(_nss, CursorId(0), batch2);
+ responses.emplace_back(kTestNss, CursorId(0), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -456,17 +473,19 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) {
ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 9}}"),
*unittest::assertGet(arm->nextReady()).getResult());
- // After returning all the buffered results, the ARM returns EOF immediately because both
- // shards cursors were exhausted.
+ // After returning all the buffered results, the ARM returns EOF immediately because both shards
+ // cursors were exhausted.
ASSERT_TRUE(arm->ready());
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {}));
- cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 6, {}));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 6, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -479,7 +498,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {
fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
- responses.emplace_back(_nss, CursorId(5), batch1);
+ responses.emplace_back(kTestNss, CursorId(5), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -508,7 +527,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
responses.clear();
std::vector<BSONObj> batch2 = {
fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
- responses.emplace_back(_nss, CursorId(0), batch2);
+ responses.emplace_back(kTestNss, CursorId(0), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -536,7 +555,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
responses.clear();
std::vector<BSONObj> batch3 = {
fromjson("{_id: 7}"), fromjson("{_id: 8}"), fromjson("{_id: 9}")};
- responses.emplace_back(_nss, CursorId(0), batch3);
+ responses.emplace_back(kTestNss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -552,19 +571,22 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
ASSERT_TRUE(arm->ready());
ASSERT_BSONOBJ_EQ(fromjson("{_id: 9}"), *unittest::assertGet(arm->nextReady()).getResult());
- // After returning all the buffered results, the ARM returns EOF immediately because both
- // shards cursors were exhausted.
+ // After returning all the buffered results, the ARM returns EOF immediately because both shards
+ // cursors were exhausted.
ASSERT_TRUE(arm->ready());
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, CompoundSortKey) {
BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}}");
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {}));
- cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 6, {}));
- cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 7, {}));
- makeCursorFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 6, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 7, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
// Schedule requests.
ASSERT_FALSE(arm->ready());
@@ -575,13 +597,13 @@ TEST_F(AsyncResultsMergerTest, CompoundSortKey) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5, '': 9}}"),
fromjson("{$sortKey: {'': 4, '': 20}}")};
- responses.emplace_back(_nss, CursorId(0), batch1);
+ responses.emplace_back(kTestNss, CursorId(0), batch1);
std::vector<BSONObj> batch2 = {fromjson("{$sortKey: {'': 10, '': 11}}"),
fromjson("{$sortKey: {'': 4, '': 4}}")};
- responses.emplace_back(_nss, CursorId(0), batch2);
+ responses.emplace_back(kTestNss, CursorId(0), batch2);
std::vector<BSONObj> batch3 = {fromjson("{$sortKey: {'': 10, '': 12}}"),
fromjson("{$sortKey: {'': 5, '': 9}}")};
- responses.emplace_back(_nss, CursorId(0), batch3);
+ responses.emplace_back(kTestNss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -607,17 +629,18 @@ TEST_F(AsyncResultsMergerTest, CompoundSortKey) {
ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 4, '': 20}}"),
*unittest::assertGet(arm->nextReady()).getResult());
- // After returning all the buffered results, the ARM returns EOF immediately because both
- // shards cursors were exhausted.
+ // After returning all the buffered results, the ARM returns EOF immediately because both shards
+ // cursors were exhausted.
ASSERT_TRUE(arm->ready());
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) {
BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}}");
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
- makeCursorFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -626,7 +649,7 @@ TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) {
// Parsing the batch results in an error because the sort key is missing.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{a: 2, b: 1}"), fromjson("{a: 1, b: 2}")};
- responses.emplace_back(_nss, CursorId(1), batch1);
+ responses.emplace_back(kTestNss, CursorId(1), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -644,10 +667,10 @@ TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) {
TEST_F(AsyncResultsMergerTest, HasFirstBatch) {
std::vector<BSONObj> firstBatch = {
fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(
- kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, std::move(firstBatch)));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(makeRemoteCursor(
+ kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, std::move(firstBatch))));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
// Because there was firstBatch, ARM is immediately ready to return results.
ASSERT_TRUE(arm->ready());
@@ -675,7 +698,7 @@ TEST_F(AsyncResultsMergerTest, HasFirstBatch) {
// Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
- responses.emplace_back(_nss, CursorId(0), batch);
+ responses.emplace_back(kTestNss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -702,11 +725,12 @@ TEST_F(AsyncResultsMergerTest, HasFirstBatch) {
TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) {
std::vector<BSONObj> firstBatch = {
fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(
- kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, std::move(firstBatch)));
- cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 0, {}));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(makeRemoteCursor(
+ kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, std::move(firstBatch))));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 0, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
// Because there was firstBatch, ARM is immediately ready to return results.
ASSERT_TRUE(arm->ready());
@@ -734,7 +758,7 @@ TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) {
// Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
- responses.emplace_back(_nss, CursorId(0), batch);
+ responses.emplace_back(kTestNss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -759,10 +783,12 @@ TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) {
}
TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
- cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {}));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -771,9 +797,9 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
// Both shards respond with the first batch.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(_nss, CursorId(1), batch1);
+ responses.emplace_back(kTestNss, CursorId(1), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(_nss, CursorId(2), batch2);
+ responses.emplace_back(kTestNss, CursorId(2), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -795,7 +821,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
// never responds.
responses.clear();
std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")};
- responses.emplace_back(_nss, CursorId(1), batch3);
+ responses.emplace_back(kTestNss, CursorId(1), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
blackHoleNextRequest();
@@ -813,7 +839,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
// We can continue to return results from first shard, while second shard remains unresponsive.
responses.clear();
std::vector<BSONObj> batch4 = {fromjson("{_id: 7}"), fromjson("{_id: 8}")};
- responses.emplace_back(_nss, CursorId(0), batch4);
+ responses.emplace_back(kTestNss, CursorId(0), batch4);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -828,12 +854,15 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
// the network interface.
auto killEvent = arm->kill(operationContext());
ASSERT_TRUE(killEvent.isValid());
+ executor()->shutdown();
+ executor()->waitForEvent(killEvent);
}
TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -841,7 +870,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
- responses.emplace_back(_nss, CursorId(456), batch);
+ responses.emplace_back(kTestNss, CursorId(456), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -855,22 +884,25 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
}
TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
- cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {}));
- cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 789, {}));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 789, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- BSONObj response1 = CursorResponse(_nss, CursorId(123), batch1)
+ BSONObj response1 = CursorResponse(kTestNss, CursorId(123), batch1)
.toBSON(CursorResponse::ResponseType::SubsequentResponse);
BSONObj response2 = fromjson("{foo: 'bar'}");
std::vector<BSONObj> batch3 = {fromjson("{_id: 4}"), fromjson("{_id: 5}")};
- BSONObj response3 = CursorResponse(_nss, CursorId(789), batch3)
+ BSONObj response3 = CursorResponse(kTestNss, CursorId(789), batch3)
.toBSON(CursorResponse::ResponseType::SubsequentResponse);
scheduleNetworkResponseObjs({response1, response2, response3});
runReadyCallbacks();
@@ -885,11 +917,14 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
}
TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
- cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {}));
- cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 3, {}));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 3, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -897,9 +932,9 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(_nss, CursorId(1), batch1);
+ responses.emplace_back(kTestNss, CursorId(1), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(_nss, CursorId(2), batch2);
+ responses.emplace_back(kTestNss, CursorId(2), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -918,9 +953,10 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
}
TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -930,7 +966,7 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(_nss, CursorId(0), batch);
+ responses.emplace_back(kTestNss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -948,9 +984,10 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
}
TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) {
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
executor()->shutdown();
ASSERT_EQ(ErrorCodes::ShutdownInProgress, arm->nextEvent().getStatus());
@@ -959,9 +996,10 @@ TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) {
}
TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatches) {
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
// Make a request to the shard that will never get answered.
ASSERT_FALSE(arm->ready());
@@ -979,9 +1017,10 @@ TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatch
}
TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) {
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto killedEvent = arm->kill(operationContext());
@@ -996,11 +1035,14 @@ TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) {
}
TEST_F(AsyncResultsMergerTest, KillAllRemotesExhausted) {
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
- cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {}));
- cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 3, {}));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 3, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1008,11 +1050,11 @@ TEST_F(AsyncResultsMergerTest, KillAllRemotesExhausted) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(_nss, CursorId(0), batch1);
+ responses.emplace_back(kTestNss, CursorId(0), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(_nss, CursorId(0), batch2);
+ responses.emplace_back(kTestNss, CursorId(0), batch2);
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(_nss, CursorId(0), batch3);
+ responses.emplace_back(kTestNss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1027,11 +1069,14 @@ TEST_F(AsyncResultsMergerTest, KillAllRemotesExhausted) {
}
TEST_F(AsyncResultsMergerTest, KillNonExhaustedCursorWithoutPendingRequest) {
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
- cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {}));
- cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 123, {}));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 123, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1039,12 +1084,12 @@ TEST_F(AsyncResultsMergerTest, KillNonExhaustedCursorWithoutPendingRequest) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(_nss, CursorId(0), batch1);
+ responses.emplace_back(kTestNss, CursorId(0), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(_nss, CursorId(0), batch2);
+ responses.emplace_back(kTestNss, CursorId(0), batch2);
// Cursor 3 is not exhausted.
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(_nss, CursorId(123), batch3);
+ responses.emplace_back(kTestNss, CursorId(123), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1059,11 +1104,14 @@ TEST_F(AsyncResultsMergerTest, KillNonExhaustedCursorWithoutPendingRequest) {
}
TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
- cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {}));
- cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 3, {}));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 3, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1071,7 +1119,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(_nss, CursorId(0), batch1);
+ responses.emplace_back(kTestNss, CursorId(0), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1091,9 +1139,10 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
}
TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1101,7 +1150,7 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(_nss, CursorId(1), batch1);
+ responses.emplace_back(kTestNss, CursorId(1), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1114,9 +1163,10 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
}
TEST_F(AsyncResultsMergerTest, KillCalledTwice) {
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
auto killedEvent1 = arm->kill(operationContext());
ASSERT(killedEvent1.isValid());
auto killedEvent2 = arm->kill(operationContext());
@@ -1127,9 +1177,10 @@ TEST_F(AsyncResultsMergerTest, KillCalledTwice) {
TEST_F(AsyncResultsMergerTest, TailableBasic) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
- makeCursorFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1137,7 +1188,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(_nss, CursorId(123), batch1);
+ responses.emplace_back(kTestNss, CursorId(123), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1158,7 +1209,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
responses.clear();
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}")};
- responses.emplace_back(_nss, CursorId(123), batch2);
+ responses.emplace_back(kTestNss, CursorId(123), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1176,9 +1227,10 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
- makeCursorFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1187,7 +1239,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
// Remote responds with an empty batch and a non-zero cursor id.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch;
- responses.emplace_back(_nss, CursorId(123), batch);
+ responses.emplace_back(kTestNss, CursorId(123), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1204,9 +1256,10 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
- makeCursorFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1215,7 +1268,7 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
// Remote responds with an empty batch and a zero cursor id.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch;
- responses.emplace_back(_nss, CursorId(0), batch);
+ responses.emplace_back(kTestNss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1229,9 +1282,10 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 3}");
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
- makeCursorFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1239,7 +1293,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(_nss, CursorId(1), batch1);
+ responses.emplace_back(kTestNss, CursorId(1), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1254,7 +1308,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
responses.clear();
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}")};
- responses.emplace_back(_nss, CursorId(0), batch2);
+ responses.emplace_back(kTestNss, CursorId(0), batch2);
readyEvent = unittest::assertGet(arm->nextEvent());
BSONObj scheduledCmd = getNthPendingRequest(0).cmdObj;
@@ -1274,11 +1328,14 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}");
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 97, {}));
- cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 98, {}));
- cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 99, {}));
- makeCursorFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 97, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 98, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 99, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1292,9 +1349,9 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
// remaining shards.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")};
- responses.emplace_back(_nss, CursorId(98), batch1);
+ responses.emplace_back(kTestNss, CursorId(98), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")};
- responses.emplace_back(_nss, CursorId(99), batch2);
+ responses.emplace_back(kTestNss, CursorId(99), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1315,7 +1372,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
responses.clear();
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}")};
- responses.emplace_back(_nss, CursorId(99), batch3);
+ responses.emplace_back(kTestNss, CursorId(99), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1330,7 +1387,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
// Once the last reachable shard indicates that its cursor is closed, we're done.
responses.clear();
std::vector<BSONObj> batch4 = {};
- responses.emplace_back(_nss, CursorId(0), batch4);
+ responses.emplace_back(kTestNss, CursorId(0), batch4);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1341,9 +1398,10 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}");
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 98, {}));
- makeCursorFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 98, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1351,7 +1409,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(_nss, CursorId(98), batch);
+ responses.emplace_back(kTestNss, CursorId(98), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1374,10 +1432,12 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) {
BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}");
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
- cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 2, {}));
- makeCursorFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 2, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1386,7 +1446,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) {
// First host returns single result
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}")};
- responses.emplace_back(_nss, CursorId(0), batch);
+ responses.emplace_back(kTestNss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1404,10 +1464,12 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) {
TEST_F(AsyncResultsMergerTest, ReturnsErrorOnRetriableError) {
BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}");
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
- cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 2, {}));
- makeCursorFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 2, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1432,9 +1494,10 @@ TEST_F(AsyncResultsMergerTest, ReturnsErrorOnRetriableError) {
TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true, awaitData: true}");
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
- makeCursorFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1442,7 +1505,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")};
- responses.emplace_back(_nss, CursorId(123), batch1);
+ responses.emplace_back(kTestNss, CursorId(123), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1464,7 +1527,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
responses.clear();
std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")};
- responses.emplace_back(_nss, CursorId(123), batch2);
+ responses.emplace_back(kTestNss, CursorId(123), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1485,20 +1548,24 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
// Clean up.
responses.clear();
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}")};
- responses.emplace_back(_nss, CursorId(0), batch3);
+ responses.emplace_back(kTestNss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
}
TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNoOplogTimestamp) {
- auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none);
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
- cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {}));
- params->remotes = std::move(cursors);
- params->tailableMode = TailableMode::kTailableAndAwaitData;
- params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}");
- arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get());
+ AsyncResultsMergerParams params;
+ params.setNss(kTestNss);
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {})));
+ params.setRemotes(std::move(cursors));
+ params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
+ params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"));
+ auto arm =
+ stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params));
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1510,7 +1577,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas
fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, "
"$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")};
const Timestamp lastObservedFirstCursor = Timestamp(1, 6);
- responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
+ responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1523,7 +1590,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas
fromjson("{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: 1, documentKey: {_id: 2}}, "
"$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}")};
const Timestamp lastObservedSecondCursor = Timestamp(1, 5);
- responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
+ responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1543,37 +1610,40 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas
// Clean up the cursors.
responses.clear();
std::vector<BSONObj> batch3 = {};
- responses.emplace_back(_nss, CursorId(0), batch3);
+ responses.emplace_back(kTestNss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
responses.clear();
std::vector<BSONObj> batch4 = {};
- responses.emplace_back(_nss, CursorId(0), batch4);
+ responses.emplace_back(kTestNss, CursorId(0), batch4);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
}
TEST_F(AsyncResultsMergerTest,
SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNullOplogTimestamp) {
- auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none);
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(
+ AsyncResultsMergerParams params;
+ params.setNss(kTestNss);
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(makeRemoteCursor(
kTestShardIds[0],
kTestShardHosts[0],
CursorResponse(
- _nss,
+ kTestNss,
123,
{fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, "
"$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")},
boost::none,
- Timestamp(1, 5)));
- cursors.emplace_back(kTestShardIds[1],
+ Timestamp(1, 5))));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[1],
kTestShardHosts[1],
- CursorResponse(_nss, 456, {}, boost::none, Timestamp()));
- params->remotes = std::move(cursors);
- params->tailableMode = TailableMode::kTailableAndAwaitData;
- params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}");
- arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get());
+ CursorResponse(kTestNss, 456, {}, boost::none, Timestamp())));
+ params.setRemotes(std::move(cursors));
+ params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
+ params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"));
+ auto arm =
+ stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params));
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1581,7 +1651,7 @@ TEST_F(AsyncResultsMergerTest,
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch3 = {};
- responses.emplace_back(_nss, CursorId(0), batch3, boost::none, Timestamp(1, 8));
+ responses.emplace_back(kTestNss, CursorId(0), batch3, boost::none, Timestamp(1, 8));
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(unittest::assertGet(arm->nextEvent()));
@@ -1597,31 +1667,34 @@ TEST_F(AsyncResultsMergerTest,
// Clean up.
responses.clear();
std::vector<BSONObj> batch4 = {};
- responses.emplace_back(_nss, CursorId(0), batch4);
+ responses.emplace_back(kTestNss, CursorId(0), batch4);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
}
TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOplogTime) {
- auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none);
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ AsyncResultsMergerParams params;
+ params.setNss(kTestNss);
+ std::vector<RemoteCursor> cursors;
Timestamp tooLow = Timestamp(1, 2);
- cursors.emplace_back(
+ cursors.push_back(makeRemoteCursor(
kTestShardIds[0],
kTestShardHosts[0],
CursorResponse(
- _nss,
+ kTestNss,
123,
{fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, "
"$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")},
boost::none,
- Timestamp(1, 5)));
- cursors.emplace_back(
- kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {}, boost::none, tooLow));
- params->remotes = std::move(cursors);
- params->tailableMode = TailableMode::kTailableAndAwaitData;
- params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}");
- arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get());
+ Timestamp(1, 5))));
+ cursors.push_back(makeRemoteCursor(kTestShardIds[1],
+ kTestShardHosts[1],
+ CursorResponse(kTestNss, 456, {}, boost::none, tooLow)));
+ params.setRemotes(std::move(cursors));
+ params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
+ params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"));
+ auto arm =
+ stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params));
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1629,7 +1702,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOp
// Clean up the cursors.
std::vector<CursorResponse> responses;
- responses.emplace_back(_nss, CursorId(0), std::vector<BSONObj>{});
+ responses.emplace_back(kTestNss, CursorId(0), std::vector<BSONObj>{});
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
auto killEvent = arm->kill(operationContext());
@@ -1637,13 +1710,16 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOp
}
TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) {
- auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none);
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
- params->remotes = std::move(cursors);
- params->tailableMode = TailableMode::kTailableAndAwaitData;
- params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}");
- arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get());
+ AsyncResultsMergerParams params;
+ params.setNss(kTestNss);
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
+ params.setRemotes(std::move(cursors));
+ params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
+ params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"));
+ auto arm =
+ stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params));
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1655,7 +1731,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, "
"$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")};
const Timestamp lastObservedFirstCursor = Timestamp(1, 6);
- responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
+ responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1663,9 +1739,10 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
ASSERT_TRUE(arm->ready());
// Add the new shard.
- std::vector<ClusterClientCursorParams::RemoteCursor> newCursors;
- newCursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {}));
- arm->addNewShardCursors(newCursors);
+ std::vector<RemoteCursor> newCursors;
+ newCursors.push_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {})));
+ arm->addNewShardCursors(std::move(newCursors));
// Now shouldn't be ready, we don't have a guarantee from each shard.
ASSERT_FALSE(arm->ready());
@@ -1677,7 +1754,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
fromjson("{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: 1, documentKey: {_id: 2}}, "
"$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}")};
const Timestamp lastObservedSecondCursor = Timestamp(1, 5);
- responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
+ responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1697,24 +1774,27 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
// Clean up the cursors.
responses.clear();
std::vector<BSONObj> batch3 = {};
- responses.emplace_back(_nss, CursorId(0), batch3);
+ responses.emplace_back(kTestNss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
responses.clear();
std::vector<BSONObj> batch4 = {};
- responses.emplace_back(_nss, CursorId(0), batch4);
+ responses.emplace_back(kTestNss, CursorId(0), batch4);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
}
TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting) {
- auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none);
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
- params->remotes = std::move(cursors);
- params->tailableMode = TailableMode::kTailableAndAwaitData;
- params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}");
- arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get());
+ AsyncResultsMergerParams params;
+ params.setNss(kTestNss);
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
+ params.setRemotes(std::move(cursors));
+ params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
+ params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"));
+ auto arm =
+ stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params));
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1726,7 +1806,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, "
"$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")};
const Timestamp lastObservedFirstCursor = Timestamp(1, 6);
- responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
+ responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1734,9 +1814,10 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
ASSERT_TRUE(arm->ready());
// Add the new shard.
- std::vector<ClusterClientCursorParams::RemoteCursor> newCursors;
- newCursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {}));
- arm->addNewShardCursors(newCursors);
+ std::vector<RemoteCursor> newCursors;
+ newCursors.push_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {})));
+ arm->addNewShardCursors(std::move(newCursors));
// Now shouldn't be ready, we don't have a guarantee from each shard.
ASSERT_FALSE(arm->ready());
@@ -1750,7 +1831,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
// The last observed time should still be later than the first shard, so we can get the data
// from it.
const Timestamp lastObservedSecondCursor = Timestamp(1, 5);
- responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
+ responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1770,21 +1851,22 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
// Clean up the cursors.
responses.clear();
std::vector<BSONObj> batch3 = {};
- responses.emplace_back(_nss, CursorId(0), batch3);
+ responses.emplace_back(kTestNss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
responses.clear();
std::vector<BSONObj> batch4 = {};
- responses.emplace_back(_nss, CursorId(0), batch4);
+ responses.emplace_back(kTestNss, CursorId(0), batch4);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
}
TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) {
BSONObj findCmd = fromjson("{find: 'testcoll'}");
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
- makeCursorFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
auto killEvent = arm->kill(operationContext());
@@ -1793,9 +1875,10 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) {
TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
- makeCursorFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
auto killEvent = arm->kill(operationContext());
@@ -1804,9 +1887,10 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) {
TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
- makeCursorFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1820,9 +1904,10 @@ TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) {
}
TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulingKillCursors) {
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -1850,9 +1935,10 @@ TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulin
}
TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) {
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -1871,7 +1957,7 @@ TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) {
// exhausted.
onCommand([&](const auto& request) {
ASSERT(request.cmdObj["getMore"]);
- return CursorResponse(_nss, 0LL, {BSON("x" << 1)})
+ return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)})
.toBSON(CursorResponse::ResponseType::SubsequentResponse);
});
@@ -1879,9 +1965,10 @@ TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) {
}
TEST_F(AsyncResultsMergerTest, ShouldBeInterruptableDuringBlockingNext) {
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
// Issue a blocking wait for the next result asynchronously on a different thread.
auto future = launchAsync([&]() {
@@ -1905,9 +1992,10 @@ TEST_F(AsyncResultsMergerTest, ShouldBeInterruptableDuringBlockingNext) {
}
TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilKilled) {
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
- cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
- makeCursorFromExistingCursors(std::move(cursors));
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index 653599def7e..9c01c013ce6 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -113,7 +113,7 @@ public:
/**
* Returns a reference to the vector of remote hosts involved in this operation.
*/
- virtual const std::vector<ClusterClientCursorParams::RemoteCursor>& getRemotes() const = 0;
+ virtual const std::size_t getNumRemotes() const = 0;
/**
* Returns the number of result documents returned so far by this cursor via the next() method.
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index 58484e87bfa..1a4be45f1be 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -30,6 +30,7 @@
#include "mongo/s/query/cluster_client_cursor_impl.h"
+#include "mongo/db/pipeline/cluster_aggregation_planner.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_skip.h"
#include "mongo/db/pipeline/document_source_sort.h"
@@ -135,20 +136,19 @@ OperationContext* ClusterClientCursorImpl::getCurrentOperationContext() const {
}
bool ClusterClientCursorImpl::isTailable() const {
- return _params.tailableMode != TailableMode::kNormal;
+ return _params.tailableMode != TailableModeEnum::kNormal;
}
bool ClusterClientCursorImpl::isTailableAndAwaitData() const {
- return _params.tailableMode == TailableMode::kTailableAndAwaitData;
+ return _params.tailableMode == TailableModeEnum::kTailableAndAwaitData;
}
BSONObj ClusterClientCursorImpl::getOriginatingCommand() const {
return _params.originatingCommandObj;
}
-const std::vector<ClusterClientCursorParams::RemoteCursor>& ClusterClientCursorImpl::getRemotes()
- const {
- return _params.remotes;
+const std::size_t ClusterClientCursorImpl::getNumRemotes() const {
+ return _root->getNumRemotes();
}
long long ClusterClientCursorImpl::getNumReturnedSoFar() const {
@@ -181,30 +181,6 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorImpl::getReadPreferenc
namespace {
-/**
- * Rips off an initial $sort stage that will be handled by mongos execution machinery. Returns the
- * sort key pattern of such a $sort stage if there was one, and boost::none otherwise.
- */
-boost::optional<BSONObj> extractLeadingSort(Pipeline* mergePipeline) {
- // Remove a leading $sort iff it is a mergesort, since the ARM cannot handle blocking $sort.
- auto frontSort = mergePipeline->popFrontWithCriteria(
- DocumentSourceSort::kStageName, [](const DocumentSource* const source) {
- return static_cast<const DocumentSourceSort* const>(source)->mergingPresorted();
- });
-
- if (frontSort) {
- auto sortStage = static_cast<DocumentSourceSort*>(frontSort.get());
- if (auto sortLimit = sortStage->getLimitSrc()) {
- // There was a limit stage absorbed into the sort stage, so we need to preserve that.
- mergePipeline->addInitialSource(sortLimit);
- }
- return sortStage
- ->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging)
- .toBson();
- }
- return boost::none;
-}
-
bool isSkipOrLimit(const boost::intrusive_ptr<DocumentSource>& stage) {
return (dynamic_cast<DocumentSourceLimit*>(stage.get()) ||
dynamic_cast<DocumentSourceSkip*>(stage.get()));
@@ -250,10 +226,10 @@ std::unique_ptr<RouterExecStage> buildPipelinePlan(executor::TaskExecutor* execu
// instead.
while (!pipeline->getSources().empty()) {
invariant(isSkipOrLimit(pipeline->getSources().front()));
- if (auto skip = pipeline->popFrontWithCriteria(DocumentSourceSkip::kStageName)) {
+ if (auto skip = pipeline->popFrontWithName(DocumentSourceSkip::kStageName)) {
root = stdx::make_unique<RouterStageSkip>(
opCtx, std::move(root), static_cast<DocumentSourceSkip*>(skip.get())->getSkip());
- } else if (auto limit = pipeline->popFrontWithCriteria(DocumentSourceLimit::kStageName)) {
+ } else if (auto limit = pipeline->popFrontWithName(DocumentSourceLimit::kStageName)) {
root = stdx::make_unique<RouterStageLimit>(
opCtx, std::move(root), static_cast<DocumentSourceLimit*>(limit.get())->getLimit());
}
@@ -270,7 +246,8 @@ std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
const auto skip = params->skip;
const auto limit = params->limit;
if (params->mergePipeline) {
- if (auto sort = extractLeadingSort(params->mergePipeline.get())) {
+ if (auto sort =
+ cluster_aggregation_planner::popLeadingMergeSort(params->mergePipeline.get())) {
params->sort = *sort;
}
return buildPipelinePlan(executor, params);
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index c685a383307..d3c9349233b 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -105,7 +105,7 @@ public:
BSONObj getOriginatingCommand() const final;
- const std::vector<ClusterClientCursorParams::RemoteCursor>& getRemotes() const final;
+ const std::size_t getNumRemotes() const final;
long long getNumReturnedSoFar() const final;
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp
index 0e3a3fd6731..6e624f36b84 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp
@@ -68,8 +68,7 @@ BSONObj ClusterClientCursorMock::getOriginatingCommand() const {
return _originatingCommand;
}
-const std::vector<ClusterClientCursorParams::RemoteCursor>& ClusterClientCursorMock::getRemotes()
- const {
+const std::size_t ClusterClientCursorMock::getNumRemotes() const {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h
index 7364240112d..1c50403c3ae 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.h
+++ b/src/mongo/s/query/cluster_client_cursor_mock.h
@@ -69,7 +69,7 @@ public:
BSONObj getOriginatingCommand() const final;
- const std::vector<ClusterClientCursorParams::RemoteCursor>& getRemotes() const final;
+ const std::size_t getNumRemotes() const final;
long long getNumReturnedSoFar() const final;
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index 116634bcee8..71a7f17c282 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -42,6 +42,7 @@
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/tailable_mode.h"
#include "mongo/s/client/shard.h"
+#include "mongo/s/query/async_results_merger_params_gen.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
@@ -61,22 +62,6 @@ class RouterExecStage;
* this cursor have been processed.
*/
struct ClusterClientCursorParams {
- struct RemoteCursor {
- RemoteCursor(ShardId shardId, HostAndPort hostAndPort, CursorResponse cursorResponse)
- : shardId(std::move(shardId)),
- hostAndPort(std::move(hostAndPort)),
- cursorResponse(std::move(cursorResponse)) {}
-
- // The shardId of the shard on which the cursor resides.
- ShardId shardId;
-
- // The exact host (within the shard) on which the cursor resides.
- HostAndPort hostAndPort;
-
- // Encompasses the state of the established cursor.
- CursorResponse cursorResponse;
- };
-
ClusterClientCursorParams(NamespaceString nss,
boost::optional<ReadPreferenceSetting> readPref = boost::none)
: nsString(std::move(nss)) {
@@ -85,6 +70,24 @@ struct ClusterClientCursorParams {
}
}
+ /**
+ * Extracts the subset of fields here needed by the AsyncResultsMerger. The returned
+ * AsyncResultsMergerParams will assume ownership of 'remotes'.
+ */
+ AsyncResultsMergerParams extractARMParams() {
+ AsyncResultsMergerParams armParams;
+ if (!sort.isEmpty()) {
+ armParams.setSort(sort);
+ }
+ armParams.setCompareWholeSortKey(compareWholeSortKey);
+ armParams.setRemotes(std::move(remotes));
+ armParams.setTailableMode(tailableMode);
+ armParams.setBatchSize(batchSize);
+ armParams.setNss(nsString);
+ armParams.setAllowPartialResults(isAllowPartialResults);
+ return armParams;
+ }
+
// Namespace against which the cursors exist.
NamespaceString nsString;
@@ -108,7 +111,7 @@ struct ClusterClientCursorParams {
// The number of results per batch. Optional. If specified, will be specified as the batch for
// each getMore.
- boost::optional<long long> batchSize;
+ boost::optional<std::int64_t> batchSize;
// Limits the number of results returned by the ClusterClientCursor to this many. Optional.
// Should be forwarded to the remote hosts in 'cmdObj'.
@@ -119,7 +122,7 @@ struct ClusterClientCursorParams {
// Whether this cursor is tailing a capped collection, and whether it has the awaitData option
// set.
- TailableMode tailableMode = TailableMode::kNormal;
+ TailableModeEnum tailableMode = TailableModeEnum::kNormal;
// Set if a readPreference must be respected throughout the lifetime of the cursor.
boost::optional<ReadPreferenceSetting> readPreference;
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index cd5ce0f9bc8..ca66ee04e08 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -147,10 +147,9 @@ BSONObj ClusterCursorManager::PinnedCursor::getOriginatingCommand() const {
return _cursor->getOriginatingCommand();
}
-const std::vector<ClusterClientCursorParams::RemoteCursor>&
-ClusterCursorManager::PinnedCursor::getRemotes() const {
+const std::size_t ClusterCursorManager::PinnedCursor::getNumRemotes() const {
invariant(_cursor);
- return _cursor->getRemotes();
+ return _cursor->getNumRemotes();
}
CursorId ClusterCursorManager::PinnedCursor::getCursorId() const {
diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index b9b4ee81ef6..cf6dc54f21e 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -198,7 +198,7 @@ public:
/**
* Returns a reference to the vector of remote hosts involved in this operation.
*/
- const std::vector<ClusterClientCursorParams::RemoteCursor>& getRemotes() const;
+ const std::size_t getNumRemotes() const;
/**
* Returns the cursor id for the underlying cursor, or zero if no cursor is owned.
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index f8faaabaa96..8c9e654f5bb 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -346,7 +346,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
}
// Fill out query exec properties.
- CurOp::get(opCtx)->debug().nShards = ccc->getRemotes().size();
+ CurOp::get(opCtx)->debug().nShards = ccc->getNumRemotes();
CurOp::get(opCtx)->debug().nreturned = results->size();
// If the cursor is exhausted, then there are no more results to return and we don't need to
@@ -490,7 +490,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
// Set the originatingCommand object and the cursorID in CurOp.
{
- CurOp::get(opCtx)->debug().nShards = pinnedCursor.getValue().getRemotes().size();
+ CurOp::get(opCtx)->debug().nShards = pinnedCursor.getValue().getNumRemotes();
CurOp::get(opCtx)->debug().cursorid = request.cursorid;
stdx::lock_guard<Client> lk(*opCtx->getClient());
CurOp::get(opCtx)->setOriginatingCommand_inlock(
diff --git a/src/mongo/s/query/document_source_router_adapter.cpp b/src/mongo/s/query/document_source_router_adapter.cpp
index 4e144751dcb..26a944ed5cc 100644
--- a/src/mongo/s/query/document_source_router_adapter.cpp
+++ b/src/mongo/s/query/document_source_router_adapter.cpp
@@ -67,6 +67,10 @@ Value DocumentSourceRouterAdapter::serialize(
return Value(); // Return the empty value to hide this stage from explain output.
}
+std::size_t DocumentSourceRouterAdapter::getNumRemotes() const {
+ return _child->getNumRemotes();
+}
+
bool DocumentSourceRouterAdapter::remotesExhausted() {
return _child->remotesExhausted();
}
diff --git a/src/mongo/s/query/document_source_router_adapter.h b/src/mongo/s/query/document_source_router_adapter.h
index 5c1a6a0935c..a7db7734539 100644
--- a/src/mongo/s/query/document_source_router_adapter.h
+++ b/src/mongo/s/query/document_source_router_adapter.h
@@ -59,6 +59,7 @@ public:
void detachFromOperationContext() final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final;
bool remotesExhausted();
+ std::size_t getNumRemotes() const;
void setExecContext(RouterExecStage::ExecContext execContext) {
_execContext = execContext;
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp
index f186bcb1bb5..08ce5a2cb5b 100644
--- a/src/mongo/s/query/establish_cursors.cpp
+++ b/src/mongo/s/query/establish_cursors.cpp
@@ -47,13 +47,12 @@
namespace mongo {
-std::vector<ClusterClientCursorParams::RemoteCursor> establishCursors(
- OperationContext* opCtx,
- executor::TaskExecutor* executor,
- const NamespaceString& nss,
- const ReadPreferenceSetting readPref,
- const std::vector<std::pair<ShardId, BSONObj>>& remotes,
- bool allowPartialResults) {
+std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ const NamespaceString& nss,
+ const ReadPreferenceSetting readPref,
+ const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+ bool allowPartialResults) {
// Construct the requests
std::vector<AsyncRequestsSender::Request> requests;
for (const auto& remote : remotes) {
@@ -68,20 +67,23 @@ std::vector<ClusterClientCursorParams::RemoteCursor> establishCursors(
readPref,
Shard::RetryPolicy::kIdempotent);
- std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors;
+ std::vector<RemoteCursor> remoteCursors;
try {
// Get the responses
while (!ars.done()) {
try {
auto response = ars.next();
-
- // uasserts must happen before attempting to access the optional shardHostAndPort.
- auto cursorResponse = uassertStatusOK(CursorResponse::parseFromBSON(
+ // Note the shardHostAndPort may not be populated if there was an error, so be sure
+ // to do this after parsing the cursor response to ensure the response was ok.
+ // Additionally, be careful not to push into 'remoteCursors' until we are sure we
+ // have a valid cursor, since the error handling path will attempt to clean up
+ // anything in 'remoteCursors'
+ RemoteCursor cursor;
+ cursor.setCursorResponse(CursorResponse::parseFromBSONThrowing(
uassertStatusOK(std::move(response.swResponse)).data));
-
- remoteCursors.emplace_back(std::move(response.shardId),
- std::move(*response.shardHostAndPort),
- std::move(cursorResponse));
+ cursor.setShardId(std::move(response.shardId));
+ cursor.setHostAndPort(*response.shardHostAndPort);
+ remoteCursors.push_back(std::move(cursor));
} catch (const DBException& ex) {
// Retriable errors are swallowed if 'allowPartialResults' is true.
if (allowPartialResults &&
@@ -114,18 +116,21 @@ std::vector<ClusterClientCursorParams::RemoteCursor> establishCursors(
: response.swResponse.getStatus());
if (swCursorResponse.isOK()) {
- remoteCursors.emplace_back(std::move(response.shardId),
- *response.shardHostAndPort,
- std::move(swCursorResponse.getValue()));
+ RemoteCursor cursor;
+ cursor.setShardId(std::move(response.shardId));
+ cursor.setHostAndPort(*response.shardHostAndPort);
+ cursor.setCursorResponse(std::move(swCursorResponse.getValue()));
+ remoteCursors.push_back(std::move(cursor));
}
}
// Schedule killCursors against all cursors that were established.
for (const auto& remoteCursor : remoteCursors) {
BSONObj cmdObj =
- KillCursorsRequest(nss, {remoteCursor.cursorResponse.getCursorId()}).toBSON();
+ KillCursorsRequest(nss, {remoteCursor.getCursorResponse().getCursorId()})
+ .toBSON();
executor::RemoteCommandRequest request(
- remoteCursor.hostAndPort, nss.db().toString(), cmdObj, opCtx);
+ remoteCursor.getHostAndPort(), nss.db().toString(), cmdObj, opCtx);
// We do not process the response to the killCursors request (we make a good-faith
// attempt at cleaning up the cursors, but ignore any returned errors).
diff --git a/src/mongo/s/query/establish_cursors.h b/src/mongo/s/query/establish_cursors.h
index b75a750d7b7..e88ddc2682e 100644
--- a/src/mongo/s/query/establish_cursors.h
+++ b/src/mongo/s/query/establish_cursors.h
@@ -36,7 +36,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/cursor_id.h"
#include "mongo/executor/task_executor.h"
-#include "mongo/s/query/cluster_client_cursor_params.h"
+#include "mongo/s/query/async_results_merger_params_gen.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
@@ -61,12 +61,11 @@ class CursorResponse;
* on reachable hosts are returned.
*
*/
-std::vector<ClusterClientCursorParams::RemoteCursor> establishCursors(
- OperationContext* opCtx,
- executor::TaskExecutor* executor,
- const NamespaceString& nss,
- const ReadPreferenceSetting readPref,
- const std::vector<std::pair<ShardId, BSONObj>>& remotes,
- bool allowPartialResults);
+std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ const NamespaceString& nss,
+ const ReadPreferenceSetting readPref,
+ const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+ bool allowPartialResults);
} // namespace mongo
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index 418419fdbef..515da5a358c 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -89,6 +89,14 @@ public:
}
/**
+ * Returns the number of remote hosts involved in this execution plan.
+ */
+ virtual std::size_t getNumRemotes() const {
+ invariant(_child); // The default implementation forwards to the child stage.
+ return _child->getNumRemotes();
+ }
+
+ /**
* Returns whether or not all the remote cursors are exhausted.
*/
virtual bool remotesExhausted() {
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
index 4f17927483b..48abb1452ec 100644
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ b/src/mongo/s/query/router_stage_merge.cpp
@@ -40,18 +40,21 @@ namespace mongo {
RouterStageMerge::RouterStageMerge(OperationContext* opCtx,
executor::TaskExecutor* executor,
ClusterClientCursorParams* params)
- : RouterExecStage(opCtx), _executor(executor), _params(params), _arm(opCtx, executor, params) {}
+ : RouterExecStage(opCtx),
+ _executor(executor),
+ _params(params),
+ _arm(opCtx, executor, params->extractARMParams()) {}
StatusWith<ClusterQueryResult> RouterStageMerge::next(ExecContext execCtx) {
// Non-tailable and tailable non-awaitData cursors always block until ready(). AwaitData
// cursors wait for ready() only until a specified time limit is exceeded.
- return (_params->tailableMode == TailableMode::kTailableAndAwaitData
+ return (_params->tailableMode == TailableModeEnum::kTailableAndAwaitData
? awaitNextWithTimeout(execCtx)
: _arm.blockingNext());
}
StatusWith<ClusterQueryResult> RouterStageMerge::awaitNextWithTimeout(ExecContext execCtx) {
- invariant(_params->tailableMode == TailableMode::kTailableAndAwaitData);
+ invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData);
// If we are in kInitialFind or kGetMoreWithAtLeastOneResultInBatch context and the ARM is not
// ready, we don't block. Fall straight through to the return statement.
while (!_arm.ready() && execCtx == ExecContext::kGetMoreNoResultsYet) {
@@ -85,7 +88,7 @@ StatusWith<ClusterQueryResult> RouterStageMerge::awaitNextWithTimeout(ExecContex
StatusWith<EventHandle> RouterStageMerge::getNextEvent() {
// If we abandoned a previous event due to a mongoS-side timeout, wait for it first.
if (_leftoverEventFromLastTimeout) {
- invariant(_params->tailableMode == TailableMode::kTailableAndAwaitData);
+ invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData);
auto event = _leftoverEventFromLastTimeout;
_leftoverEventFromLastTimeout = EventHandle();
return event;
@@ -102,14 +105,16 @@ bool RouterStageMerge::remotesExhausted() {
return _arm.remotesExhausted();
}
+std::size_t RouterStageMerge::getNumRemotes() const {
+ return _arm.getNumRemotes();
+}
+
Status RouterStageMerge::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return _arm.setAwaitDataTimeout(awaitDataTimeout);
}
-void RouterStageMerge::addNewShardCursors(
- std::vector<ClusterClientCursorParams::RemoteCursor>&& newShards) {
- _arm.addNewShardCursors(newShards);
- std::move(newShards.begin(), newShards.end(), std::back_inserter(_params->remotes));
+void RouterStageMerge::addNewShardCursors(std::vector<RemoteCursor>&& newShards) {
+ _arm.addNewShardCursors(std::move(newShards));
}
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index efd397b8c7e..b6bfee146b6 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -57,10 +57,12 @@ public:
bool remotesExhausted() final;
+ std::size_t getNumRemotes() const final;
+
/**
* Adds the cursors in 'newShards' to those being merged by the ARM.
*/
- void addNewShardCursors(std::vector<ClusterClientCursorParams::RemoteCursor>&& newShards);
+ void addNewShardCursors(std::vector<RemoteCursor>&& newShards);
protected:
Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp
index 97febc62173..5e94274b9ac 100644
--- a/src/mongo/s/query/router_stage_pipeline.cpp
+++ b/src/mongo/s/query/router_stage_pipeline.cpp
@@ -84,6 +84,10 @@ void RouterStagePipeline::kill(OperationContext* opCtx) {
_mergePipeline->dispose(opCtx);
}
+std::size_t RouterStagePipeline::getNumRemotes() const {
+ return _mongosOnlyPipeline ? 0 : _routerAdapter->getNumRemotes();
+}
+
bool RouterStagePipeline::remotesExhausted() {
return _mongosOnlyPipeline || _routerAdapter->remotesExhausted();
}
diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h
index e876dc816a2..c14ddf9f80b 100644
--- a/src/mongo/s/query/router_stage_pipeline.h
+++ b/src/mongo/s/query/router_stage_pipeline.h
@@ -51,6 +51,8 @@ public:
bool remotesExhausted() final;
+ std::size_t getNumRemotes() const final;
+
protected:
Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.cpp b/src/mongo/s/query/router_stage_update_on_add_shard.cpp
index 451a1ee9699..61fa2a9176d 100644
--- a/src/mongo/s/query/router_stage_update_on_add_shard.cpp
+++ b/src/mongo/s/query/router_stage_update_on_add_shard.cpp
@@ -56,9 +56,11 @@ bool needsUpdate(const StatusWith<ClusterQueryResult>& childResult) {
RouterStageUpdateOnAddShard::RouterStageUpdateOnAddShard(OperationContext* opCtx,
executor::TaskExecutor* executor,
ClusterClientCursorParams* params,
+ std::vector<ShardId> shardIds,
BSONObj cmdToRunOnNewShards)
: RouterExecStage(opCtx, stdx::make_unique<RouterStageMerge>(opCtx, executor, params)),
_params(params),
+ _shardIds(std::move(shardIds)),
_cmdToRunOnNewShards(cmdToRunOnNewShards) {}
StatusWith<ClusterQueryResult> RouterStageUpdateOnAddShard::next(
@@ -73,18 +75,12 @@ StatusWith<ClusterQueryResult> RouterStageUpdateOnAddShard::next(
}
void RouterStageUpdateOnAddShard::addNewShardCursors(BSONObj newShardDetectedObj) {
- std::vector<ShardId> existingShardIds;
- for (const auto& remote : _params->remotes) {
- existingShardIds.push_back(remote.shardId);
- }
checked_cast<RouterStageMerge*>(getChildStage())
- ->addNewShardCursors(
- establishShardCursorsOnNewShards(std::move(existingShardIds), newShardDetectedObj));
+ ->addNewShardCursors(establishShardCursorsOnNewShards(newShardDetectedObj));
}
-std::vector<ClusterClientCursorParams::RemoteCursor>
-RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards(std::vector<ShardId> existingShardIds,
- const BSONObj& newShardDetectedObj) {
+std::vector<RemoteCursor> RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards(
+ const BSONObj& newShardDetectedObj) {
auto* opCtx = getOpCtx();
// Reload the shard registry. We need to ensure a reload initiated after calling this method
// caused the reload, otherwise we aren't guaranteed to get all the new shards.
@@ -98,12 +94,12 @@ RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards(std::vector<ShardI
std::vector<ShardId> shardIds, newShardIds;
shardRegistry->getAllShardIdsNoReload(&shardIds);
- std::sort(existingShardIds.begin(), existingShardIds.end());
+ std::sort(_shardIds.begin(), _shardIds.end());
std::sort(shardIds.begin(), shardIds.end());
std::set_difference(shardIds.begin(),
shardIds.end(),
- existingShardIds.begin(),
- existingShardIds.end(),
+ _shardIds.begin(),
+ _shardIds.end(),
std::back_inserter(newShardIds));
auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand(
@@ -112,6 +108,7 @@ RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards(std::vector<ShardI
std::vector<std::pair<ShardId, BSONObj>> requests;
for (const auto& shardId : newShardIds) {
requests.emplace_back(shardId, cmdObj);
+ _shardIds.push_back(shardId);
}
const bool allowPartialResults = false; // partial results are not allowed
return establishCursors(opCtx,
diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.h b/src/mongo/s/query/router_stage_update_on_add_shard.h
index 1128dc83430..00ee921e2af 100644
--- a/src/mongo/s/query/router_stage_update_on_add_shard.h
+++ b/src/mongo/s/query/router_stage_update_on_add_shard.h
@@ -44,6 +44,7 @@ public:
RouterStageUpdateOnAddShard(OperationContext* opCtx,
executor::TaskExecutor* executor,
ClusterClientCursorParams* params,
+ std::vector<ShardId> shardIds,
BSONObj cmdToRunOnNewShards);
StatusWith<ClusterQueryResult> next(ExecContext) final;
@@ -58,10 +59,10 @@ private:
/**
* Open the cursors on the new shards.
*/
- std::vector<ClusterClientCursorParams::RemoteCursor> establishShardCursorsOnNewShards(
- std::vector<ShardId> existingShardIds, const BSONObj& newShardDetectedObj);
+ std::vector<RemoteCursor> establishShardCursorsOnNewShards(const BSONObj& newShardDetectedObj);
ClusterClientCursorParams* _params;
+ std::vector<ShardId> _shardIds;
BSONObj _cmdToRunOnNewShards;
};
}
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index bdf4283c104..9a754364412 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -38,6 +38,7 @@
#include "mongo/s/query/cluster_client_cursor_impl.h"
#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/s/query/cluster_cursor_manager.h"
+#include "mongo/s/shard_id.h"
namespace mongo {
@@ -48,7 +49,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
const NamespaceString& requestedNss,
executor::TaskExecutor* executor,
ClusterCursorManager* cursorManager,
- TailableMode tailableMode) {
+ TailableModeEnum tailableMode) {
if (!cmdResult["ok"].trueValue() || !cmdResult.hasField("cursor")) {
return cmdResult;
}
@@ -71,11 +72,13 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
}
ClusterClientCursorParams params(incomingCursorResponse.getValue().getNSS());
- params.remotes.emplace_back(shardId,
- server,
- CursorResponse(incomingCursorResponse.getValue().getNSS(),
- incomingCursorResponse.getValue().getCursorId(),
- {}));
+ params.remotes.emplace_back();
+ auto& remoteCursor = params.remotes.back();
+ remoteCursor.setShardId(shardId.toString());
+ remoteCursor.setHostAndPort(server);
+ remoteCursor.setCursorResponse(CursorResponse(incomingCursorResponse.getValue().getNSS(),
+ incomingCursorResponse.getValue().getCursorId(),
+ {}));
params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned();
params.tailableMode = tailableMode;
diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h
index 75a4e76bf24..b9756be44f7 100644
--- a/src/mongo/s/query/store_possible_cursor.h
+++ b/src/mongo/s/query/store_possible_cursor.h
@@ -74,6 +74,6 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
const NamespaceString& requestedNss,
executor::TaskExecutor* executor,
ClusterCursorManager* cursorManager,
- TailableMode tailableMode = TailableMode::kNormal);
+ TailableModeEnum tailableMode = TailableModeEnum::kNormal);
} // namespace mongo
diff --git a/src/mongo/shell/bench.cpp b/src/mongo/shell/bench.cpp
index b096affd2cd..27b75dbb06c 100644
--- a/src/mongo/shell/bench.cpp
+++ b/src/mongo/shell/bench.cpp
@@ -214,12 +214,15 @@ int runQueryWithReadCommands(DBClientBase* conn,
}
while (cursorResponse.getCursorId() != 0) {
- GetMoreRequest getMoreRequest(qr->nss(),
- cursorResponse.getCursorId(),
- qr->getBatchSize(),
- boost::none, // maxTimeMS
- boost::none, // term
- boost::none); // lastKnownCommittedOpTime
+ GetMoreRequest getMoreRequest(
+ qr->nss(),
+ cursorResponse.getCursorId(),
+ qr->getBatchSize()
+ ? boost::optional<std::int64_t>(static_cast<std::int64_t>(*qr->getBatchSize()))
+ : boost::none,
+ boost::none, // maxTimeMS
+ boost::none, // term
+ boost::none); // lastKnownCommittedOpTime
BSONObj getMoreCommandResult;
uassert(ErrorCodes::CommandFailed,
str::stream() << "getMore command failed; reply was: " << getMoreCommandResult,
diff --git a/src/mongo/util/net/SConscript b/src/mongo/util/net/SConscript
index b7dd19b3953..133ee826998 100644
--- a/src/mongo/util/net/SConscript
+++ b/src/mongo/util/net/SConscript
@@ -12,6 +12,7 @@ env.Library(
"hostandport.cpp",
"hostname_canonicalization.cpp",
"sockaddr.cpp",
+ env.Idlc('hostandport.idl')[0],
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
diff --git a/src/mongo/util/net/hostandport.h b/src/mongo/util/net/hostandport.h
index 540f57f3b46..da0af575bc3 100644
--- a/src/mongo/util/net/hostandport.h
+++ b/src/mongo/util/net/hostandport.h
@@ -54,12 +54,19 @@ class StatusWith;
*/
struct HostAndPort {
/**
- * Parses "text" to produce a HostAndPort. Returns either that or an error
- * status describing the parse failure.
+ * Parses "text" to produce a HostAndPort. Returns either that or an error status describing
+ * the parse failure.
*/
static StatusWith<HostAndPort> parse(StringData text);
/**
+ * A version of 'parse' that throws a UserException if a parsing error is encountered.
+ */
+ static HostAndPort parseThrowing(StringData text) {
+ return uassertStatusOK(parse(text));
+ }
+
+ /**
* Construct an empty/invalid HostAndPort.
*/
HostAndPort();
diff --git a/src/mongo/util/net/hostandport.idl b/src/mongo/util/net/hostandport.idl
new file mode 100644
index 00000000000..6edda963f6e
--- /dev/null
+++ b/src/mongo/util/net/hostandport.idl
@@ -0,0 +1,38 @@
+# Copyright (C) 2018 MongoDB Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the GNU Affero General Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+
+global:
+ cpp_namespace: "mongo"
+ cpp_includes:
+ - "mongo/util/net/hostandport.h"
+types:
+ HostAndPort:
+ bson_serialization_type: string
+ description: A string representing a host and port of a mongod or mongos process.
+ cpp_type: HostAndPort
+ serializer: HostAndPort::toString
+ deserializer: HostAndPort::parseThrowing
+