-rw-r--r--  src/mongo/db/auth/SConscript                                    2
-rw-r--r--  src/mongo/db/pipeline/SConscript                              252
-rw-r--r--  src/mongo/db/pipeline/change_stream_constants.h                41
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.cpp         6
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.h           3
-rw-r--r--  src/mongo/db/pipeline/document_source_check_resume_token.h      3
-rw-r--r--  src/mongo/db/pipeline/document_source_merge_cursors.cpp       293
-rw-r--r--  src/mongo/db/pipeline/document_source_merge_cursors.h         113
-rw-r--r--  src/mongo/db/pipeline/document_source_merge_cursors_test.cpp  488
-rw-r--r--  src/mongo/db/pipeline/document_source_sort.cpp                 64
-rw-r--r--  src/mongo/db/pipeline/document_source_sort.h                    3
-rw-r--r--  src/mongo/db/pipeline/pipeline.cpp                              2
-rw-r--r--  src/mongo/db/repl/SConscript                                    1
-rw-r--r--  src/mongo/s/commands/cluster_aggregate.cpp                     71
-rw-r--r--  src/mongo/s/commands/cluster_aggregate.h                        3
-rw-r--r--  src/mongo/s/query/SConscript                                   10
-rw-r--r--  src/mongo/s/query/async_results_merger.cpp                     60
-rw-r--r--  src/mongo/s/query/async_results_merger.h                       18
-rw-r--r--  src/mongo/s/query/async_results_merger_test.cpp                68
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_impl.cpp                8
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_params.cpp             38
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_params.h                7
-rw-r--r--  src/mongo/s/query/cluster_find.cpp                             12
-rw-r--r--  src/mongo/s/query/router_stage_merge.cpp                       34
-rw-r--r--  src/mongo/s/query/router_stage_merge.h                          6
25 files changed, 1046 insertions, 560 deletions
diff --git a/src/mongo/db/auth/SConscript b/src/mongo/db/auth/SConscript
index d36b1854c14..6ea06b5dddd 100644
--- a/src/mongo/db/auth/SConscript
+++ b/src/mongo/db/auth/SConscript
@@ -263,7 +263,7 @@ env.CppUnitTest(
'authmocks',
'saslauth',
'authorization_session_for_test',
- '$BUILD_DIR/mongo/db/pipeline/document_source',
+ '$BUILD_DIR/mongo/db/pipeline/pipeline',
'$BUILD_DIR/mongo/db/service_context_d_test_fixture',
'$BUILD_DIR/mongo/transport/transport_layer_mock',
]
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 680094f0971..fcbb40e3199 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -11,9 +11,6 @@ env.Library(
],
LIBDEPS=[
'aggregation_request',
- 'document_source',
- 'document_source_facet',
- 'document_source_lookup',
'expression_context',
'pipeline',
]
@@ -123,50 +120,6 @@ env.Library(
]
)
-env.CppUnitTest(
- target='document_source_test',
- source=[
- 'document_source_add_fields_test.cpp',
- 'document_source_bucket_auto_test.cpp',
- 'document_source_bucket_test.cpp',
- 'document_source_change_stream_test.cpp',
- 'document_source_check_resume_token_test.cpp',
- 'document_source_count_test.cpp',
- 'document_source_current_op_test.cpp',
- 'document_source_geo_near_test.cpp',
- 'document_source_group_test.cpp',
- 'document_source_limit_test.cpp',
- 'document_source_lookup_change_post_image_test.cpp',
- 'document_source_lookup_test.cpp',
- 'document_source_graph_lookup_test.cpp',
- 'document_source_match_test.cpp',
- 'document_source_mock_test.cpp',
- 'document_source_project_test.cpp',
- 'document_source_redact_test.cpp',
- 'document_source_replace_root_test.cpp',
- 'document_source_sample_test.cpp',
- 'document_source_skip_test.cpp',
- 'document_source_sort_by_count_test.cpp',
- 'document_source_sort_test.cpp',
- 'document_source_test.cpp',
- 'document_source_unwind_test.cpp',
- 'sequential_document_cache_test.cpp',
- ],
- LIBDEPS=[
- 'document_source',
- 'document_source_facet',
- 'document_source_lookup',
- 'document_source_mock',
- 'document_value_test_util',
- '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
- '$BUILD_DIR/mongo/db/repl/oplog_entry',
- '$BUILD_DIR/mongo/db/repl/replmocks',
- '$BUILD_DIR/mongo/db/service_context',
- '$BUILD_DIR/mongo/s/is_mongos',
- '$BUILD_DIR/mongo/util/clock_source_mock',
- ],
-)
-
env.Library(
target='dependencies',
source=[
@@ -229,19 +182,117 @@ env.Library(
]
)
-docSourceEnv = env.Clone()
-docSourceEnv.InjectThirdPartyIncludePaths(libraries=['snappy'])
-docSourceEnv.Library(
- target='document_source',
+env.Library(
+ target='document_source_mock',
+ source=[
+ 'document_source_mock.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/query/query_test_service_context',
+ 'pipeline',
+ ]
+)
+
+env.CppUnitTest(
+ target='document_source_test',
+ source=[
+ 'document_source_add_fields_test.cpp',
+ 'document_source_bucket_auto_test.cpp',
+ 'document_source_bucket_test.cpp',
+ 'document_source_change_stream_test.cpp',
+ 'document_source_check_resume_token_test.cpp',
+ 'document_source_count_test.cpp',
+ 'document_source_current_op_test.cpp',
+ 'document_source_geo_near_test.cpp',
+ 'document_source_graph_lookup_test.cpp',
+ 'document_source_group_test.cpp',
+ 'document_source_limit_test.cpp',
+ 'document_source_lookup_change_post_image_test.cpp',
+ 'document_source_lookup_test.cpp',
+ 'document_source_match_test.cpp',
+ 'document_source_mock_test.cpp',
+ 'document_source_project_test.cpp',
+ 'document_source_redact_test.cpp',
+ 'document_source_replace_root_test.cpp',
+ 'document_source_sample_test.cpp',
+ 'document_source_skip_test.cpp',
+ 'document_source_sort_by_count_test.cpp',
+ 'document_source_sort_test.cpp',
+ 'document_source_test.cpp',
+ 'document_source_unwind_test.cpp',
+ 'sequential_document_cache_test.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
+ '$BUILD_DIR/mongo/db/repl/oplog_entry',
+ '$BUILD_DIR/mongo/db/repl/replmocks',
+ '$BUILD_DIR/mongo/db/service_context',
+ '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
+ '$BUILD_DIR/mongo/util/clock_source_mock',
+ 'document_source_mock',
+ 'document_value_test_util',
+ 'pipeline',
+ ],
+)
+
+# This test depends on the sharding test fixture, which has global initializers that conflict with
+# the ones set in 'document_source_test', so it is split into its own test.
+env.CppUnitTest(
+ target='document_source_merge_cursors_test',
+ source=[
+ 'document_source_merge_cursors_test.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
+ '$BUILD_DIR/mongo/db/query/query_request',
+ '$BUILD_DIR/mongo/db/query/query_test_service_context',
+ '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
+ '$BUILD_DIR/mongo/s/sharding_test_fixture',
+ 'pipeline',
+ 'document_value_test_util',
+ ],
+)
+
+env.CppUnitTest(
+ target='document_source_facet_test',
+ source='document_source_facet_test.cpp',
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
+ '$BUILD_DIR/mongo/db/service_context_noop_init',
+ '$BUILD_DIR/mongo/s/is_mongos',
+ 'pipeline',
+ 'document_source_mock',
+ 'document_value_test_util',
+ ],
+)
+
+env.Library(
+ target='lite_parsed_document_source',
+ source=[
+ 'lite_parsed_document_source.cpp',
+ ],
+ LIBDEPS=[
+ 'aggregation_request',
+ ]
+)
+
+pipelineEnv = env.Clone()
+pipelineEnv.InjectThirdPartyIncludePaths(libraries=['snappy'])
+pipelineEnv.Library(
+ target='pipeline',
source=[
'document_source.cpp',
'document_source_add_fields.cpp',
'document_source_bucket.cpp',
'document_source_bucket_auto.cpp',
+ 'document_source_change_stream.cpp',
+ 'document_source_check_resume_token.cpp',
'document_source_coll_stats.cpp',
'document_source_count.cpp',
'document_source_current_op.cpp',
+ 'document_source_facet.cpp',
'document_source_geo_near.cpp',
+ 'document_source_graph_lookup.cpp',
'document_source_group.cpp',
'document_source_index_stats.cpp',
'document_source_internal_inhibit_optimization.cpp',
@@ -250,6 +301,8 @@ docSourceEnv.Library(
'document_source_list_local_cursors.cpp',
'document_source_list_local_sessions.cpp',
'document_source_list_sessions.cpp',
+ 'document_source_lookup.cpp',
+ 'document_source_lookup_change_post_image.cpp',
'document_source_match.cpp',
'document_source_merge_cursors.cpp',
'document_source_out.cpp',
@@ -263,11 +316,15 @@ docSourceEnv.Library(
'document_source_skip.cpp',
'document_source_sort.cpp',
'document_source_sort_by_count.cpp',
+ 'document_source_tee_consumer.cpp',
'document_source_unwind.cpp',
+ 'pipeline.cpp',
'sequential_document_cache.cpp',
- ],
+ 'tee_buffer.cpp',
+ ],
LIBDEPS=[
'$BUILD_DIR/mongo/client/clientdriver',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_global',
'$BUILD_DIR/mongo/db/bson/dotted_path_support',
'$BUILD_DIR/mongo/db/generic_cursor',
'$BUILD_DIR/mongo/db/index/key_generator',
@@ -275,7 +332,10 @@ docSourceEnv.Library(
'$BUILD_DIR/mongo/db/logical_session_id_helpers',
'$BUILD_DIR/mongo/db/matcher/expressions',
'$BUILD_DIR/mongo/db/pipeline/lite_parsed_document_source',
+ '$BUILD_DIR/mongo/db/query/collation/collator_factory_interface',
+ '$BUILD_DIR/mongo/db/query/collation/collator_interface',
'$BUILD_DIR/mongo/db/repl/oplog_entry',
+ '$BUILD_DIR/mongo/db/repl/read_concern_args',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/sessions_collection',
@@ -283,100 +343,19 @@ docSourceEnv.Library(
'$BUILD_DIR/mongo/db/storage/encryption_hooks',
'$BUILD_DIR/mongo/db/storage/storage_options',
'$BUILD_DIR/mongo/s/is_mongos',
+ '$BUILD_DIR/mongo/s/query/async_results_merger',
'$BUILD_DIR/third_party/shim_snappy',
'accumulator',
'dependencies',
'document_sources_idl',
'document_value',
'expression',
+ 'expression_context',
'granularity_rounder',
'parsed_aggregation_projection',
- ],
-)
-
-env.Library(
- target='document_source_mock',
- source=[
- 'document_source_mock.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/db/query/query_test_service_context',
- 'document_source',
- ]
-)
-
-env.Library(
- target='lite_parsed_document_source',
- source=[
- 'lite_parsed_document_source.cpp',
- ],
- LIBDEPS=[
- 'aggregation_request',
]
)
-env.Library(
- target='pipeline',
- source=[
- 'pipeline.cpp',
- ],
- LIBDEPS=[
- 'dependencies',
- 'document_source',
- 'document_value',
- 'expression_context',
- '$BUILD_DIR/mongo/db/auth/authorization_manager_global',
- '$BUILD_DIR/mongo/db/bson/dotted_path_support',
- '$BUILD_DIR/mongo/db/query/collation/collator_interface',
- '$BUILD_DIR/mongo/db/query/collation/collator_factory_interface',
- '$BUILD_DIR/mongo/db/repl/read_concern_args',
- '$BUILD_DIR/mongo/db/storage/storage_options',
- ]
-)
-
-env.Library(
- target='document_source_lookup',
- source=[
- 'document_source_change_stream.cpp',
- 'document_source_check_resume_token.cpp',
- 'document_source_graph_lookup.cpp',
- 'document_source_lookup.cpp',
- 'document_source_lookup_change_post_image.cpp',
- ],
- LIBDEPS=[
- 'document_source',
- 'pipeline',
- '$BUILD_DIR/mongo/db/catalog/uuid_catalog',
- '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl',
- ],
-)
-
-env.Library(
- target='document_source_facet',
- source=[
- 'document_source_facet.cpp',
- 'tee_buffer.cpp',
- 'document_source_tee_consumer.cpp',
- ],
- LIBDEPS=[
- 'document_source',
- 'pipeline',
- ]
-)
-
-env.CppUnitTest(
- target='document_source_facet_test',
- source='document_source_facet_test.cpp',
- LIBDEPS=[
- '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
- '$BUILD_DIR/mongo/db/service_context_noop_init',
- '$BUILD_DIR/mongo/s/is_mongos',
- 'document_source_facet',
- 'document_source_mock',
- 'document_value_test_util',
- ],
-)
-
env.CppUnitTest(
target='tee_buffer_test',
source='tee_buffer_test.cpp',
@@ -384,7 +363,6 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
'$BUILD_DIR/mongo/db/service_context_noop_init',
'$BUILD_DIR/mongo/s/is_mongos',
- 'document_source_facet',
'document_source_mock',
'document_value_test_util',
],
@@ -430,7 +408,6 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/service_context_noop_init',
'$BUILD_DIR/mongo/s/is_mongos',
'document_value_test_util',
- 'document_source_lookup',
'document_source_mock',
'pipeline',
],
@@ -537,6 +514,5 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/service_context_noop_init',
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
'document_sources_idl',
- 'document_source_lookup',
],
)
diff --git a/src/mongo/db/pipeline/change_stream_constants.h b/src/mongo/db/pipeline/change_stream_constants.h
new file mode 100644
index 00000000000..fe043d17572
--- /dev/null
+++ b/src/mongo/db/pipeline/change_stream_constants.h
@@ -0,0 +1,41 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+
+namespace mongo {
+namespace change_stream_constants {
+
+const BSONObj kSortSpec =
+ BSON("_id.clusterTime.ts" << 1 << "_id.uuid" << 1 << "_id.documentKey" << 1);
+
+} // namespace change_stream_constants
+} // namespace mongo
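
For orientation, a minimal sketch (with illustrative values, not taken from this change) of an event _id shaped the way kSortSpec expects: the dotted paths address components of the change stream resume token, so a mongos merging several shards' streams interleaves events by cluster time first, breaking ties by collection UUID and then by document key.

#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/timestamp.h"

namespace mongo {
// Illustrative values only: a real resume token stores a binary UUID and an
// opaque document key; what matters here is the field layout the sort addresses.
BSONObj exampleChangeStreamEventId() {
    return BSON("_id" << BSON("clusterTime" << BSON("ts" << Timestamp(1515000000, 1))
                              << "uuid"
                              << "11112222-3333-4444-5555-666677778888"
                              << "documentKey"
                              << BSON("_id" << 42)));
}
}  // namespace mongo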
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 95032ec3cae..6f4ac3ffc57 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/bson/bson_helper.h"
#include "mongo/db/catalog/uuid_catalog.h"
#include "mongo/db/logical_clock.h"
+#include "mongo/db/pipeline/change_stream_constants.h"
#include "mongo/db/pipeline/document_path_support.h"
#include "mongo/db/pipeline/document_source_check_resume_token.h"
#include "mongo/db/pipeline/document_source_limit.h"
@@ -81,9 +82,6 @@ constexpr StringData DocumentSourceChangeStream::kInsertOpType;
constexpr StringData DocumentSourceChangeStream::kInvalidateOpType;
constexpr StringData DocumentSourceChangeStream::kNewShardDetectedOpType;
-const BSONObj DocumentSourceChangeStream::kSortSpec =
- BSON("_id.clusterTime.ts" << 1 << "_id.uuid" << 1 << "_id.documentKey" << 1);
-
namespace {
@@ -186,7 +184,7 @@ public:
const long long noLimit = -1;
auto sortMergingPresorted =
DocumentSourceSort::create(pExpCtx,
- DocumentSourceChangeStream::kSortSpec,
+ change_stream_constants::kSortSpec,
noLimit,
DocumentSourceSort::kMaxMemoryUsageBytes,
mergingPresorted);
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 17f891c7dc7..87bb4b63a48 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -107,9 +107,6 @@ public:
bool _documentKeyFieldsSharded = false;
};
- // The sort pattern used to merge results from multiple change streams on a mongos.
- static const BSONObj kSortSpec;
-
// The name of the field where the document key (_id and shard key, if present) will be found
// after the transformation.
static constexpr StringData kDocumentKeyField = "documentKey"_sd;
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
index 1f0244d4127..a0d5b643d96 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -28,6 +28,7 @@
#pragma once
+#include "mongo/db/pipeline/change_stream_constants.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/document_source_sort.h"
@@ -122,7 +123,7 @@ public:
const long long noLimit = -1;
auto sortMergingPresorted =
DocumentSourceSort::create(pExpCtx,
- DocumentSourceChangeStream::kSortSpec,
+ change_stream_constants::kSortSpec,
noLimit,
DocumentSourceSort::kMaxMemoryUsageBytes,
mergingPresorted);
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
index 539e15c0ccc..9eeb06e7be3 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
@@ -1,5 +1,5 @@
/**
- * Copyright 2013 (c) 10gen Inc.
+ * Copyright (C) 2018 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
@@ -17,179 +17,196 @@
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/document_source_merge_cursors.h"
-
-#include "mongo/db/pipeline/lite_parsed_document_source.h"
-#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/db/pipeline/document_source_sort.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/s/grid.h"
namespace mongo {
-using boost::intrusive_ptr;
-using std::make_pair;
-using std::string;
-using std::vector;
-
-constexpr StringData DocumentSourceMergeCursors::kStageName;
-
-DocumentSourceMergeCursors::DocumentSourceMergeCursors(
- std::vector<CursorDescriptor> cursorDescriptors,
- const intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSource(pExpCtx), _cursorDescriptors(std::move(cursorDescriptors)), _unstarted(true) {}
-
REGISTER_DOCUMENT_SOURCE(mergeCursors,
LiteParsedDocumentSourceDefault::parse,
DocumentSourceMergeCursors::createFromBson);
-intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::create(
- std::vector<CursorDescriptor> cursorDescriptors,
- const intrusive_ptr<ExpressionContext>& pExpCtx) {
- intrusive_ptr<DocumentSourceMergeCursors> source(
- new DocumentSourceMergeCursors(std::move(cursorDescriptors), pExpCtx));
- return source;
-}
-
-intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson(
- BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
- massert(17026,
- string("Expected an Array, but got a ") + typeName(elem.type()),
- elem.type() == Array);
-
- std::vector<CursorDescriptor> cursorDescriptors;
- BSONObj array = elem.embeddedObject();
- BSONForEach(cursor, array) {
- massert(17027,
- string("Expected an Object, but got a ") + typeName(cursor.type()),
- cursor.type() == Object);
-
- // The cursor descriptors for the merge cursors stage used to lack an "ns" field; "ns" was
- // understood to be the expression context namespace in that case. For mixed-version
- // compatibility, we accept both the old and new formats here.
- std::string cursorNs = cursor["ns"] ? cursor["ns"].String() : pExpCtx->ns.ns();
-
- cursorDescriptors.emplace_back(ConnectionString(HostAndPort(cursor["host"].String())),
- std::move(cursorNs),
- cursor["id"].Long());
- }
+constexpr StringData DocumentSourceMergeCursors::kStageName;
- return new DocumentSourceMergeCursors(std::move(cursorDescriptors), pExpCtx);
-}
+DocumentSourceMergeCursors::DocumentSourceMergeCursors(
+ executor::TaskExecutor* executor,
+ std::unique_ptr<ClusterClientCursorParams> cursorParams,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSource(expCtx), _executor(executor), _armParams(std::move(cursorParams)) {}
-Value DocumentSourceMergeCursors::serialize(
- boost::optional<ExplainOptions::Verbosity> explain) const {
- vector<Value> cursors;
- for (size_t i = 0; i < _cursorDescriptors.size(); i++) {
- cursors.push_back(
- Value(DOC("host" << Value(_cursorDescriptors[i].connectionString.toString()) << "ns"
- << _cursorDescriptors[i].ns
- << "id"
- << _cursorDescriptors[i].cursorId)));
+DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() {
+ // We don't expect or support tailable cursors to be executing through this stage.
+ invariant(pExpCtx->tailableMode == TailableMode::kNormal);
+ if (!_arm) {
+ _arm.emplace(pExpCtx->opCtx, _executor, _armParams.get());
}
- return Value(DOC(kStageName << Value(cursors)));
+ auto next = uassertStatusOK(_arm->blockingNext());
+ if (next.isEOF()) {
+ return GetNextResult::makeEOF();
+ }
+ return Document::fromBsonWithMetaData(*next.getResult());
}
-DocumentSourceMergeCursors::CursorAndConnection::CursorAndConnection(
- const CursorDescriptor& cursorDescriptor)
- : connection(cursorDescriptor.connectionString),
- cursor(connection.get(), cursorDescriptor.ns, cursorDescriptor.cursorId, 0, 0) {}
-
-vector<DBClientCursor*> DocumentSourceMergeCursors::getCursors() {
- verify(_unstarted);
- start();
- vector<DBClientCursor*> out;
- for (Cursors::const_iterator it = _cursors.begin(); it != _cursors.end(); ++it) {
- out.push_back(&((*it)->cursor));
+void DocumentSourceMergeCursors::serializeToArray(
+ std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
+ invariant(!_arm);
+ invariant(_armParams);
+ std::vector<Value> cursors;
+ for (auto&& remote : _armParams->remotes) {
+ cursors.emplace_back(Document{{"host", remote.hostAndPort.toString()},
+ {"ns", remote.cursorResponse.getNSS().toString()},
+ {"id", remote.cursorResponse.getCursorId()}});
+ }
+ array.push_back(Value(Document{{kStageName, Value(std::move(cursors))}}));
+ if (!_armParams->sort.isEmpty()) {
+ array.push_back(Value(Document{{DocumentSourceSort::kStageName, Value(_armParams->sort)}}));
}
-
- return out;
}
-void DocumentSourceMergeCursors::start() {
- _unstarted = false;
+Pipeline::SourceContainer::iterator DocumentSourceMergeCursors::doOptimizeAt(
+ Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
+ invariant(*itr == this);
+ invariant(!_arm);
+ invariant(_armParams);
- // open each cursor and send message asking for a batch
- for (auto&& cursorDescriptor : _cursorDescriptors) {
- _cursors.push_back(std::make_shared<CursorAndConnection>(cursorDescriptor));
- verify(_cursors.back()->connection->lazySupported());
- _cursors.back()->cursor.initLazy(); // shouldn't block
+ auto next = std::next(itr);
+ if (next == container->end()) {
+ return next;
}
- // wait for all cursors to return a batch
- // TODO need a way to keep cursors alive if some take longer than 10 minutes.
- for (auto&& cursor : _cursors) {
- bool retry = false;
- bool ok = cursor->cursor.initLazyFinish(retry); // blocks here for first batch
-
- uassert(
- 17028, "error reading response from " + _cursors.back()->connection->toString(), ok);
- verify(!retry);
+ auto nextSort = dynamic_cast<DocumentSourceSort*>(next->get());
+ if (!nextSort || !nextSort->mergingPresorted()) {
+ return next;
}
- _currentCursor = _cursors.begin();
-}
-
-Document DocumentSourceMergeCursors::nextSafeFrom(DBClientCursor* cursor) {
- const BSONObj next = cursor->next();
- if (next.hasField("$err")) {
- uassertStatusOKWithContext(getStatusFromCommandResult(next),
- str::stream() << "Received error in response from "
- << cursor->originalHost());
+ _armParams->sort =
+ nextSort->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging)
+ .toBson();
+ if (auto sortLimit = nextSort->getLimitSrc()) {
+ // There was a limit stage absorbed into the sort stage, so we need to preserve that.
+ container->insert(std::next(next), sortLimit);
}
- return Document::fromBsonWithMetaData(next);
+ container->erase(next);
+ return std::next(itr);
}
-DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() {
- pExpCtx->checkForInterrupt();
-
- if (_unstarted)
- start();
+boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ uassert(
+ 17026, "$mergeCursors stage expected array as argument", elem.type() == BSONType::Array);
+ const auto serializedRemotes = elem.Array();
+ uassert(50729,
+ "$mergeCursors stage expected array with at least one entry",
+ serializedRemotes.size() > 0);
+
+ boost::optional<NamespaceString> nss;
+ std::vector<ClusterClientCursorParams::RemoteCursor> remotes;
+ for (auto&& cursor : serializedRemotes) {
+ BSONElement nsElem;
+ BSONElement hostElem;
+ BSONElement idElem;
+ uassert(17027,
+ "$mergeCursors stage requires each cursor in array to be an object",
+ cursor.type() == BSONType::Object);
+ for (auto&& cursorElem : cursor.Obj()) {
+ const auto fieldName = cursorElem.fieldNameStringData();
+ if (fieldName == "ns"_sd) {
+ nsElem = cursorElem;
+ } else if (fieldName == "host"_sd) {
+ hostElem = cursorElem;
+ } else if (fieldName == "id"_sd) {
+ idElem = cursorElem;
+ } else {
+ uasserted(50730,
+ str::stream() << "Unrecognized option " << fieldName
+ << " within cursor provided to $mergeCursors: "
+ << cursor);
+ }
+ }
+ uassert(
+ 50731,
+ "$mergeCursors stage requires \'ns\' field with type string for each cursor in array",
+ nsElem.type() == BSONType::String);
+
+ // We require each cursor to have the same namespace. This isn't a fundamental limit of the
+ // system, but needs to be true due to the implementation of AsyncResultsMerger, which
+ // tracks one namespace for all cursors.
+ uassert(50720,
+ "$mergeCursors requires each cursor to have the same namespace",
+ !nss || nss->ns() == nsElem.valueStringData());
+ nss = NamespaceString(nsElem.String());
- // purge eof cursors and release their connections
- while (!_cursors.empty() && !(*_currentCursor)->cursor.more()) {
- (*_currentCursor)->connection.done();
- _cursors.erase(_currentCursor);
- _currentCursor = _cursors.begin();
+ uassert(
+ 50721,
+ "$mergeCursors stage requires \'host\' field with type string for each cursor in array",
+ hostElem.type() == BSONType::String);
+ auto host = uassertStatusOK(HostAndPort::parse(hostElem.valueStringData()));
+
+ uassert(50722,
+ "$mergeCursors stage requires \'id\' field with type long for each cursor in array",
+ idElem.type() == BSONType::NumberLong);
+ auto cursorId = idElem.Long();
+
+ // We are assuming that none of the cursors have been iterated at all, and so will not have
+ // any data in the initial batch.
+ // TODO SERVER-33323 We use a fake shard id because the AsyncResultsMerger won't use it for
+ // anything, and finding the real one is non-trivial.
+ std::vector<BSONObj> emptyBatch;
+ remotes.push_back({ShardId("fakeShardIdForMergeCursors"),
+ std::move(host),
+ CursorResponse{*nss, cursorId, emptyBatch}});
}
+ invariant(nss); // We know there is at least one cursor in 'serializedRemotes', and we require
+ // each cursor to have a 'ns' field.
+
+ auto params = stdx::make_unique<ClusterClientCursorParams>(*nss);
+ params->remotes = std::move(remotes);
+ params->readPreference = ReadPreferenceSetting::get(expCtx->opCtx);
+ return new DocumentSourceMergeCursors(
+ Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ std::move(params),
+ expCtx);
+}
- if (_cursors.empty())
- return GetNextResult::makeEOF();
-
- auto next = nextSafeFrom(&((*_currentCursor)->cursor));
-
- // advance _currentCursor, wrapping if needed
- if (++_currentCursor == _cursors.end())
- _currentCursor = _cursors.begin();
+boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::create(
+ std::vector<ClusterClientCursorParams::RemoteCursor>&& remoteCursors,
+ executor::TaskExecutor* executor,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ auto params = stdx::make_unique<ClusterClientCursorParams>(expCtx->ns);
+ params->remotes = std::move(remoteCursors);
+ params->readPreference = ReadPreferenceSetting::get(expCtx->opCtx);
+ return new DocumentSourceMergeCursors(executor, std::move(params), expCtx);
+}
- return std::move(next);
+void DocumentSourceMergeCursors::detachFromOperationContext() {
+ if (_arm) {
+ _arm->detachFromOperationContext();
+ }
}
-bool DocumentSourceMergeCursors::remotesExhausted() const {
- return std::all_of(_cursors.begin(), _cursors.end(), [](const auto& cursorAndConn) {
- return cursorAndConn->cursor.isDead();
- });
+void DocumentSourceMergeCursors::reattachToOperationContext(OperationContext* opCtx) {
+ if (_arm) {
+ _arm->reattachToOperationContext(opCtx);
+ }
}
void DocumentSourceMergeCursors::doDispose() {
- for (auto&& cursorAndConn : _cursors) {
- // Note it is an error to call done() on a connection before consuming the reply from a
- // request.
- if (cursorAndConn->cursor.connectionHasPendingReplies()) {
- continue;
- }
- cursorAndConn->cursor.kill();
- cursorAndConn->connection.done();
+ if (_arm) {
+ _arm->blockingKill(pExpCtx->opCtx);
}
- _cursors.clear();
- _currentCursor = _cursors.end();
-}
}
+
+} // namespace mongo
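
For reference, a minimal sketch of the stage document that createFromBson() above accepts and serializeToArray() emits; the host names, namespace, and cursor ids here are illustrative only. When doOptimizeAt() has absorbed a merging $sort, serializeToArray() appends that sort as a separate stage after the $mergeCursors document.

#include "mongo/bson/bsonobjbuilder.h"

namespace mongo {
// Each cursor object must carry exactly 'host', 'ns', and 'id' (a NumberLong),
// every cursor must share one namespace, and the array must be non-empty; the
// uasserts in createFromBson() above enforce each of these constraints.
BSONObj exampleMergeCursorsSpec() {
    return BSON("$mergeCursors"
                << BSON_ARRAY(BSON("host"
                                   << "shardA.example.com:27017"
                                   << "ns"
                                   << "test.mergeCursors"
                                   << "id" << 123LL)
                              << BSON("host"
                                      << "shardB.example.com:27017"
                                      << "ns"
                                      << "test.mergeCursors"
                                      << "id" << 456LL)));
}
}  // namespace mongo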
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h
index 53ef546d5ea..e23772f411b 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.h
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.h
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2016 MongoDB Inc.
+ * Copyright (C) 2018 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
@@ -28,35 +28,58 @@
#pragma once
-#include "mongo/client/connpool.h"
-#include "mongo/db/clientcursor.h"
-#include "mongo/db/cursor_id.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/s/query/async_results_merger.h"
namespace mongo {
+/**
+ * A stage used only internally to merge results that are being gathered from remote hosts, possibly
+ * including this host.
+ *
+ * Does not assume ownership of cursors until the first call to getNext(). This is to allow this
+ * stage to be used on mongos without actually iterating the cursors. For example, when this stage
+ * is parsed on mongos it may later be decided that the merging should happen on one of the shards.
+ * Then this stage is forwarded to the merging shard, and it should not kill the cursors when it
+ * goes out of scope on mongos.
+ */
class DocumentSourceMergeCursors : public DocumentSource {
public:
static constexpr StringData kStageName = "$mergeCursors"_sd;
- struct CursorDescriptor {
- CursorDescriptor(ConnectionString connectionString, std::string ns, CursorId cursorId)
- : connectionString(std::move(connectionString)),
- ns(std::move(ns)),
- cursorId(cursorId) {}
-
- ConnectionString connectionString;
- std::string ns;
- CursorId cursorId;
- };
+ /**
+ * Parses a serialized version of this stage.
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement, const boost::intrusive_ptr<ExpressionContext>&);
- GetNextResult getNext() final;
+ /**
+ * Creates a new DocumentSourceMergeCursors from the given 'remoteCursors'.
+ */
+ static boost::intrusive_ptr<DocumentSource> create(
+ std::vector<ClusterClientCursorParams::RemoteCursor>&& remoteCursors,
+ executor::TaskExecutor*,
+ const boost::intrusive_ptr<ExpressionContext>&);
const char* getSourceName() const final {
return kStageName.rawData();
}
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+ /**
+ * Absorbs a subsequent $sort if it's merging pre-sorted streams.
+ */
+ Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container);
+ void detachFromOperationContext() final;
+ void reattachToOperationContext(OperationContext*) final;
+
+ /**
+ * Serializes this stage to be sent to perform the merging on a different host.
+ */
+ void serializeToArray(
+ std::vector<Value>& array,
+ boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
StageConstraints constraints(StreamType::kStreaming,
@@ -69,57 +92,29 @@ public:
return constraints;
}
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
- static boost::intrusive_ptr<DocumentSource> create(
- std::vector<CursorDescriptor> cursorDescriptors,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
- /**
- * Returns true if all remotes have reported that their cursors are closed.
- */
- bool remotesExhausted() const;
-
- /** Returns non-owning pointers to cursors managed by this stage.
- * Call this instead of getNext() if you want access to the raw streams.
- * This method should only be called at most once.
- */
- std::vector<DBClientCursor*> getCursors();
-
- /**
- * Returns the next object from the cursor, throwing an appropriate exception if the cursor
- * reported an error. This is a better form of DBClientCursor::nextSafe.
- */
- static Document nextSafeFrom(DBClientCursor* cursor);
+ GetNextResult getNext() final;
protected:
void doDispose() final;
private:
- struct CursorAndConnection {
- CursorAndConnection(const CursorDescriptor& cursorDescriptor);
- ScopedDbConnection connection;
- DBClientCursor cursor;
- };
+ DocumentSourceMergeCursors(executor::TaskExecutor*,
+ std::unique_ptr<ClusterClientCursorParams>,
+ const boost::intrusive_ptr<ExpressionContext>&);
- // using list to enable removing arbitrary elements
- typedef std::list<std::shared_ptr<CursorAndConnection>> Cursors;
-
- DocumentSourceMergeCursors(std::vector<CursorDescriptor> cursorDescriptors,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
- // Converts _cursorDescriptors into active _cursors.
- void start();
-
- // This is the description of cursors to merge.
- const std::vector<CursorDescriptor> _cursorDescriptors;
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
+ MONGO_UNREACHABLE; // Should call serializeToArray instead.
+ }
- // These are the actual cursors we are merging. Created lazily.
- Cursors _cursors;
- Cursors::iterator _currentCursor;
+ executor::TaskExecutor* _executor;
- bool _unstarted;
+ // '_arm' is not populated until the first call to getNext(). If getNext() is never called we
+ // will not create an AsyncResultsMerger. If we did so the destruction of this stage would cause
+ // the cursors within the ARM to be killed prematurely. For example, if this stage is parsed on
+ // mongos then forwarded to the shards, it should not kill the cursors when it goes out of scope
+ // on mongos.
+ std::unique_ptr<ClusterClientCursorParams> _armParams;
+ boost::optional<AsyncResultsMerger> _arm;
};
} // namespace mongo
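
A minimal usage sketch of the new create() factory; the remote cursors, executor, and expression context are assumed to come from the caller, as the tests below illustrate. Per the class comment, the AsyncResultsMerger is only constructed on the first getNext(), so a stage that is parsed on mongos and then forwarded to a merging shard can be destroyed without killing the remote cursors.

#include "mongo/db/pipeline/document_source_merge_cursors.h"

namespace mongo {
// Illustrative helper: wrap already-established remote cursors in a stage that
// can be added to the front of a Pipeline (see addInitialSource in the tests).
boost::intrusive_ptr<DocumentSource> makeMergeStage(
    std::vector<ClusterClientCursorParams::RemoteCursor> remotes,
    executor::TaskExecutor* executor,
    const boost::intrusive_ptr<ExpressionContext>& expCtx) {
    // Ownership is deferred: destroying the returned stage before any getNext()
    // call leaves the remote cursors open on the shards.
    return DocumentSourceMergeCursors::create(std::move(remotes), executor, expCtx);
}
}  // namespace mongo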
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
new file mode 100644
index 00000000000..8007aea0db0
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
@@ -0,0 +1,488 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_merge_cursors.h"
+
+#include "mongo/client/remote_command_targeter_factory_mock.h"
+#include "mongo/client/remote_command_targeter_mock.h"
+#include "mongo/db/json.h"
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document_source_limit.h"
+#include "mongo/db/pipeline/document_source_sort.h"
+#include "mongo/db/pipeline/document_value_test_util.h"
+#include "mongo/db/pipeline/expression_context_for_test.h"
+#include "mongo/db/query/cursor_response.h"
+#include "mongo/db/query/getmore_request.h"
+#include "mongo/db/query/query_request.h"
+#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/sharding_test_fixture.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using executor::NetworkInterfaceMock;
+using executor::RemoteCommandRequest;
+using executor::RemoteCommandResponse;
+
+using ResponseStatus = executor::TaskExecutor::ResponseStatus;
+
+const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345);
+const std::vector<ShardId> kTestShardIds = {
+ ShardId("FakeShard1"), ShardId("FakeShard2"), ShardId("FakeShard3")};
+const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShard1Host", 12345),
+ HostAndPort("FakeShard2Host", 12345),
+ HostAndPort("FakeShard3Host", 12345)};
+
+const NamespaceString kTestNss = NamespaceString("test.mergeCursors"_sd);
+const HostAndPort kTestHost = HostAndPort("localhost:27017"_sd);
+
+const CursorId kExhaustedCursorID = 0;
+
+class DocumentSourceMergeCursorsTest : public ShardingTestFixture {
+public:
+ DocumentSourceMergeCursorsTest() : _expCtx(new ExpressionContextForTest(kTestNss)) {}
+
+ void setUp() override {
+ ShardingTestFixture::setUp();
+ setRemote(HostAndPort("ClientHost", 12345));
+
+ configTargeter()->setFindHostReturnValue(kTestConfigShardHost);
+
+ std::vector<ShardType> shards;
+ for (size_t i = 0; i < kTestShardIds.size(); i++) {
+ ShardType shardType;
+ shardType.setName(kTestShardIds[i].toString());
+ shardType.setHost(kTestShardHosts[i].toString());
+
+ shards.push_back(shardType);
+
+ std::unique_ptr<RemoteCommandTargeterMock> targeter(
+ stdx::make_unique<RemoteCommandTargeterMock>());
+ targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHosts[i]));
+ targeter->setFindHostReturnValue(kTestShardHosts[i]);
+
+ targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHosts[i]),
+ std::move(targeter));
+ }
+
+ setupShards(shards);
+ }
+
+ boost::intrusive_ptr<ExpressionContextForTest> getExpCtx() {
+ return _expCtx.get();
+ }
+
+private:
+ boost::intrusive_ptr<ExpressionContextForTest> _expCtx;
+};
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectNonArray) {
+ auto spec = BSON("$mergeCursors" << BSON(
+ "cursors" << BSON_ARRAY(BSON("ns" << kTestNss.ns() << "id" << 0LL << "host"
+ << kTestHost.toString()))));
+ ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
+ AssertionException,
+ 17026);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectEmptyArray) {
+ auto spec = BSON("$mergeCursors" << BSONArray());
+ ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
+ AssertionException,
+ 50729);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectCursorWithNoNamespace) {
+ auto spec =
+ BSON("$mergeCursors" << BSON_ARRAY(BSON("id" << 0LL << "host" << kTestHost.toString())));
+ ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
+ AssertionException,
+ 50731);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectCursorWithNonStringNamespace) {
+ auto spec = BSON("$mergeCursors" << BSON_ARRAY(
+ BSON("ns" << 4 << "id" << 0LL << "host" << kTestHost.toString())));
+ ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
+ AssertionException,
+ 50731);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectCursorsWithDifferentNamespaces) {
+ auto spec = BSON(
+ "$mergeCursors"
+ << BSON_ARRAY(BSON("ns" << kTestNss.ns() << "id" << 0LL << "host" << kTestHost.toString())
+ << BSON("ns"
+ << "test.other"_sd
+ << "id"
+ << 0LL
+ << "host"
+ << kTestHost.toString())));
+ ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
+ AssertionException,
+ 50720);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectCursorWithNoHost) {
+ auto spec = BSON("$mergeCursors" << BSON_ARRAY(BSON("ns" << kTestNss.ns() << "id" << 0LL)));
+ ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
+ AssertionException,
+ 50721);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectCursorWithNonStringHost) {
+ auto spec = BSON("$mergeCursors"
+ << BSON_ARRAY(BSON("ns" << kTestNss.ns() << "id" << 0LL << "host" << 4LL)));
+ ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
+ AssertionException,
+ 50721);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectCursorWithNonLongId) {
+ auto spec = BSON("$mergeCursors" << BSON_ARRAY(BSON("ns" << kTestNss.ns() << "id"
+ << "zero"
+ << "host"
+ << kTestHost.toString())));
+ ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
+ AssertionException,
+ 50722);
+ spec = BSON("$mergeCursors" << BSON_ARRAY(
+ BSON("ns" << kTestNss.ns() << "id" << 0 << "host" << kTestHost.toString())));
+ ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
+ AssertionException,
+ 50722);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectCursorWithExtraField) {
+ auto spec =
+ BSON("$mergeCursors" << BSON_ARRAY(BSON(
+ "ns" << kTestNss.ns() << "id" << 0LL << "host" << kTestHost.toString() << "extra"
+ << "unexpected")));
+ ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
+ AssertionException,
+ 50730);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToParseTheSerializedVersionOfItself) {
+ auto spec =
+ BSON("$mergeCursors" << BSON_ARRAY(
+ BSON("ns" << kTestNss.ns() << "id" << 1LL << "host" << kTestHost.toString())
+ << BSON("ns" << kTestNss.ns() << "id" << 2LL << "host" << kTestHost.toString())));
+ auto mergeCursors =
+ DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx());
+ std::vector<Value> serializationArray;
+ mergeCursors->serializeToArray(serializationArray);
+ ASSERT_EQ(serializationArray.size(), 1UL);
+ // The serialized version might not be identical to 'spec'; the fields might be in a different
+ // order, etc. Here we just make sure that the final parse doesn't throw.
+ auto newSpec = serializationArray[0].getDocument().toBson();
+ ASSERT(DocumentSourceMergeCursors::createFromBson(newSpec.firstElement(), getExpCtx()));
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldReportEOFWithNoCursors) {
+ auto expCtx = getExpCtx();
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(
+ kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, kExhaustedCursorID, {}));
+ cursors.emplace_back(
+ kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, kExhaustedCursorID, {}));
+ auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
+ auto mergeCursorsStage =
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx);
+
+ ASSERT_TRUE(mergeCursorsStage->getNext().isEOF());
+}
+
+BSONObj cursorResponseObj(const NamespaceString& nss,
+ CursorId cursorId,
+ std::vector<BSONObj> batch) {
+ return CursorResponse{nss, cursorId, std::move(batch)}.toBSON(
+ CursorResponse::ResponseType::SubsequentResponse);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToIterateCursorsUntilEOF) {
+ auto expCtx = getExpCtx();
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
+ auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
+ pipeline->addInitialSource(
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx));
+
+ // Iterate the $mergeCursors stage asynchronously on a different thread, since it will block
+ // waiting for network responses, which we will manually schedule below.
+ auto future = launchAsync([&pipeline]() {
+ for (int i = 0; i < 5; ++i) {
+ ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 1}}));
+ }
+ ASSERT_FALSE(static_cast<bool>(pipeline->getNext()));
+ });
+
+ // Schedule responses to two getMores which keep the cursor open.
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["getMore"]);
+ return cursorResponseObj(
+ expCtx->ns, request.cmdObj["getMore"].Long(), {BSON("x" << 1), BSON("x" << 1)});
+ });
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["getMore"]);
+ return cursorResponseObj(
+ expCtx->ns, request.cmdObj["getMore"].Long(), {BSON("x" << 1), BSON("x" << 1)});
+ });
+
+ // Schedule responses to two getMores which report the cursor is exhausted.
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["getMore"]);
+ return cursorResponseObj(expCtx->ns, kExhaustedCursorID, {});
+ });
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["getMore"]);
+ return cursorResponseObj(expCtx->ns, kExhaustedCursorID, {BSON("x" << 1)});
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldNotKillCursorsIfNeverIterated) {
+ auto expCtx = getExpCtx();
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
+ auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
+ pipeline->addInitialSource(
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx));
+
+ pipeline.reset(); // Delete the pipeline before using it.
+
+ network()->enterNetwork();
+ ASSERT_FALSE(network()->hasReadyRequests());
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldKillCursorsIfPartiallyIterated) {
+ auto expCtx = getExpCtx();
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
+ auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
+ pipeline->addInitialSource(
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx));
+
+ // Iterate the pipeline asynchronously on a different thread, since it will block waiting for
+ // network responses, which we will manually schedule below.
+ auto future = launchAsync([&pipeline]() {
+ ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 1}}));
+ pipeline.reset(); // Stop iterating and delete the pipeline.
+ });
+
+ // Note we do not use 'kExhaustedCursorID' here, so the cursors are still open.
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["getMore"]);
+ return cursorResponseObj(expCtx->ns, 1, {BSON("x" << 1), BSON("x" << 1)});
+ });
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["getMore"]);
+ return cursorResponseObj(expCtx->ns, 2, {BSON("x" << 1), BSON("x" << 1)});
+ });
+
+ // Here we're looking for the killCursors requests to be scheduled.
+ std::set<CursorId> killedCursors;
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["killCursors"]);
+ auto cursors = request.cmdObj["cursors"];
+ ASSERT_EQ(cursors.type(), BSONType::Array);
+ auto cursorsArray = cursors.Array();
+ ASSERT_FALSE(cursorsArray.empty());
+ auto cursorId = cursorsArray[0].Long();
+ ASSERT(cursorId == 1 || cursorId == 2);
+ killedCursors.insert(cursorId);
+ // The ARM doesn't actually inspect the response of the killCursors, so we don't have to put
+ // anything except {ok: 1}.
+ return BSON("ok" << 1);
+ });
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["killCursors"]);
+ auto cursors = request.cmdObj["cursors"];
+ ASSERT_EQ(cursors.type(), BSONType::Array);
+ auto cursorsArray = cursors.Array();
+ ASSERT_FALSE(cursorsArray.empty());
+ auto cursorId = cursorsArray[0].Long();
+ ASSERT(cursorId == 1 || cursorId == 2);
+ killedCursors.insert(cursorId);
+ // The ARM doesn't actually inspect the response of the killCursors, so we don't have to put
+ // anything except {ok: 1}.
+ return BSON("ok" << 1);
+ });
+
+ future.timed_get(kFutureTimeout);
+ ASSERT_EQ(killedCursors.size(), 2UL);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldOptimizeWithASortToEnsureCorrectOrder) {
+ auto expCtx = getExpCtx();
+
+ // Make a pipeline with a single $sort stage that is merging pre-sorted results.
+ const bool mergingPresorted = true;
+ const long long noLimit = -1;
+ auto sortStage = DocumentSourceSort::create(expCtx,
+ BSON("x" << 1),
+ noLimit,
+ DocumentSourceSort::kMaxMemoryUsageBytes,
+ mergingPresorted);
+ auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx));
+
+ // Make a $mergeCursors stage and add it to the front of the pipeline.
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
+ auto mergeCursorsStage =
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx);
+ pipeline->addInitialSource(std::move(mergeCursorsStage));
+
+ // After optimization we should only have a $mergeCursors stage.
+ pipeline->optimizePipeline();
+ ASSERT_EQ(pipeline->getSources().size(), 1UL);
+ ASSERT_TRUE(dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
+
+ // Iterate the pipeline asynchronously on a different thread, since it will block waiting for
+ // network responses, which we will manually schedule below.
+ auto future = launchAsync([&pipeline]() {
+ ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 1}}));
+ ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 2}}));
+ ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 3}}));
+ ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 4}}));
+ ASSERT_FALSE(static_cast<bool>(pipeline->getNext()));
+ });
+
+ onCommand([&](const auto& request) {
+ return cursorResponseObj(expCtx->ns,
+ kExhaustedCursorID,
+ {BSON("x" << 1 << "$sortKey" << BSON("" << 1)),
+ BSON("x" << 3 << "$sortKey" << BSON("" << 3))});
+ });
+ onCommand([&](const auto& request) {
+ return cursorResponseObj(expCtx->ns,
+ kExhaustedCursorID,
+ {BSON("x" << 2 << "$sortKey" << BSON("" << 2)),
+ BSON("x" << 4 << "$sortKey" << BSON("" << 4))});
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLeadingSort) {
+ auto expCtx = getExpCtx();
+
+ // Make a pipeline with a single $sort stage that is merging pre-sorted results.
+ const bool mergingPresorted = true;
+ const long long limit = 3;
+ auto sortStage = DocumentSourceSort::create(
+ expCtx, BSON("x" << 1), limit, DocumentSourceSort::kMaxMemoryUsageBytes, mergingPresorted);
+ auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx));
+
+ // Make a $mergeCursors stage and add it to the front of the pipeline.
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
+ auto mergeCursorsStage =
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx);
+ pipeline->addInitialSource(std::move(mergeCursorsStage));
+
+ // After optimization, we should still have a $limit stage.
+ pipeline->optimizePipeline();
+ ASSERT_EQ(pipeline->getSources().size(), 2UL);
+ ASSERT_TRUE(dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
+ ASSERT_TRUE(dynamic_cast<DocumentSourceLimit*>(pipeline->getSources().back().get()));
+
+ // Iterate the pipeline asynchronously on a different thread, since it will block waiting for
+ // network responses, which we will manually schedule below.
+ auto future = launchAsync([&]() {
+ for (int i = 1; i <= limit; ++i) {
+ ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", i}}));
+ }
+ ASSERT_FALSE(static_cast<bool>(pipeline->getNext()));
+ });
+
+ onCommand([&](const auto& request) {
+ return cursorResponseObj(expCtx->ns,
+ kExhaustedCursorID,
+ {BSON("x" << 1 << "$sortKey" << BSON("" << 1)),
+ BSON("x" << 3 << "$sortKey" << BSON("" << 3))});
+ });
+ onCommand([&](const auto& request) {
+ return cursorResponseObj(expCtx->ns,
+ kExhaustedCursorID,
+ {BSON("x" << 2 << "$sortKey" << BSON("" << 2)),
+ BSON("x" << 4 << "$sortKey" << BSON("" << 4))});
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldSerializeSortIfAbsorbedViaOptimize) {
+ auto expCtx = getExpCtx();
+
+ // Make a pipeline with a single $sort stage that is merging pre-sorted results.
+ const bool mergingPresorted = true;
+ const long long limit = 3;
+ auto sortStage = DocumentSourceSort::create(
+ expCtx, BSON("x" << 1), limit, DocumentSourceSort::kMaxMemoryUsageBytes, mergingPresorted);
+ auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx));
+
+ // Make a $mergeCursors stage and add it to the front of the pipeline.
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
+ auto mergeCursorsStage =
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx);
+ pipeline->addInitialSource(std::move(mergeCursorsStage));
+
+ // After optimization, we should still have a $limit stage.
+ pipeline->optimizePipeline();
+ ASSERT_EQ(pipeline->getSources().size(), 2UL);
+ ASSERT_TRUE(dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
+ ASSERT_TRUE(dynamic_cast<DocumentSourceLimit*>(pipeline->getSources().back().get()));
+
+ auto serialized = pipeline->serialize();
+ ASSERT_EQ(serialized.size(), 3UL);
+ ASSERT_FALSE(serialized[0]["$mergeCursors"].missing());
+ ASSERT_FALSE(serialized[1]["$sort"].missing());
+ ASSERT_FALSE(serialized[2]["$limit"].missing());
+}
+
+} // namespace
+} // namespace mongo
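
For reference, a sketch of the three serialized stage shapes the assertions above admit. The values here are illustrative only; the real $mergeCursors body carries the remote cursor descriptors:

    // Hypothetical shapes, not the stages' exact serialization.
    std::vector<BSONObj> expectedShapes = {
        BSON("$mergeCursors" << BSONObj()),  // placeholder body
        BSON("$sort" << BSON("x" << 1)),     // the merging sort pattern
        BSON("$limit" << 3),                 // the absorbed limit survives serialization
    };
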
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index 2993bd1286d..38e72693d1d 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -106,6 +106,8 @@ REGISTER_DOCUMENT_SOURCE(sort,
DocumentSource::GetNextResult DocumentSourceSort::getNext() {
pExpCtx->checkForInterrupt();
+    invariant(!_mergingPresorted);  // A presorted merge should have been absorbed into the
+                                    // $mergeCursors stage during optimization, never executed.
if (!_populated) {
const auto populationResult = populate();
@@ -330,24 +332,14 @@ SortOptions DocumentSourceSort::makeSortOptions() const {
}
DocumentSource::GetNextResult DocumentSourceSort::populate() {
- if (_mergingPresorted) {
- typedef DocumentSourceMergeCursors DSCursors;
- if (DSCursors* castedSource = dynamic_cast<DSCursors*>(pSource)) {
- populateFromCursors(castedSource->getCursors());
- } else {
- msgasserted(17196, "can only mergePresorted from MergeCursors");
- }
- return DocumentSource::GetNextResult::makeEOF();
- } else {
- auto nextInput = pSource->getNext();
- for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) {
- loadDocument(nextInput.releaseDocument());
- }
- if (nextInput.isEOF()) {
- loadingDone();
- }
- return nextInput;
+ auto nextInput = pSource->getNext();
+ for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) {
+ loadDocument(nextInput.releaseDocument());
}
+ if (nextInput.isEOF()) {
+ loadingDone();
+ }
+ return nextInput;
}
void DocumentSourceSort::loadDocument(Document&& doc) {
@@ -374,44 +366,6 @@ void DocumentSourceSort::loadingDone() {
_populated = true;
}
-class DocumentSourceSort::IteratorFromCursor : public MySorter::Iterator {
-public:
- IteratorFromCursor(DocumentSourceSort* sorter, DBClientCursor* cursor)
- : _sorter(sorter), _cursor(cursor) {}
-
- bool more() {
- return _cursor->more();
- }
- Data next() {
- auto doc = DocumentSourceMergeCursors::nextSafeFrom(_cursor);
- if (doc.hasSortKeyMetaField()) {
- // We set the sort key metadata field during the first half of the sort, so just use
- // that as the sort key here.
- return make_pair(
- deserializeSortKey(_sorter->_sortPattern.size(), doc.getSortKeyMetaField()), doc);
- } else {
- // It's possible this result is coming from a shard that is still on an old version. If
- // that's the case, it won't tell us it's sort key - we'll have to re-compute it
- // ourselves.
- return _sorter->extractSortKey(std::move(doc));
- }
- }
-
-private:
- DocumentSourceSort* _sorter;
- DBClientCursor* _cursor;
-};
-
-void DocumentSourceSort::populateFromCursors(const vector<DBClientCursor*>& cursors) {
- vector<std::shared_ptr<MySorter::Iterator>> iterators;
- for (size_t i = 0; i < cursors.size(); i++) {
- iterators.push_back(std::make_shared<IteratorFromCursor>(this, cursors[i]));
- }
-
- _output.reset(MySorter::Iterator::merge(iterators, makeSortOptions(), Comparator(*this)));
- _populated = true;
-}
-
Value DocumentSourceSort::getCollationComparisonKey(const Value& val) const {
const auto collator = pExpCtx->getCollator();
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index b1aeb09cd43..a9b2ed14c2f 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -155,9 +155,6 @@ protected:
void doDispose() final;
private:
- // This is used to merge pre-sorted results from a DocumentSourceMergeCursors.
- class IteratorFromCursor;
-
using MySorter = Sorter<Value, Document>;
// For MySorter.
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 499d8c3d6e5..0eb286bca28 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -142,7 +142,7 @@ void Pipeline::validatePipeline() const {
uasserted(ErrorCodes::InvalidNamespace,
"{aggregate: 1} is not valid for an empty pipeline.");
}
- } else if (!dynamic_cast<DocumentSourceMergeCursors*>(_sources.front().get())) {
+ } else if ("$mergeCursors"_sd != _sources.front()->getSourceName()) {
// The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this,
// {aggregate: 1} is only valid for collectionless sources, and vice-versa.
const auto firstStageConstraints = _sources.front()->constraints(_splitState);
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 28274b3e1ff..d3779fa6f86 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1231,6 +1231,7 @@ env.Library(
'$BUILD_DIR/mongo/executor/task_executor_interface',
'$BUILD_DIR/mongo/rpc/command_status',
'$BUILD_DIR/mongo/s/query/async_results_merger',
+ '$BUILD_DIR/mongo/s/query/cluster_client_cursor',
'$BUILD_DIR/mongo/util/progress_meter',
],
)
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 2550179e3de..e52f81efc77 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
@@ -487,27 +488,19 @@ DispatchShardPipelineResults dispatchShardPipeline(
targetedCommand};
}
-std::pair<ShardId, Shard::CommandResponse> establishMergingShardCursor(
- OperationContext* opCtx,
- const NamespaceString& nss,
- const std::vector<ClusterClientCursorParams::RemoteCursor>& cursors,
- const BSONObj mergeCmdObj,
- const boost::optional<ShardId> primaryShard) {
- // Run merging command on random shard, unless we need to run on the primary shard.
- auto& prng = opCtx->getClient()->getPrng();
- const auto mergingShardId =
- primaryShard ? primaryShard.get() : cursors[prng.nextInt32(cursors.size())].shardId;
+Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj mergeCmdObj,
+ const ShardId& mergingShardId) {
const auto mergingShard =
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, mergingShardId));
- auto shardCmdResponse = uassertStatusOK(
+ return uassertStatusOK(
mergingShard->runCommandWithFixedRetryAttempts(opCtx,
ReadPreferenceSetting::get(opCtx),
nss.db().toString(),
mergeCmdObj,
Shard::RetryPolicy::kIdempotent));
-
- return {std::move(mergingShardId), std::move(shardCmdResponse)};
}
BSONObj establishMergingMongosCursor(OperationContext* opCtx,
@@ -634,6 +627,19 @@ BSONObj getDefaultCollationForUnshardedCollection(const Shard* primaryShard,
return defaultCollation;
}
+ShardId pickMergingShard(OperationContext* opCtx,
+ const DispatchShardPipelineResults& dispatchResults,
+ ShardId primaryShard) {
+ auto& prng = opCtx->getClient()->getPrng();
+    // If we cannot merge on mongoS, establish the merge cursor on a shard. Run the merging
+    // command on a random shard, unless the pipeline dictates that it needs to be run on the
+    // primary shard for the database.
+ return dispatchResults.needsPrimaryShardMerge
+ ? primaryShard
+ : dispatchResults.remoteCursors[prng.nextInt32(dispatchResults.remoteCursors.size())]
+ .shardId;
+}
+
} // namespace
Status ClusterAggregate::runAggregate(OperationContext* opCtx,
@@ -811,29 +817,25 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
return getStatusFromCommandResult(result->asTempObj());
}
- // If we cannot merge on mongoS, establish the merge cursor on a shard.
- mergingPipeline->addInitialSource(
- DocumentSourceMergeCursors::create(parseCursors(dispatchResults.remoteCursors), mergeCtx));
+ ShardId mergingShardId =
+ pickMergingShard(opCtx, dispatchResults, executionNsRoutingInfo.primaryId());
+
+ mergingPipeline->addInitialSource(DocumentSourceMergeCursors::create(
+ std::move(dispatchResults.remoteCursors),
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ mergeCtx));
auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, mergingPipeline);
auto mergeResponse =
- establishMergingShardCursor(opCtx,
- namespaces.executionNss,
- dispatchResults.remoteCursors,
- mergeCmdObj,
- boost::optional<ShardId>{dispatchResults.needsPrimaryShardMerge,
- executionNsRoutingInfo.primaryId()});
-
- auto mergingShardId = mergeResponse.first;
- auto response = mergeResponse.second;
+ establishMergingShardCursor(opCtx, namespaces.executionNss, mergeCmdObj, mergingShardId);
// The merging shard is remote, so if a response was received, a HostAndPort must have been set.
- invariant(response.hostAndPort);
+ invariant(mergeResponse.hostAndPort);
auto mergeCursorResponse = uassertStatusOK(
storePossibleCursor(opCtx,
mergingShardId,
- *response.hostAndPort,
- response.response,
+ *mergeResponse.hostAndPort,
+ mergeResponse.response,
namespaces.requestedNss,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
Grid::get(opCtx)->getCursorManager()));
@@ -841,19 +843,6 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
return appendCursorResponseToCommandResult(mergingShardId, mergeCursorResponse, result);
}
-std::vector<DocumentSourceMergeCursors::CursorDescriptor> ClusterAggregate::parseCursors(
- const std::vector<ClusterClientCursorParams::RemoteCursor>& responses) {
- std::vector<DocumentSourceMergeCursors::CursorDescriptor> cursors;
- for (const auto& response : responses) {
- invariant(0 != response.cursorResponse.getCursorId());
- invariant(response.cursorResponse.getBatch().empty());
- cursors.emplace_back(ConnectionString(response.hostAndPort),
- response.cursorResponse.getNSS().toString(),
- response.cursorResponse.getCursorId());
- }
- return cursors;
-}
-
void ClusterAggregate::uassertAllShardsSupportExplain(
const std::vector<AsyncRequestsSender::Response>& shardResults) {
for (const auto& result : shardResults) {
diff --git a/src/mongo/s/commands/cluster_aggregate.h b/src/mongo/s/commands/cluster_aggregate.h
index 740cc62c610..d7c0071e3de 100644
--- a/src/mongo/s/commands/cluster_aggregate.h
+++ b/src/mongo/s/commands/cluster_aggregate.h
@@ -81,9 +81,6 @@ public:
BSONObjBuilder* result);
private:
- static std::vector<DocumentSourceMergeCursors::CursorDescriptor> parseCursors(
- const std::vector<ClusterClientCursorParams::RemoteCursor>& cursors);
-
static void uassertAllShardsSupportExplain(
const std::vector<AsyncRequestsSender::Response>& shardResults);
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index 43d2fc5c7e5..0c90f644c5e 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -25,6 +25,7 @@ env.Library(
"cluster_client_cursor_impl.cpp",
],
LIBDEPS=[
+ "$BUILD_DIR/mongo/db/pipeline/pipeline",
"router_exec_stage",
],
)
@@ -46,7 +47,7 @@ env.Library(
"async_results_merger",
],
LIBDEPS_PRIVATE=[
- "$BUILD_DIR/mongo/db/pipeline/document_source_lookup",
+ "$BUILD_DIR/mongo/db/pipeline/pipeline",
],
)
@@ -68,7 +69,6 @@ env.Library(
target="async_results_merger",
source=[
"async_results_merger.cpp",
- "cluster_client_cursor_params.cpp",
"establish_cursors.cpp",
],
LIBDEPS=[
@@ -76,10 +76,7 @@ env.Library(
"$BUILD_DIR/mongo/executor/task_executor_interface",
"$BUILD_DIR/mongo/s/async_requests_sender",
"$BUILD_DIR/mongo/s/client/sharding_client",
- "$BUILD_DIR/mongo/db/pipeline/pipeline",
- ],
- LIBDEPS_PRIVATE=[
- "$BUILD_DIR/mongo/db/pipeline/document_source_lookup",
+ '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl',
],
)
@@ -100,6 +97,7 @@ env.CppUnitTest(
],
LIBDEPS=[
'async_results_merger',
+ 'cluster_client_cursor',
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
'$BUILD_DIR/mongo/db/query/query_request',
'$BUILD_DIR/mongo/db/service_context_noop_init',
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 8f80a4b64ab..e315452160f 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -34,7 +34,7 @@
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/client/remote_command_targeter.h"
-#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/db/pipeline/change_stream_constants.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/getmore_request.h"
#include "mongo/db/query/killcursors_request.h"
@@ -44,6 +44,10 @@
#include "mongo/util/log.h"
namespace mongo {
+
+constexpr StringData AsyncResultsMerger::kSortKeyField;
+const BSONObj AsyncResultsMerger::kWholeSortKeySortPattern = BSON(kSortKeyField << 1);
+
namespace {
// Maximum number of retries for network and replication notMaster errors (per host).
@@ -54,7 +58,7 @@ const int kMaxNumFailedHostRetryAttempts = 3;
* {'': 'firstSortKey', '': 'secondSortKey', ...}.
*/
BSONObj extractSortKey(BSONObj obj, bool compareWholeSortKey) {
- auto key = obj[ClusterClientCursorParams::kSortKeyField];
+ auto key = obj[AsyncResultsMerger::kSortKeyField];
invariant(key);
if (compareWholeSortKey) {
return key.wrap();
@@ -451,8 +455,8 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote,
remote->cursorId = response.getCursorId();
if (response.getLastOplogTimestamp() && !response.getLastOplogTimestamp()->isNull()) {
// We only expect to see this for change streams.
- invariant(SimpleBSONObjComparator::kInstance.evaluate(
- _params->sort == DocumentSourceChangeStream::kSortSpec));
+ invariant(SimpleBSONObjComparator::kInstance.evaluate(_params->sort ==
+ change_stream_constants::kSortSpec));
auto newLatestTimestamp = *response.getLastOplogTimestamp();
if (remote->promisedMinSortKey) {
@@ -482,7 +486,7 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote,
remote->promisedMinSortKey =
(compareSortKeys(
- newPromisedMin, maxSortKeyFromResponse, DocumentSourceChangeStream::kSortSpec) < 0
+ newPromisedMin, maxSortKeyFromResponse, change_stream_constants::kSortSpec) < 0
? maxSortKeyFromResponse.getOwned()
: newPromisedMin.getOwned());
}
@@ -586,18 +590,18 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
for (const auto& obj : response.getBatch()) {
// If there's a sort, we're expecting the remote node to have given us back a sort key.
if (!_params->sort.isEmpty()) {
- auto key = obj[ClusterClientCursorParams::kSortKeyField];
+ auto key = obj[AsyncResultsMerger::kSortKeyField];
if (!key) {
- remote.status = Status(ErrorCodes::InternalError,
- str::stream() << "Missing field '"
- << ClusterClientCursorParams::kSortKeyField
- << "' in document: "
- << obj);
+ remote.status =
+ Status(ErrorCodes::InternalError,
+ str::stream() << "Missing field '" << AsyncResultsMerger::kSortKeyField
+ << "' in document: "
+ << obj);
return false;
} else if (!_params->compareWholeSortKey && key.type() != BSONType::Object) {
remote.status =
Status(ErrorCodes::InternalError,
- str::stream() << "Field '" << ClusterClientCursorParams::kSortKeyField
+ str::stream() << "Field '" << AsyncResultsMerger::kSortKeyField
<< "' was not of type Object in document: "
<< obj);
return false;
@@ -732,4 +736,36 @@ bool AsyncResultsMerger::MergingComparator::operator()(const size_t& lhs, const
_sort) > 0;
}
+void AsyncResultsMerger::blockingKill(OperationContext* opCtx) {
+ auto killEvent = kill(opCtx);
+ if (!killEvent) {
+ // We are shutting down.
+ return;
+ }
+ _executor->waitForEvent(killEvent);
+}
+
+StatusWith<ClusterQueryResult> AsyncResultsMerger::blockingNext() {
+ while (!ready()) {
+ auto nextEventStatus = nextEvent();
+ if (!nextEventStatus.isOK()) {
+ return nextEventStatus.getStatus();
+ }
+ auto event = nextEventStatus.getValue();
+
+ // Block until there are further results to return.
+ auto status = _executor->waitForEvent(_opCtx, event);
+
+ if (!status.isOK()) {
+ return status.getStatus();
+ }
+
+ // We have not provided a deadline, so if the wait returns without interruption, we do not
+ // expect to have timed out.
+ invariant(status.getValue() == stdx::cv_status::no_timeout);
+ }
+
+ return nextReady();
+}
+
} // namespace mongo
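
A minimal sketch of the two $sortKey shapes that extractSortKey() above distinguishes; the document contents are hypothetical:

    // Compound sort (compareWholeSortKey == false): the remote attaches the key as an
    // object of unnamed fields, which extractSortKey() returns directly.
    BSONObj compound = BSON("x" << 5 << "$sortKey" << BSON("" << 5 << "" << "abc"));

    // Whole-key sort (compareWholeSortKey == true): the key is a single value, such as a
    // geoNear distance, and extractSortKey() wraps it into {'': <value>}.
    BSONObj whole = BSON("dist" << 3.5 << "$sortKey" << 3.5);
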
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 0a441016060..b22584e9c27 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -75,6 +75,13 @@ class AsyncResultsMerger {
MONGO_DISALLOW_COPYING(AsyncResultsMerger);
public:
+ // When mongos has to do a merge in order to return results to the client in the correct sort
+ // order, it requests a sortKey meta-projection using this field name.
+ static constexpr StringData kSortKeyField = "$sortKey"_sd;
+
+ // The expected sort key pattern when 'compareWholeSortKey' is true.
+ static const BSONObj kWholeSortKeySortPattern;
+
/**
* Takes ownership of the cursors from ClusterClientCursorParams by storing their cursorIds and
* the hosts on which they exist in _remotes.
@@ -160,6 +167,12 @@ public:
StatusWith<ClusterQueryResult> nextReady();
/**
+ * Blocks until the next result is ready, all remote cursors are exhausted, or there is an
+ * error.
+ */
+ StatusWith<ClusterQueryResult> blockingNext();
+
+ /**
* Schedules remote work as required in order to make further results available. If there is an
* error in scheduling this work, returns a non-ok status. On success, returns an event handle.
* The caller can pass this event handle to 'executor' in order to be blocked until further
@@ -203,6 +216,11 @@ public:
*/
executor::TaskExecutor::EventHandle kill(OperationContext* opCtx);
+ /**
+     * A blocking version of kill() that will not return until it is safe to destroy this
+     * AsyncResultsMerger.
+ */
+ void blockingKill(OperationContext*);
+
private:
/**
* We instantiate one of these per remote host. It contains the buffer of results we've
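
A sketch of the intended consumption pattern for the new blocking API, assuming an already-constructed AsyncResultsMerger 'arm', an OperationContext, and a caller-supplied process() helper:

    // Drain the merger synchronously. blockingNext() parks the calling thread on an
    // executor event until a result, EOF, or an error becomes available.
    Status drainMerger(AsyncResultsMerger& arm, OperationContext* opCtx) {
        while (true) {
            auto next = arm.blockingNext();
            if (!next.isOK()) {
                arm.blockingKill(opCtx);  // Once this returns, 'arm' is safe to destroy.
                return next.getStatus();
            }
            if (next.getValue().isEOF()) {
                return Status::OK();
            }
            process(*next.getValue().getResult());  // Hypothetical per-document callback.
        }
    }
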
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index ccbce1d5517..34e70fc8333 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -1880,6 +1880,72 @@ TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulin
executor()->waitForEvent(killEvent);
}
-} // namespace
+TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) {
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
+
+ // Before any requests are scheduled, ARM is not ready to return results.
+ ASSERT_FALSE(arm->ready());
+ ASSERT_FALSE(arm->remotesExhausted());
+ // Issue a blocking wait for the next result asynchronously on a different thread.
+ auto future = launchAsync([&]() {
+ auto next = unittest::assertGet(arm->blockingNext());
+ ASSERT_FALSE(next.isEOF());
+ ASSERT_BSONOBJ_EQ(*next.getResult(), BSON("x" << 1));
+ next = unittest::assertGet(arm->blockingNext());
+ ASSERT_TRUE(next.isEOF());
+ });
+
+    // Schedule the response to the getMore, which will return the next result and mark the
+    // cursor as exhausted.
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["getMore"]);
+ return CursorResponse(_nss, 0LL, {BSON("x" << 1)})
+ .toBSON(CursorResponse::ResponseType::SubsequentResponse);
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(AsyncResultsMergerTest, ShouldBeInterruptableDuringBlockingNext) {
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
+
+ // Issue a blocking wait for the next result asynchronously on a different thread.
+ auto future = launchAsync([&]() {
+ auto nextStatus = arm->blockingNext();
+ ASSERT_EQ(nextStatus.getStatus(), ErrorCodes::Interrupted);
+ });
+
+ // Now mark the OperationContext as killed from this thread.
+ {
+ stdx::lock_guard<Client> lk(*operationContext()->getClient());
+ operationContext()->markKilled(ErrorCodes::Interrupted);
+ }
+ future.timed_get(kFutureTimeout);
+ // Be careful not to use a blocking kill here, since the main thread is in charge of running the
+ // callbacks, and we'd block on ourselves.
+ auto killEvent = arm->kill(operationContext());
+
+ assertKillCusorsCmdHasCursorId(getNthPendingRequest(0u).cmdObj, 1);
+ runReadyCallbacks();
+ executor()->waitForEvent(killEvent);
+}
+
+TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilKilled) {
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
+
+ // Before any requests are scheduled, ARM is not ready to return results.
+ ASSERT_FALSE(arm->ready());
+ ASSERT_FALSE(arm->remotesExhausted());
+
+ arm->blockingKill(operationContext());
+}
+
+} // namespace
} // namespace mongo
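
The caution in the interruption test above generalizes: blockingKill() waits on an executor event, so it must never be called from the thread that runs the executor's callbacks. A sketch of the two call sites, with 'arm' and 'opCtx' as assumptions:

    // From an ordinary consumer thread: safe, and 'arm' may be destroyed afterwards.
    arm.blockingKill(opCtx);

    // From the executor's callback thread: use the non-blocking form instead; the
    // scheduled killCursors callbacks would otherwise wait on this very thread.
    auto killEvent = arm.kill(opCtx);
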
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index e555f1d32e8..10152522f59 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -79,7 +79,7 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx,
_opCtx(opCtx) {
dassert(!_params.compareWholeSortKey ||
SimpleBSONObjComparator::kInstance.evaluate(
- _params.sort == ClusterClientCursorParams::kWholeSortKeySortPattern));
+ _params.sort == AsyncResultsMerger::kWholeSortKeySortPattern));
}
ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx,
@@ -89,7 +89,7 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx,
: _params(std::move(params)), _root(std::move(root)), _lsid(lsid), _opCtx(opCtx) {
dassert(!_params.compareWholeSortKey ||
SimpleBSONObjComparator::kInstance.evaluate(
- _params.sort == ClusterClientCursorParams::kWholeSortKeySortPattern));
+ _params.sort == AsyncResultsMerger::kWholeSortKeySortPattern));
}
StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next(
@@ -281,9 +281,7 @@ std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
if (hasSort) {
// Strip out the sort key after sorting.
root = stdx::make_unique<RouterStageRemoveMetadataFields>(
- opCtx,
- std::move(root),
- std::vector<StringData>{ClusterClientCursorParams::kSortKeyField});
+ opCtx, std::move(root), std::vector<StringData>{AsyncResultsMerger::kSortKeyField});
}
return root;
diff --git a/src/mongo/s/query/cluster_client_cursor_params.cpp b/src/mongo/s/query/cluster_client_cursor_params.cpp
deleted file mode 100644
index 4a033135c2a..00000000000
--- a/src/mongo/s/query/cluster_client_cursor_params.cpp
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/query/cluster_client_cursor_params.h"
-
-namespace mongo {
-
-const char ClusterClientCursorParams::kSortKeyField[] = "$sortKey";
-const BSONObj ClusterClientCursorParams::kWholeSortKeySortPattern = BSON(kSortKeyField << 1);
-
-} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index 3bc5da6c9c0..d213b0ea73f 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -61,13 +61,6 @@ class RouterExecStage;
* this cursor have been processed.
*/
struct ClusterClientCursorParams {
- // When mongos has to do a merge in order to return results to the client in the correct sort
- // order, it requests a sortKey meta-projection using this field name.
- static const char kSortKeyField[];
-
- // The expected sort key pattern when 'compareWholeSortKey' is true.
- static const BSONObj kWholeSortKeySortPattern;
-
struct RemoteCursor {
RemoteCursor(ShardId shardId, HostAndPort hostAndPort, CursorResponse cursorResponse)
: shardId(std::move(shardId)),
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 91c58187615..acab84e033d 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -50,6 +50,7 @@
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
+#include "mongo/s/query/async_results_merger.h"
#include "mongo/s/query/cluster_client_cursor_impl.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/establish_cursors.h"
@@ -135,7 +136,7 @@ StatusWith<std::unique_ptr<QueryRequest>> transformQueryForShards(
if (!qr.getSort().isEmpty() && !qr.getSort()["$natural"]) {
BSONObjBuilder projectionBuilder;
projectionBuilder.appendElements(qr.getProj());
- projectionBuilder.append(ClusterClientCursorParams::kSortKeyField, kSortKeyMetaProjection);
+ projectionBuilder.append(AsyncResultsMerger::kSortKeyField, kSortKeyMetaProjection);
newProjection = projectionBuilder.obj();
}
@@ -143,8 +144,7 @@ StatusWith<std::unique_ptr<QueryRequest>> transformQueryForShards(
invariant(qr.getSort().isEmpty());
BSONObjBuilder projectionBuilder;
projectionBuilder.appendElements(qr.getProj());
- projectionBuilder.append(ClusterClientCursorParams::kSortKeyField,
- kGeoNearDistanceMetaProjection);
+ projectionBuilder.append(AsyncResultsMerger::kSortKeyField, kGeoNearDistanceMetaProjection);
newProjection = projectionBuilder.obj();
}
@@ -218,7 +218,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
// by the geoNearDistance. Request the projection {$sortKey: <geoNearDistance>} from the
// shards. Indicate to the AsyncResultsMerger that it should extract the sort key
// {"$sortKey": <geoNearDistance>} and sort by the order {"$sortKey": 1}.
- params.sort = ClusterClientCursorParams::kWholeSortKeySortPattern;
+ params.sort = AsyncResultsMerger::kWholeSortKeySortPattern;
params.compareWholeSortKey = true;
appendGeoNearDistanceProjection = true;
}
@@ -337,10 +337,10 @@ CursorId ClusterFind::runQuery(OperationContext* opCtx,
invariant(results);
// Projection on the reserved sort key field is illegal in mongos.
- if (query.getQueryRequest().getProj().hasField(ClusterClientCursorParams::kSortKeyField)) {
+ if (query.getQueryRequest().getProj().hasField(AsyncResultsMerger::kSortKeyField)) {
uasserted(ErrorCodes::BadValue,
str::stream() << "Projection contains illegal field '"
- << ClusterClientCursorParams::kSortKeyField
+ << AsyncResultsMerger::kSortKeyField
<< "': "
<< query.getQueryRequest().getProj());
}
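
For illustration, the projection rewrite above augments the user's projection so each shard annotates its results with the merge key. A sketch assuming kSortKeyMetaProjection is the usual {$meta: "sortKey"} document and {b: 1} stands in for the user's projection:

    BSONObjBuilder projectionBuilder;
    projectionBuilder.appendElements(BSON("b" << 1));  // user-supplied projection
    projectionBuilder.append(AsyncResultsMerger::kSortKeyField, BSON("$meta" << "sortKey"));
    // projectionBuilder.obj() == { b: 1, $sortKey: { $meta: "sortKey" } }
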
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
index 8ff0dfc991a..4f17927483b 100644
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ b/src/mongo/s/query/router_stage_merge.cpp
@@ -47,32 +47,7 @@ StatusWith<ClusterQueryResult> RouterStageMerge::next(ExecContext execCtx) {
// cursors wait for ready() only until a specified time limit is exceeded.
return (_params->tailableMode == TailableMode::kTailableAndAwaitData
? awaitNextWithTimeout(execCtx)
- : blockForNextNoTimeout(execCtx));
-}
-
-StatusWith<ClusterQueryResult> RouterStageMerge::blockForNextNoTimeout(ExecContext execCtx) {
- invariant(_params->tailableMode != TailableMode::kTailableAndAwaitData);
- invariant(getOpCtx());
- while (!_arm.ready()) {
- auto nextEventStatus = _arm.nextEvent();
- if (!nextEventStatus.isOK()) {
- return nextEventStatus.getStatus();
- }
- auto event = nextEventStatus.getValue();
-
- // Block until there are further results to return.
- auto status = _executor->waitForEvent(getOpCtx(), event);
-
- if (!status.isOK()) {
- return status.getStatus();
- }
-
- // We have not provided a deadline, so if the wait returns without interruption, we do not
- // expect to have timed out.
- invariant(status.getValue() == stdx::cv_status::no_timeout);
- }
-
- return _arm.nextReady();
+ : _arm.blockingNext());
}
StatusWith<ClusterQueryResult> RouterStageMerge::awaitNextWithTimeout(ExecContext execCtx) {
@@ -120,12 +95,7 @@ StatusWith<EventHandle> RouterStageMerge::getNextEvent() {
}
void RouterStageMerge::kill(OperationContext* opCtx) {
- auto killEvent = _arm.kill(opCtx);
- if (!killEvent) {
- // Mongos is shutting down.
- return;
- }
- _executor->waitForEvent(killEvent);
+ _arm.blockingKill(opCtx);
}
bool RouterStageMerge::remotesExhausted() {
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index bb7d6ed81a7..efd397b8c7e 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -75,12 +75,6 @@ protected:
private:
/**
- * Returns the next document received by the ARM, blocking indefinitely until we either have a
- * new result or exhaust the remote cursors.
- */
- StatusWith<ClusterQueryResult> blockForNextNoTimeout(ExecContext execCtx);
-
- /**
* Awaits the next result from the ARM up to a specified time limit. If this is the user's
* initial find or we have already obtained at least one result for this batch, this method
* returns EOF immediately rather than blocking.