25 files changed, 1046 insertions, 560 deletions
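The heart of this commit shows up in document_source_merge_cursors.{h,cpp} below: $mergeCursors no longer eagerly opens DBClientCursor connections, but wraps an AsyncResultsMerger that is constructed lazily on the first getNext() call. That way a stage that is parsed on mongos and then forwarded to a merging shard never takes ownership of the remote cursors, so destroying the never-iterated copy cannot kill them. A minimal standalone sketch of that lazy-ownership pattern follows; all names are illustrative stand-ins, not the server's real classes:

#include <iostream>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Stand-in for a remote cursor handle.
struct RemoteCursorParams {
    std::string host;
    long long cursorId;
};

// Stand-in for AsyncResultsMerger: once constructed it owns the remote
// cursors and is responsible for killing them when destroyed.
class ResultsMerger {
public:
    explicit ResultsMerger(std::vector<RemoteCursorParams> params)
        : _params(std::move(params)) {}
    ~ResultsMerger() {
        for (auto&& p : _params)
            std::cout << "killCursors " << p.cursorId << " on " << p.host << "\n";
    }
    // A real merger would issue async getMores; this one just reports EOF.
    bool next(std::string*) {
        return false;
    }

private:
    std::vector<RemoteCursorParams> _params;
};

class MergeCursorsStage {
public:
    explicit MergeCursorsStage(std::vector<RemoteCursorParams> params)
        : _params(std::move(params)) {}

    // Ownership transfers to the merger only on first use, so a copy of the
    // stage that is parsed but never iterated leaves the cursors alone.
    bool getNext(std::string* out) {
        if (!_merger)
            _merger.emplace(std::move(_params));
        return _merger->next(out);
    }

private:
    std::vector<RemoteCursorParams> _params;
    std::optional<ResultsMerger> _merger;  // Empty until the first getNext().
};

int main() {
    {
        MergeCursorsStage neverIterated({{"shard1:27017", 123}});
        // Destroyed without a getNext() call: no killCursors is issued.
    }
    MergeCursorsStage iterated({{"shard2:27017", 456}});
    std::string doc;
    iterated.getNext(&doc);  // Merger created here; cursors killed at scope exit.
    return 0;
}
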
diff --git a/src/mongo/db/auth/SConscript b/src/mongo/db/auth/SConscript index d36b1854c14..6ea06b5dddd 100644 --- a/src/mongo/db/auth/SConscript +++ b/src/mongo/db/auth/SConscript @@ -263,7 +263,7 @@ env.CppUnitTest( 'authmocks', 'saslauth', 'authorization_session_for_test', - '$BUILD_DIR/mongo/db/pipeline/document_source', + '$BUILD_DIR/mongo/db/pipeline/pipeline', '$BUILD_DIR/mongo/db/service_context_d_test_fixture', '$BUILD_DIR/mongo/transport/transport_layer_mock', ] diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 680094f0971..fcbb40e3199 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -11,9 +11,6 @@ env.Library( ], LIBDEPS=[ 'aggregation_request', - 'document_source', - 'document_source_facet', - 'document_source_lookup', 'expression_context', 'pipeline', ] @@ -123,50 +120,6 @@ env.Library( ] ) -env.CppUnitTest( - target='document_source_test', - source=[ - 'document_source_add_fields_test.cpp', - 'document_source_bucket_auto_test.cpp', - 'document_source_bucket_test.cpp', - 'document_source_change_stream_test.cpp', - 'document_source_check_resume_token_test.cpp', - 'document_source_count_test.cpp', - 'document_source_current_op_test.cpp', - 'document_source_geo_near_test.cpp', - 'document_source_group_test.cpp', - 'document_source_limit_test.cpp', - 'document_source_lookup_change_post_image_test.cpp', - 'document_source_lookup_test.cpp', - 'document_source_graph_lookup_test.cpp', - 'document_source_match_test.cpp', - 'document_source_mock_test.cpp', - 'document_source_project_test.cpp', - 'document_source_redact_test.cpp', - 'document_source_replace_root_test.cpp', - 'document_source_sample_test.cpp', - 'document_source_skip_test.cpp', - 'document_source_sort_by_count_test.cpp', - 'document_source_sort_test.cpp', - 'document_source_test.cpp', - 'document_source_unwind_test.cpp', - 'sequential_document_cache_test.cpp', - ], - LIBDEPS=[ - 'document_source', - 'document_source_facet', - 'document_source_lookup', - 'document_source_mock', - 'document_value_test_util', - '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', - '$BUILD_DIR/mongo/db/repl/oplog_entry', - '$BUILD_DIR/mongo/db/repl/replmocks', - '$BUILD_DIR/mongo/db/service_context', - '$BUILD_DIR/mongo/s/is_mongos', - '$BUILD_DIR/mongo/util/clock_source_mock', - ], -) - env.Library( target='dependencies', source=[ @@ -229,19 +182,117 @@ env.Library( ] ) -docSourceEnv = env.Clone() -docSourceEnv.InjectThirdPartyIncludePaths(libraries=['snappy']) -docSourceEnv.Library( - target='document_source', +env.Library( + target='document_source_mock', + source=[ + 'document_source_mock.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/query/query_test_service_context', + 'pipeline', + ] +) + +env.CppUnitTest( + target='document_source_test', + source=[ + 'document_source_add_fields_test.cpp', + 'document_source_bucket_auto_test.cpp', + 'document_source_bucket_test.cpp', + 'document_source_change_stream_test.cpp', + 'document_source_check_resume_token_test.cpp', + 'document_source_count_test.cpp', + 'document_source_current_op_test.cpp', + 'document_source_geo_near_test.cpp', + 'document_source_graph_lookup_test.cpp', + 'document_source_group_test.cpp', + 'document_source_limit_test.cpp', + 'document_source_lookup_change_post_image_test.cpp', + 'document_source_lookup_test.cpp', + 'document_source_match_test.cpp', + 'document_source_mock_test.cpp', + 'document_source_project_test.cpp', + 'document_source_redact_test.cpp', + 
'document_source_replace_root_test.cpp', + 'document_source_sample_test.cpp', + 'document_source_skip_test.cpp', + 'document_source_sort_by_count_test.cpp', + 'document_source_sort_test.cpp', + 'document_source_test.cpp', + 'document_source_unwind_test.cpp', + 'sequential_document_cache_test.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', + '$BUILD_DIR/mongo/db/repl/oplog_entry', + '$BUILD_DIR/mongo/db/repl/replmocks', + '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', + '$BUILD_DIR/mongo/util/clock_source_mock', + 'document_source_mock', + 'document_value_test_util', + 'pipeline', + ], +) + +# This test depends on the sharding test fixture, which has global initializers that conflict with +# the ones set in 'document_source_test', so is split into its own test. +env.CppUnitTest( + target='document_source_merge_cursors_test', + source=[ + 'document_source_merge_cursors_test.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', + '$BUILD_DIR/mongo/db/query/query_request', + '$BUILD_DIR/mongo/db/query/query_test_service_context', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', + '$BUILD_DIR/mongo/s/sharding_test_fixture', + 'pipeline', + 'document_value_test_util', + ], +) + +env.CppUnitTest( + target='document_source_facet_test', + source='document_source_facet_test.cpp', + LIBDEPS=[ + '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', + '$BUILD_DIR/mongo/db/service_context_noop_init', + '$BUILD_DIR/mongo/s/is_mongos', + 'pipeline', + 'document_source_mock', + 'document_value_test_util', + ], +) + +env.Library( + target='lite_parsed_document_source', + source=[ + 'lite_parsed_document_source.cpp', + ], + LIBDEPS=[ + 'aggregation_request', + ] +) + +pipelineeEnv = env.Clone() +pipelineeEnv.InjectThirdPartyIncludePaths(libraries=['snappy']) +pipelineeEnv.Library( + target='pipeline', source=[ 'document_source.cpp', 'document_source_add_fields.cpp', 'document_source_bucket.cpp', 'document_source_bucket_auto.cpp', + 'document_source_change_stream.cpp', + 'document_source_check_resume_token.cpp', 'document_source_coll_stats.cpp', 'document_source_count.cpp', 'document_source_current_op.cpp', + 'document_source_facet.cpp', 'document_source_geo_near.cpp', + 'document_source_graph_lookup.cpp', 'document_source_group.cpp', 'document_source_index_stats.cpp', 'document_source_internal_inhibit_optimization.cpp', @@ -250,6 +301,8 @@ docSourceEnv.Library( 'document_source_list_local_cursors.cpp', 'document_source_list_local_sessions.cpp', 'document_source_list_sessions.cpp', + 'document_source_lookup.cpp', + 'document_source_lookup_change_post_image.cpp', 'document_source_match.cpp', 'document_source_merge_cursors.cpp', 'document_source_out.cpp', @@ -263,11 +316,15 @@ docSourceEnv.Library( 'document_source_skip.cpp', 'document_source_sort.cpp', 'document_source_sort_by_count.cpp', + 'document_source_tee_consumer.cpp', 'document_source_unwind.cpp', + 'pipeline.cpp', 'sequential_document_cache.cpp', - ], + 'tee_buffer.cpp', + ], LIBDEPS=[ '$BUILD_DIR/mongo/client/clientdriver', + '$BUILD_DIR/mongo/db/auth/authorization_manager_global', '$BUILD_DIR/mongo/db/bson/dotted_path_support', '$BUILD_DIR/mongo/db/generic_cursor', '$BUILD_DIR/mongo/db/index/key_generator', @@ -275,7 +332,10 @@ docSourceEnv.Library( '$BUILD_DIR/mongo/db/logical_session_id_helpers', '$BUILD_DIR/mongo/db/matcher/expressions', 
'$BUILD_DIR/mongo/db/pipeline/lite_parsed_document_source', + '$BUILD_DIR/mongo/db/query/collation/collator_factory_interface', + '$BUILD_DIR/mongo/db/query/collation/collator_interface', '$BUILD_DIR/mongo/db/repl/oplog_entry', + '$BUILD_DIR/mongo/db/repl/read_concern_args', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/db/sessions_collection', @@ -283,100 +343,19 @@ docSourceEnv.Library( '$BUILD_DIR/mongo/db/storage/encryption_hooks', '$BUILD_DIR/mongo/db/storage/storage_options', '$BUILD_DIR/mongo/s/is_mongos', + '$BUILD_DIR/mongo/s/query/async_results_merger', '$BUILD_DIR/third_party/shim_snappy', 'accumulator', 'dependencies', 'document_sources_idl', 'document_value', 'expression', + 'expression_context', 'granularity_rounder', 'parsed_aggregation_projection', - ], -) - -env.Library( - target='document_source_mock', - source=[ - 'document_source_mock.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/db/query/query_test_service_context', - 'document_source', - ] -) - -env.Library( - target='lite_parsed_document_source', - source=[ - 'lite_parsed_document_source.cpp', - ], - LIBDEPS=[ - 'aggregation_request', ] ) -env.Library( - target='pipeline', - source=[ - 'pipeline.cpp', - ], - LIBDEPS=[ - 'dependencies', - 'document_source', - 'document_value', - 'expression_context', - '$BUILD_DIR/mongo/db/auth/authorization_manager_global', - '$BUILD_DIR/mongo/db/bson/dotted_path_support', - '$BUILD_DIR/mongo/db/query/collation/collator_interface', - '$BUILD_DIR/mongo/db/query/collation/collator_factory_interface', - '$BUILD_DIR/mongo/db/repl/read_concern_args', - '$BUILD_DIR/mongo/db/storage/storage_options', - ] -) - -env.Library( - target='document_source_lookup', - source=[ - 'document_source_change_stream.cpp', - 'document_source_check_resume_token.cpp', - 'document_source_graph_lookup.cpp', - 'document_source_lookup.cpp', - 'document_source_lookup_change_post_image.cpp', - ], - LIBDEPS=[ - 'document_source', - 'pipeline', - '$BUILD_DIR/mongo/db/catalog/uuid_catalog', - '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl', - ], -) - -env.Library( - target='document_source_facet', - source=[ - 'document_source_facet.cpp', - 'tee_buffer.cpp', - 'document_source_tee_consumer.cpp', - ], - LIBDEPS=[ - 'document_source', - 'pipeline', - ] -) - -env.CppUnitTest( - target='document_source_facet_test', - source='document_source_facet_test.cpp', - LIBDEPS=[ - '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', - '$BUILD_DIR/mongo/db/service_context_noop_init', - '$BUILD_DIR/mongo/s/is_mongos', - 'document_source_facet', - 'document_source_mock', - 'document_value_test_util', - ], -) - env.CppUnitTest( target='tee_buffer_test', source='tee_buffer_test.cpp', @@ -384,7 +363,6 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/db/service_context_noop_init', '$BUILD_DIR/mongo/s/is_mongos', - 'document_source_facet', 'document_source_mock', 'document_value_test_util', ], @@ -430,7 +408,6 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/service_context_noop_init', '$BUILD_DIR/mongo/s/is_mongos', 'document_value_test_util', - 'document_source_lookup', 'document_source_mock', 'pipeline', ], @@ -537,6 +514,5 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/service_context_noop_init', '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', 'document_sources_idl', - 'document_source_lookup', ], ) diff --git a/src/mongo/db/pipeline/change_stream_constants.h b/src/mongo/db/pipeline/change_stream_constants.h 
new file mode 100644 index 00000000000..fe043d17572 --- /dev/null +++ b/src/mongo/db/pipeline/change_stream_constants.h @@ -0,0 +1,41 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" + +namespace mongo { +namespace change_stream_constants { + +const BSONObj kSortSpec = + BSON("_id.clusterTime.ts" << 1 << "_id.uuid" << 1 << "_id.documentKey" << 1); + +} // namespace change_stream_constants +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 95032ec3cae..6f4ac3ffc57 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -34,6 +34,7 @@ #include "mongo/db/bson/bson_helper.h" #include "mongo/db/catalog/uuid_catalog.h" #include "mongo/db/logical_clock.h" +#include "mongo/db/pipeline/change_stream_constants.h" #include "mongo/db/pipeline/document_path_support.h" #include "mongo/db/pipeline/document_source_check_resume_token.h" #include "mongo/db/pipeline/document_source_limit.h" @@ -81,9 +82,6 @@ constexpr StringData DocumentSourceChangeStream::kInsertOpType; constexpr StringData DocumentSourceChangeStream::kInvalidateOpType; constexpr StringData DocumentSourceChangeStream::kNewShardDetectedOpType; -const BSONObj DocumentSourceChangeStream::kSortSpec = - BSON("_id.clusterTime.ts" << 1 << "_id.uuid" << 1 << "_id.documentKey" << 1); - namespace { @@ -186,7 +184,7 @@ public: const long long noLimit = -1; auto sortMergingPresorted = DocumentSourceSort::create(pExpCtx, - DocumentSourceChangeStream::kSortSpec, + change_stream_constants::kSortSpec, noLimit, DocumentSourceSort::kMaxMemoryUsageBytes, mergingPresorted); diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 17f891c7dc7..87bb4b63a48 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -107,9 +107,6 @@ public: bool 
_documentKeyFieldsSharded = false; }; - // The sort pattern used to merge results from multiple change streams on a mongos. - static const BSONObj kSortSpec; - // The name of the field where the document key (_id and shard key, if present) will be found // after the transformation. static constexpr StringData kDocumentKeyField = "documentKey"_sd; diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index 1f0244d4127..a0d5b643d96 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -28,6 +28,7 @@ #pragma once +#include "mongo/db/pipeline/change_stream_constants.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_change_stream.h" #include "mongo/db/pipeline/document_source_sort.h" @@ -122,7 +123,7 @@ public: const long long noLimit = -1; auto sortMergingPresorted = DocumentSourceSort::create(pExpCtx, - DocumentSourceChangeStream::kSortSpec, + change_stream_constants::kSortSpec, noLimit, DocumentSourceSort::kMaxMemoryUsageBytes, mergingPresorted); diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp index 539e15c0ccc..9eeb06e7be3 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp +++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp @@ -1,5 +1,5 @@ /** - * Copyright 2013 (c) 10gen Inc. + * Copyright (C) 2018 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, @@ -17,179 +17,196 @@ * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. 
*/ #include "mongo/platform/basic.h" #include "mongo/db/pipeline/document_source_merge_cursors.h" - -#include "mongo/db/pipeline/lite_parsed_document_source.h" -#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/db/pipeline/document_source_sort.h" +#include "mongo/executor/task_executor_pool.h" +#include "mongo/s/grid.h" namespace mongo { -using boost::intrusive_ptr; -using std::make_pair; -using std::string; -using std::vector; - -constexpr StringData DocumentSourceMergeCursors::kStageName; - -DocumentSourceMergeCursors::DocumentSourceMergeCursors( - std::vector<CursorDescriptor> cursorDescriptors, - const intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSource(pExpCtx), _cursorDescriptors(std::move(cursorDescriptors)), _unstarted(true) {} - REGISTER_DOCUMENT_SOURCE(mergeCursors, LiteParsedDocumentSourceDefault::parse, DocumentSourceMergeCursors::createFromBson); -intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::create( - std::vector<CursorDescriptor> cursorDescriptors, - const intrusive_ptr<ExpressionContext>& pExpCtx) { - intrusive_ptr<DocumentSourceMergeCursors> source( - new DocumentSourceMergeCursors(std::move(cursorDescriptors), pExpCtx)); - return source; -} - -intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson( - BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) { - massert(17026, - string("Expected an Array, but got a ") + typeName(elem.type()), - elem.type() == Array); - - std::vector<CursorDescriptor> cursorDescriptors; - BSONObj array = elem.embeddedObject(); - BSONForEach(cursor, array) { - massert(17027, - string("Expected an Object, but got a ") + typeName(cursor.type()), - cursor.type() == Object); - - // The cursor descriptors for the merge cursors stage used to lack an "ns" field; "ns" was - // understood to be the expression context namespace in that case. For mixed-version - // compatibility, we accept both the old and new formats here. - std::string cursorNs = cursor["ns"] ? cursor["ns"].String() : pExpCtx->ns.ns(); - - cursorDescriptors.emplace_back(ConnectionString(HostAndPort(cursor["host"].String())), - std::move(cursorNs), - cursor["id"].Long()); - } +constexpr StringData DocumentSourceMergeCursors::kStageName; - return new DocumentSourceMergeCursors(std::move(cursorDescriptors), pExpCtx); -} +DocumentSourceMergeCursors::DocumentSourceMergeCursors( + executor::TaskExecutor* executor, + std::unique_ptr<ClusterClientCursorParams> cursorParams, + const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSource(expCtx), _executor(executor), _armParams(std::move(cursorParams)) {} -Value DocumentSourceMergeCursors::serialize( - boost::optional<ExplainOptions::Verbosity> explain) const { - vector<Value> cursors; - for (size_t i = 0; i < _cursorDescriptors.size(); i++) { - cursors.push_back( - Value(DOC("host" << Value(_cursorDescriptors[i].connectionString.toString()) << "ns" - << _cursorDescriptors[i].ns - << "id" - << _cursorDescriptors[i].cursorId))); +DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() { + // We don't expect or support tailable cursors to be executing through this stage. 
+ invariant(pExpCtx->tailableMode == TailableMode::kNormal); + if (!_arm) { + _arm.emplace(pExpCtx->opCtx, _executor, _armParams.get()); } - return Value(DOC(kStageName << Value(cursors))); + auto next = uassertStatusOK(_arm->blockingNext()); + if (next.isEOF()) { + return GetNextResult::makeEOF(); + } + return Document::fromBsonWithMetaData(*next.getResult()); } -DocumentSourceMergeCursors::CursorAndConnection::CursorAndConnection( - const CursorDescriptor& cursorDescriptor) - : connection(cursorDescriptor.connectionString), - cursor(connection.get(), cursorDescriptor.ns, cursorDescriptor.cursorId, 0, 0) {} - -vector<DBClientCursor*> DocumentSourceMergeCursors::getCursors() { - verify(_unstarted); - start(); - vector<DBClientCursor*> out; - for (Cursors::const_iterator it = _cursors.begin(); it != _cursors.end(); ++it) { - out.push_back(&((*it)->cursor)); +void DocumentSourceMergeCursors::serializeToArray( + std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { + invariant(!_arm); + invariant(_armParams); + std::vector<Value> cursors; + for (auto&& remote : _armParams->remotes) { + cursors.emplace_back(Document{{"host", remote.hostAndPort.toString()}, + {"ns", remote.cursorResponse.getNSS().toString()}, + {"id", remote.cursorResponse.getCursorId()}}); + } + array.push_back(Value(Document{{kStageName, Value(std::move(cursors))}})); + if (!_armParams->sort.isEmpty()) { + array.push_back(Value(Document{{DocumentSourceSort::kStageName, Value(_armParams->sort)}})); } - - return out; } -void DocumentSourceMergeCursors::start() { - _unstarted = false; +Pipeline::SourceContainer::iterator DocumentSourceMergeCursors::doOptimizeAt( + Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { + invariant(*itr == this); + invariant(!_arm); + invariant(_armParams); - // open each cursor and send message asking for a batch - for (auto&& cursorDescriptor : _cursorDescriptors) { - _cursors.push_back(std::make_shared<CursorAndConnection>(cursorDescriptor)); - verify(_cursors.back()->connection->lazySupported()); - _cursors.back()->cursor.initLazy(); // shouldn't block + auto next = std::next(itr); + if (next == container->end()) { + return next; } - // wait for all cursors to return a batch - // TODO need a way to keep cursors alive if some take longer than 10 minutes. - for (auto&& cursor : _cursors) { - bool retry = false; - bool ok = cursor->cursor.initLazyFinish(retry); // blocks here for first batch - - uassert( - 17028, "error reading response from " + _cursors.back()->connection->toString(), ok); - verify(!retry); + auto nextSort = dynamic_cast<DocumentSourceSort*>(next->get()); + if (!nextSort || !nextSort->mergingPresorted()) { + return next; } - _currentCursor = _cursors.begin(); -} - -Document DocumentSourceMergeCursors::nextSafeFrom(DBClientCursor* cursor) { - const BSONObj next = cursor->next(); - if (next.hasField("$err")) { - uassertStatusOKWithContext(getStatusFromCommandResult(next), - str::stream() << "Received error in response from " - << cursor->originalHost()); + _armParams->sort = + nextSort->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging) + .toBson(); + if (auto sortLimit = nextSort->getLimitSrc()) { + // There was a limit stage absorbed into the sort stage, so we need to preserve that. 
+ container->insert(std::next(next), sortLimit); } - return Document::fromBsonWithMetaData(next); + container->erase(next); + return std::next(itr); } -DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() { - pExpCtx->checkForInterrupt(); - - if (_unstarted) - start(); +boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { + uassert( + 17026, "$mergeCursors stage expected array as argument", elem.type() == BSONType::Array); + const auto serializedRemotes = elem.Array(); + uassert(50729, + "$mergeCursors stage expected array with at least one entry", + serializedRemotes.size() > 0); + + boost::optional<NamespaceString> nss; + std::vector<ClusterClientCursorParams::RemoteCursor> remotes; + for (auto&& cursor : serializedRemotes) { + BSONElement nsElem; + BSONElement hostElem; + BSONElement idElem; + uassert(17027, + "$mergeCursors stage requires each cursor in array to be an object", + cursor.type() == BSONType::Object); + for (auto&& cursorElem : cursor.Obj()) { + const auto fieldName = cursorElem.fieldNameStringData(); + if (fieldName == "ns"_sd) { + nsElem = cursorElem; + } else if (fieldName == "host"_sd) { + hostElem = cursorElem; + } else if (fieldName == "id"_sd) { + idElem = cursorElem; + } else { + uasserted(50730, + str::stream() << "Unrecognized option " << fieldName + << " within cursor provided to $mergeCursors: " + << cursor); + } + } + uassert( + 50731, + "$mergeCursors stage requires \'ns\' field with type string for each cursor in array", + nsElem.type() == BSONType::String); + + // We require each cursor to have the same namespace. This isn't a fundamental limit of the + // system, but needs to be true due to the implementation of AsyncResultsMerger, which + // tracks one namespace for all cursors. + uassert(50720, + "$mergeCursors requires each cursor to have the same namespace", + !nss || nss->ns() == nsElem.valueStringData()); + nss = NamespaceString(nsElem.String()); - // purge eof cursors and release their connections - while (!_cursors.empty() && !(*_currentCursor)->cursor.more()) { - (*_currentCursor)->connection.done(); - _cursors.erase(_currentCursor); - _currentCursor = _cursors.begin(); + uassert( + 50721, + "$mergeCursors stage requires \'host\' field with type string for each cursor in array", + hostElem.type() == BSONType::String); + auto host = uassertStatusOK(HostAndPort::parse(hostElem.valueStringData())); + + uassert(50722, + "$mergeCursors stage requires \'id\' field with type long for each cursor in array", + idElem.type() == BSONType::NumberLong); + auto cursorId = idElem.Long(); + + // We are assuming that none of the cursors have been iterated at all, and so will not have + // any data in the initial batch. + // TODO SERVER-33323 We use a fake shard id because the AsyncResultsMerger won't use it for + // anything, and finding the real one is non-trivial. + std::vector<BSONObj> emptyBatch; + remotes.push_back({ShardId("fakeShardIdForMergeCursors"), + std::move(host), + CursorResponse{*nss, cursorId, emptyBatch}}); } + invariant(nss); // We know there is at least one cursor in 'serializedRemotes', and we require + // each cursor to have a 'ns' field. 
+ + auto params = stdx::make_unique<ClusterClientCursorParams>(*nss); + params->remotes = std::move(remotes); + params->readPreference = ReadPreferenceSetting::get(expCtx->opCtx); + return new DocumentSourceMergeCursors( + Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), + std::move(params), + expCtx); +} - if (_cursors.empty()) - return GetNextResult::makeEOF(); - - auto next = nextSafeFrom(&((*_currentCursor)->cursor)); - - // advance _currentCursor, wrapping if needed - if (++_currentCursor == _cursors.end()) - _currentCursor = _cursors.begin(); +boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::create( + std::vector<ClusterClientCursorParams::RemoteCursor>&& remoteCursors, + executor::TaskExecutor* executor, + const boost::intrusive_ptr<ExpressionContext>& expCtx) { + auto params = stdx::make_unique<ClusterClientCursorParams>(expCtx->ns); + params->remotes = std::move(remoteCursors); + params->readPreference = ReadPreferenceSetting::get(expCtx->opCtx); + return new DocumentSourceMergeCursors(executor, std::move(params), expCtx); +} - return std::move(next); +void DocumentSourceMergeCursors::detachFromOperationContext() { + if (_arm) { + _arm->detachFromOperationContext(); + } } -bool DocumentSourceMergeCursors::remotesExhausted() const { - return std::all_of(_cursors.begin(), _cursors.end(), [](const auto& cursorAndConn) { - return cursorAndConn->cursor.isDead(); - }); +void DocumentSourceMergeCursors::reattachToOperationContext(OperationContext* opCtx) { + if (_arm) { + _arm->reattachToOperationContext(opCtx); + } } void DocumentSourceMergeCursors::doDispose() { - for (auto&& cursorAndConn : _cursors) { - // Note it is an error to call done() on a connection before consuming the reply from a - // request. - if (cursorAndConn->cursor.connectionHasPendingReplies()) { - continue; - } - cursorAndConn->cursor.kill(); - cursorAndConn->connection.done(); + if (_arm) { + _arm->blockingKill(pExpCtx->opCtx); } - _cursors.clear(); - _currentCursor = _cursors.end(); -} } + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h index 53ef546d5ea..e23772f411b 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.h +++ b/src/mongo/db/pipeline/document_source_merge_cursors.h @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016 MongoDB Inc. + * Copyright (C) 2018 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, @@ -28,35 +28,58 @@ #pragma once -#include "mongo/client/connpool.h" -#include "mongo/db/clientcursor.h" -#include "mongo/db/cursor_id.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/executor/task_executor.h" +#include "mongo/s/query/async_results_merger.h" namespace mongo { +/** + * A stage used only internally to merge results that are being gathered from remote hosts, possibly + * including this host. + * + * Does not assume ownership of cursors until the first call to getNext(). This is to allow this + * stage to be used on mongos without actually iterating the cursors. For example, when this stage + * is parsed on mongos it may later be decided that the merging should happen on one of the shards. + * Then this stage is forwarded to the merging shard, and it should not kill the cursors when it + * goes out of scope on mongos. 
+ */ class DocumentSourceMergeCursors : public DocumentSource { public: static constexpr StringData kStageName = "$mergeCursors"_sd; - struct CursorDescriptor { - CursorDescriptor(ConnectionString connectionString, std::string ns, CursorId cursorId) - : connectionString(std::move(connectionString)), - ns(std::move(ns)), - cursorId(cursorId) {} - - ConnectionString connectionString; - std::string ns; - CursorId cursorId; - }; + /** + * Parses a serialized version of this stage. + */ + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement, const boost::intrusive_ptr<ExpressionContext>&); - GetNextResult getNext() final; + /** + * Creates a new DocumentSourceMergeCursors from the given 'remoteCursors'. + */ + static boost::intrusive_ptr<DocumentSource> create( + std::vector<ClusterClientCursorParams::RemoteCursor>&& remoteCursors, + executor::TaskExecutor*, + const boost::intrusive_ptr<ExpressionContext>&); const char* getSourceName() const final { return kStageName.rawData(); } - Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; + /** + * Absorbs a subsequent $sort if it's merging pre-sorted streams. + */ + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container); + void detachFromOperationContext() final; + void reattachToOperationContext(OperationContext*) final; + + /** + * Serializes this stage to be sent to perform the merging on a different host. + */ + void serializeToArray( + std::vector<Value>& array, + boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; StageConstraints constraints(Pipeline::SplitState pipeState) const final { StageConstraints constraints(StreamType::kStreaming, @@ -69,57 +92,29 @@ public: return constraints; } - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - - static boost::intrusive_ptr<DocumentSource> create( - std::vector<CursorDescriptor> cursorDescriptors, - const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - - /** - * Returns true if all remotes have reported that their cursors are closed. - */ - bool remotesExhausted() const; - - /** Returns non-owning pointers to cursors managed by this stage. - * Call this instead of getNext() if you want access to the raw streams. - * This method should only be called at most once. - */ - std::vector<DBClientCursor*> getCursors(); - - /** - * Returns the next object from the cursor, throwing an appropriate exception if the cursor - * reported an error. This is a better form of DBClientCursor::nextSafe. - */ - static Document nextSafeFrom(DBClientCursor* cursor); + GetNextResult getNext() final; protected: void doDispose() final; private: - struct CursorAndConnection { - CursorAndConnection(const CursorDescriptor& cursorDescriptor); - ScopedDbConnection connection; - DBClientCursor cursor; - }; + DocumentSourceMergeCursors(executor::TaskExecutor*, + std::unique_ptr<ClusterClientCursorParams>, + const boost::intrusive_ptr<ExpressionContext>&); - // using list to enable removing arbitrary elements - typedef std::list<std::shared_ptr<CursorAndConnection>> Cursors; - - DocumentSourceMergeCursors(std::vector<CursorDescriptor> cursorDescriptors, - const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - - // Converts _cursorDescriptors into active _cursors. - void start(); - - // This is the description of cursors to merge. 
- const std::vector<CursorDescriptor> _cursorDescriptors; + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { + MONGO_UNREACHABLE; // Should call serializeToArray instead. + } - // These are the actual cursors we are merging. Created lazily. - Cursors _cursors; - Cursors::iterator _currentCursor; + executor::TaskExecutor* _executor; - bool _unstarted; + // '_arm' is not populated until the first call to getNext(). If getNext() is never called we + // will not create an AsyncResultsMerger. If we did so the destruction of this stage would cause + // the cursors within the ARM to be killed prematurely. For example, if this stage is parsed on + // mongos then forwarded to the shards, it should not kill the cursors when it goes out of scope + // on mongos. + std::unique_ptr<ClusterClientCursorParams> _armParams; + boost::optional<AsyncResultsMerger> _arm; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp new file mode 100644 index 00000000000..8007aea0db0 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp @@ -0,0 +1,488 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source_merge_cursors.h" + +#include "mongo/client/remote_command_targeter_factory_mock.h" +#include "mongo/client/remote_command_targeter_mock.h" +#include "mongo/db/json.h" +#include "mongo/db/pipeline/aggregation_context_fixture.h" +#include "mongo/db/pipeline/document_source_limit.h" +#include "mongo/db/pipeline/document_source_sort.h" +#include "mongo/db/pipeline/document_value_test_util.h" +#include "mongo/db/pipeline/expression_context_for_test.h" +#include "mongo/db/query/cursor_response.h" +#include "mongo/db/query/getmore_request.h" +#include "mongo/db/query/query_request.h" +#include "mongo/executor/network_interface_mock.h" +#include "mongo/executor/task_executor.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/sharding_test_fixture.h" +#include "mongo/stdx/memory.h" +#include "mongo/stdx/thread.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +using executor::NetworkInterfaceMock; +using executor::RemoteCommandRequest; +using executor::RemoteCommandResponse; + +using ResponseStatus = executor::TaskExecutor::ResponseStatus; + +const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345); +const std::vector<ShardId> kTestShardIds = { + ShardId("FakeShard1"), ShardId("FakeShard2"), ShardId("FakeShard3")}; +const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShard1Host", 12345), + HostAndPort("FakeShard2Host", 12345), + HostAndPort("FakeShard3Host", 12345)}; + +const NamespaceString kTestNss = NamespaceString("test.mergeCursors"_sd); +const HostAndPort kTestHost = HostAndPort("localhost:27017"_sd); + +const CursorId kExhaustedCursorID = 0; + +class DocumentSourceMergeCursorsTest : public ShardingTestFixture { +public: + DocumentSourceMergeCursorsTest() : _expCtx(new ExpressionContextForTest(kTestNss)) {} + + void setUp() override { + ShardingTestFixture::setUp(); + setRemote(HostAndPort("ClientHost", 12345)); + + configTargeter()->setFindHostReturnValue(kTestConfigShardHost); + + std::vector<ShardType> shards; + for (size_t i = 0; i < kTestShardIds.size(); i++) { + ShardType shardType; + shardType.setName(kTestShardIds[i].toString()); + shardType.setHost(kTestShardHosts[i].toString()); + + shards.push_back(shardType); + + std::unique_ptr<RemoteCommandTargeterMock> targeter( + stdx::make_unique<RemoteCommandTargeterMock>()); + targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHosts[i])); + targeter->setFindHostReturnValue(kTestShardHosts[i]); + + targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHosts[i]), + std::move(targeter)); + } + + setupShards(shards); + } + + boost::intrusive_ptr<ExpressionContextForTest> getExpCtx() { + return _expCtx.get(); + } + +private: + boost::intrusive_ptr<ExpressionContextForTest> _expCtx; +}; + +TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectNonArray) { + auto spec = BSON("$mergeCursors" << BSON( + "cursors" << BSON_ARRAY(BSON("ns" << kTestNss.ns() << "id" << 0LL << "host" + << kTestHost.toString())))); + ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()), + AssertionException, + 17026); +} + +TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectEmptyArray) { + auto spec = BSON("$mergeCursors" << BSONArray()); + ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()), 
+ AssertionException, + 50729); +} + +TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectCursorWithNoNamespace) { + auto spec = + BSON("$mergeCursors" << BSON_ARRAY(BSON("id" << 0LL << "host" << kTestHost.toString()))); + ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()), + AssertionException, + 50731); +} + +TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectCursorWithNonStringNamespace) { + auto spec = BSON("$mergeCursors" << BSON_ARRAY( + BSON("ns" << 4 << "id" << 0LL << "host" << kTestHost.toString()))); + ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()), + AssertionException, + 50731); +} + +TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectCursorsWithDifferentNamespaces) { + auto spec = BSON( + "$mergeCursors" + << BSON_ARRAY(BSON("ns" << kTestNss.ns() << "id" << 0LL << "host" << kTestHost.toString()) + << BSON("ns" + << "test.other"_sd + << "id" + << 0LL + << "host" + << kTestHost.toString()))); + ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()), + AssertionException, + 50720); +} + +TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectCursorWithNoHost) { + auto spec = BSON("$mergeCursors" << BSON_ARRAY(BSON("ns" << kTestNss.ns() << "id" << 0LL))); + ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()), + AssertionException, + 50721); +} + +TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectCursorWithNonStringHost) { + auto spec = BSON("$mergeCursors" + << BSON_ARRAY(BSON("ns" << kTestNss.ns() << "id" << 0LL << "host" << 4LL))); + ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()), + AssertionException, + 50721); +} + +TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectCursorWithNonLongId) { + auto spec = BSON("$mergeCursors" << BSON_ARRAY(BSON("ns" << kTestNss.ns() << "id" + << "zero" + << "host" + << kTestHost.toString()))); + ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()), + AssertionException, + 50722); + spec = BSON("$mergeCursors" << BSON_ARRAY( + BSON("ns" << kTestNss.ns() << "id" << 0 << "host" << kTestHost.toString()))); + ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()), + AssertionException, + 50722); +} + +TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectCursorWithExtraField) { + auto spec = + BSON("$mergeCursors" << BSON_ARRAY(BSON( + "ns" << kTestNss.ns() << "id" << 0LL << "host" << kTestHost.toString() << "extra" + << "unexpected"))); + ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()), + AssertionException, + 50730); +} + +TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToParseTheSerializedVersionOfItself) { + auto spec = + BSON("$mergeCursors" << BSON_ARRAY( + BSON("ns" << kTestNss.ns() << "id" << 1LL << "host" << kTestHost.toString()) + << BSON("ns" << kTestNss.ns() << "id" << 2LL << "host" << kTestHost.toString()))); + auto mergeCursors = + DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()); + std::vector<Value> serializationArray; + mergeCursors->serializeToArray(serializationArray); + ASSERT_EQ(serializationArray.size(), 1UL); + // The serialized version might not be identical to 'spec', the fields might be in a different + // order, etc. Here we just make sure that the final parse doesn't throw. 
+ auto newSpec = serializationArray[0].getDocument().toBson(); + ASSERT(DocumentSourceMergeCursors::createFromBson(newSpec.firstElement(), getExpCtx())); +} + +TEST_F(DocumentSourceMergeCursorsTest, ShouldReportEOFWithNoCursors) { + auto expCtx = getExpCtx(); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back( + kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, kExhaustedCursorID, {})); + cursors.emplace_back( + kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, kExhaustedCursorID, {})); + auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); + auto mergeCursorsStage = + DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx); + + ASSERT_TRUE(mergeCursorsStage->getNext().isEOF()); +} + +BSONObj cursorResponseObj(const NamespaceString& nss, + CursorId cursorId, + std::vector<BSONObj> batch) { + return CursorResponse{nss, cursorId, std::move(batch)}.toBSON( + CursorResponse::ResponseType::SubsequentResponse); +} + +TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToIterateCursorsUntilEOF) { + auto expCtx = getExpCtx(); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})); + auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); + pipeline->addInitialSource( + DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx)); + + // Iterate the $mergeCursors stage asynchronously on a different thread, since it will block + // waiting for network responses, which we will manually schedule below. + auto future = launchAsync([&pipeline]() { + for (int i = 0; i < 5; ++i) { + ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 1}})); + } + ASSERT_FALSE(static_cast<bool>(pipeline->getNext())); + }); + + + // Schedule responses to two getMores which keep the cursor open. + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["getMore"]); + return cursorResponseObj( + expCtx->ns, request.cmdObj["getMore"].Long(), {BSON("x" << 1), BSON("x" << 1)}); + }); + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["getMore"]); + return cursorResponseObj( + expCtx->ns, request.cmdObj["getMore"].Long(), {BSON("x" << 1), BSON("x" << 1)}); + }); + + // Schedule responses to two getMores which report the cursor is exhausted. + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["getMore"]); + return cursorResponseObj(expCtx->ns, kExhaustedCursorID, {}); + }); + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["getMore"]); + return cursorResponseObj(expCtx->ns, kExhaustedCursorID, {BSON("x" << 1)}); + }); + + future.timed_get(kFutureTimeout); +} + +TEST_F(DocumentSourceMergeCursorsTest, ShouldNotKillCursorsIfNeverIterated) { + auto expCtx = getExpCtx(); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})); + auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); + pipeline->addInitialSource( + DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx)); + + pipeline.reset(); // Delete the pipeline before using it. 
+ + network()->enterNetwork(); + ASSERT_FALSE(network()->hasReadyRequests()); +} + +TEST_F(DocumentSourceMergeCursorsTest, ShouldKillCursorsIfPartiallyIterated) { + auto expCtx = getExpCtx(); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})); + auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); + pipeline->addInitialSource( + DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx)); + + // Iterate the pipeline asynchronously on a different thread, since it will block waiting for + // network responses, which we will manually schedule below. + auto future = launchAsync([&pipeline]() { + ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 1}})); + pipeline.reset(); // Stop iterating and delete the pipeline. + }); + + // Note we do not use 'kExhaustedCursorID' here, so the cursors are still open. + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["getMore"]); + return cursorResponseObj(expCtx->ns, 1, {BSON("x" << 1), BSON("x" << 1)}); + }); + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["getMore"]); + return cursorResponseObj(expCtx->ns, 2, {BSON("x" << 1), BSON("x" << 1)}); + }); + + // Here we're looking for the killCursors requests to be scheduled. + std::set<CursorId> killedCursors; + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["killCursors"]); + auto cursors = request.cmdObj["cursors"]; + ASSERT_EQ(cursors.type(), BSONType::Array); + auto cursorsArray = cursors.Array(); + ASSERT_FALSE(cursorsArray.empty()); + auto cursorId = cursorsArray[0].Long(); + ASSERT(cursorId == 1 || cursorId == 2); + killedCursors.insert(cursorId); + // The ARM doesn't actually inspect the response of the killCursors, so we don't have to put + // anything except {ok: 1}. + return BSON("ok" << 1); + }); + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["killCursors"]); + auto cursors = request.cmdObj["cursors"]; + ASSERT_EQ(cursors.type(), BSONType::Array); + auto cursorsArray = cursors.Array(); + ASSERT_FALSE(cursorsArray.empty()); + auto cursorId = cursorsArray[0].Long(); + ASSERT(cursorId == 1 || cursorId == 2); + killedCursors.insert(cursorId); + // The ARM doesn't actually inspect the response of the killCursors, so we don't have to put + // anything except {ok: 1}. + return BSON("ok" << 1); + }); + + future.timed_get(kFutureTimeout); + ASSERT_EQ(killedCursors.size(), 2UL); +} + +TEST_F(DocumentSourceMergeCursorsTest, ShouldOptimizeWithASortToEnsureCorrectOrder) { + auto expCtx = getExpCtx(); + + // Make a pipeline with a single $sort stage that is merging pre-sorted results. + const bool mergingPresorted = true; + const long long noLimit = -1; + auto sortStage = DocumentSourceSort::create(expCtx, + BSON("x" << 1), + noLimit, + DocumentSourceSort::kMaxMemoryUsageBytes, + mergingPresorted); + auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx)); + + // Make a $mergeCursors stage and add it to the front of the pipeline. 
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})); + auto mergeCursorsStage = + DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx); + pipeline->addInitialSource(std::move(mergeCursorsStage)); + + // After optimization we should only have a $mergeCursors stage. + pipeline->optimizePipeline(); + ASSERT_EQ(pipeline->getSources().size(), 1UL); + ASSERT_TRUE(dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); + + // Iterate the pipeline asynchronously on a different thread, since it will block waiting for + // network responses, which we will manually schedule below. + auto future = launchAsync([&pipeline]() { + ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 1}})); + ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 2}})); + ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 3}})); + ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 4}})); + ASSERT_FALSE(static_cast<bool>(pipeline->getNext())); + }); + + onCommand([&](const auto& request) { + return cursorResponseObj(expCtx->ns, + kExhaustedCursorID, + {BSON("x" << 1 << "$sortKey" << BSON("" << 1)), + BSON("x" << 3 << "$sortKey" << BSON("" << 3))}); + }); + onCommand([&](const auto& request) { + return cursorResponseObj(expCtx->ns, + kExhaustedCursorID, + {BSON("x" << 2 << "$sortKey" << BSON("" << 2)), + BSON("x" << 4 << "$sortKey" << BSON("" << 4))}); + }); + + future.timed_get(kFutureTimeout); +} + +TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLeadingSort) { + auto expCtx = getExpCtx(); + + // Make a pipeline with a single $sort stage that is merging pre-sorted results. + const bool mergingPresorted = true; + const long long limit = 3; + auto sortStage = DocumentSourceSort::create( + expCtx, BSON("x" << 1), limit, DocumentSourceSort::kMaxMemoryUsageBytes, mergingPresorted); + auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx)); + + // Make a $mergeCursors stage and add it to the front of the pipeline. + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})); + auto mergeCursorsStage = + DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx); + pipeline->addInitialSource(std::move(mergeCursorsStage)); + + // After optimization, we should still have a $limit stage. + pipeline->optimizePipeline(); + ASSERT_EQ(pipeline->getSources().size(), 2UL); + ASSERT_TRUE(dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); + ASSERT_TRUE(dynamic_cast<DocumentSourceLimit*>(pipeline->getSources().back().get())); + + // Iterate the pipeline asynchronously on a different thread, since it will block waiting for + // network responses, which we will manually schedule below. 
+ auto future = launchAsync([&]() { + for (int i = 1; i <= limit; ++i) { + ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", i}})); + } + ASSERT_FALSE(static_cast<bool>(pipeline->getNext())); + }); + + onCommand([&](const auto& request) { + return cursorResponseObj(expCtx->ns, + kExhaustedCursorID, + {BSON("x" << 1 << "$sortKey" << BSON("" << 1)), + BSON("x" << 3 << "$sortKey" << BSON("" << 3))}); + }); + onCommand([&](const auto& request) { + return cursorResponseObj(expCtx->ns, + kExhaustedCursorID, + {BSON("x" << 2 << "$sortKey" << BSON("" << 2)), + BSON("x" << 4 << "$sortKey" << BSON("" << 4))}); + }); + + future.timed_get(kFutureTimeout); +} + +TEST_F(DocumentSourceMergeCursorsTest, ShouldSerializeSortIfAbsorbedViaOptimize) { + auto expCtx = getExpCtx(); + + // Make a pipeline with a single $sort stage that is merging pre-sorted results. + const bool mergingPresorted = true; + const long long limit = 3; + auto sortStage = DocumentSourceSort::create( + expCtx, BSON("x" << 1), limit, DocumentSourceSort::kMaxMemoryUsageBytes, mergingPresorted); + auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx)); + + // Make a $mergeCursors stage and add it to the front of the pipeline. + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})); + auto mergeCursorsStage = + DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx); + pipeline->addInitialSource(std::move(mergeCursorsStage)); + + // After optimization, we should still have a $limit stage. + pipeline->optimizePipeline(); + ASSERT_EQ(pipeline->getSources().size(), 2UL); + ASSERT_TRUE(dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); + ASSERT_TRUE(dynamic_cast<DocumentSourceLimit*>(pipeline->getSources().back().get())); + + auto serialized = pipeline->serialize(); + ASSERT_EQ(serialized.size(), 3UL); + ASSERT_FALSE(serialized[0]["$mergeCursors"].missing()); + ASSERT_FALSE(serialized[1]["$sort"].missing()); + ASSERT_FALSE(serialized[2]["$limit"].missing()); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index 2993bd1286d..38e72693d1d 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -106,6 +106,8 @@ REGISTER_DOCUMENT_SOURCE(sort, DocumentSource::GetNextResult DocumentSourceSort::getNext() { pExpCtx->checkForInterrupt(); + invariant(!_mergingPresorted); // A presorted-merge should be optimized into the merge, and + // never executed. 
if (!_populated) { const auto populationResult = populate(); @@ -330,24 +332,14 @@ SortOptions DocumentSourceSort::makeSortOptions() const { } DocumentSource::GetNextResult DocumentSourceSort::populate() { - if (_mergingPresorted) { - typedef DocumentSourceMergeCursors DSCursors; - if (DSCursors* castedSource = dynamic_cast<DSCursors*>(pSource)) { - populateFromCursors(castedSource->getCursors()); - } else { - msgasserted(17196, "can only mergePresorted from MergeCursors"); - } - return DocumentSource::GetNextResult::makeEOF(); - } else { - auto nextInput = pSource->getNext(); - for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) { - loadDocument(nextInput.releaseDocument()); - } - if (nextInput.isEOF()) { - loadingDone(); - } - return nextInput; + auto nextInput = pSource->getNext(); + for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) { + loadDocument(nextInput.releaseDocument()); } + if (nextInput.isEOF()) { + loadingDone(); + } + return nextInput; } void DocumentSourceSort::loadDocument(Document&& doc) { @@ -374,44 +366,6 @@ void DocumentSourceSort::loadingDone() { _populated = true; } -class DocumentSourceSort::IteratorFromCursor : public MySorter::Iterator { -public: - IteratorFromCursor(DocumentSourceSort* sorter, DBClientCursor* cursor) - : _sorter(sorter), _cursor(cursor) {} - - bool more() { - return _cursor->more(); - } - Data next() { - auto doc = DocumentSourceMergeCursors::nextSafeFrom(_cursor); - if (doc.hasSortKeyMetaField()) { - // We set the sort key metadata field during the first half of the sort, so just use - // that as the sort key here. - return make_pair( - deserializeSortKey(_sorter->_sortPattern.size(), doc.getSortKeyMetaField()), doc); - } else { - // It's possible this result is coming from a shard that is still on an old version. If - // that's the case, it won't tell us it's sort key - we'll have to re-compute it - // ourselves. - return _sorter->extractSortKey(std::move(doc)); - } - } - -private: - DocumentSourceSort* _sorter; - DBClientCursor* _cursor; -}; - -void DocumentSourceSort::populateFromCursors(const vector<DBClientCursor*>& cursors) { - vector<std::shared_ptr<MySorter::Iterator>> iterators; - for (size_t i = 0; i < cursors.size(); i++) { - iterators.push_back(std::make_shared<IteratorFromCursor>(this, cursors[i])); - } - - _output.reset(MySorter::Iterator::merge(iterators, makeSortOptions(), Comparator(*this))); - _populated = true; -} - Value DocumentSourceSort::getCollationComparisonKey(const Value& val) const { const auto collator = pExpCtx->getCollator(); diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index b1aeb09cd43..a9b2ed14c2f 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -155,9 +155,6 @@ protected: void doDispose() final; private: - // This is used to merge pre-sorted results from a DocumentSourceMergeCursors. - class IteratorFromCursor; - using MySorter = Sorter<Value, Document>; // For MySorter. 
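Note how the $sort/$mergeCursors interaction is inverted by the two diffs above: instead of DocumentSourceSort::populate() reaching into a preceding $mergeCursors for raw cursors, DocumentSourceMergeCursors::doOptimizeAt() now absorbs a following merging-presorted $sort, handing the sort pattern to the results merger and re-inserting any limit the sort had swallowed. A simplified sketch of that stage-list surgery, using stand-in types rather than the real DocumentSource hierarchy:

#include <iterator>
#include <list>
#include <memory>
#include <string>

struct Stage {
    virtual ~Stage() = default;
};

struct SortStage : Stage {
    bool mergingPresorted = false;
    std::string sortPattern;            // e.g. "{x: 1}"
    std::shared_ptr<Stage> limitStage;  // Set if a $limit was absorbed into the sort.
};

struct MergeStage : Stage {
    std::string mergeSortPattern;  // Forwarded to the merger, which performs the ordered merge.

    using Container = std::list<std::shared_ptr<Stage>>;

    // Mirrors doOptimizeAt(): if the next stage is a merging-presorted sort,
    // swallow it and keep any limit it was carrying.
    Container::iterator doOptimizeAt(Container::iterator itr, Container* container) {
        auto next = std::next(itr);
        if (next == container->end())
            return next;
        auto* sort = dynamic_cast<SortStage*>(next->get());
        if (!sort || !sort->mergingPresorted)
            return next;  // Nothing to absorb.
        mergeSortPattern = sort->sortPattern;
        if (sort->limitStage)
            container->insert(std::next(next), sort->limitStage);  // Preserve the limit.
        container->erase(next);
        return std::next(itr);  // Resume optimization after this stage.
    }
};

int main() {
    auto merge = std::make_shared<MergeStage>();
    auto sort = std::make_shared<SortStage>();
    sort->mergingPresorted = true;
    sort->sortPattern = "{x: 1}";
    sort->limitStage = std::make_shared<Stage>();

    MergeStage::Container pipeline{merge, sort};
    merge->doOptimizeAt(pipeline.begin(), &pipeline);
    // The pipeline now holds [merge, limit]; the sort was absorbed into the merge.
    return pipeline.size() == 2 ? 0 : 1;
}
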
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 499d8c3d6e5..0eb286bca28 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -142,7 +142,7 @@ void Pipeline::validatePipeline() const {
             uasserted(ErrorCodes::InvalidNamespace,
                       "{aggregate: 1} is not valid for an empty pipeline.");
         }
-    } else if (!dynamic_cast<DocumentSourceMergeCursors*>(_sources.front().get())) {
+    } else if ("$mergeCursors"_sd != _sources.front()->getSourceName()) {
         // The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this,
         // {aggregate: 1} is only valid for collectionless sources, and vice-versa.
         const auto firstStageConstraints = _sources.front()->constraints(_splitState);
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 28274b3e1ff..d3779fa6f86 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1231,6 +1231,7 @@ env.Library(
         '$BUILD_DIR/mongo/executor/task_executor_interface',
         '$BUILD_DIR/mongo/rpc/command_status',
         '$BUILD_DIR/mongo/s/query/async_results_merger',
+        '$BUILD_DIR/mongo/s/query/cluster_client_cursor',
         '$BUILD_DIR/mongo/util/progress_meter',
     ],
 )
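The pipeline.cpp change above swaps a dynamic_cast for a comparison against the stage's registered name, so the translation unit no longer needs the concrete DocumentSourceMergeCursors type at all — a link-time dependency disappears. A minimal sketch of the idea, with hypothetical names:

    #include <iostream>
    #include <memory>
    #include <string_view>

    // The base interface already exposes a registered stage name, so callers can
    // identify a stage without dynamic_cast and without seeing its definition.
    struct Stage {
        virtual ~Stage() = default;
        virtual std::string_view getSourceName() const = 0;
    };

    struct MergeCursorsStage : Stage {
        std::string_view getSourceName() const override {
            return "$mergeCursors";
        }
    };

    bool frontIsMergeCursors(const Stage& front) {
        // String comparison against the well-known name; no concrete type needed.
        return front.getSourceName() == "$mergeCursors";
    }

    int main() {
        MergeCursorsStage stage;
        std::cout << std::boolalpha << frontIsMergeCursors(stage) << '\n';  // true
    }

The trade-off is that a name check cannot catch a mis-registered stage the way a failed cast would, but for a well-known reserved name like "$mergeCursors" that risk is small compared to the dependency it breaks.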
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 2550179e3de..e52f81efc77 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -40,6 +40,7 @@
 #include "mongo/db/commands.h"
 #include "mongo/db/operation_context.h"
 #include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/db/pipeline/document_source_merge_cursors.h"
 #include "mongo/db/pipeline/document_source_out.h"
 #include "mongo/db/pipeline/expression_context.h"
 #include "mongo/db/pipeline/lite_parsed_pipeline.h"
@@ -487,27 +488,19 @@ DispatchShardPipelineResults dispatchShardPipeline(
             targetedCommand};
 }
 
-std::pair<ShardId, Shard::CommandResponse> establishMergingShardCursor(
-    OperationContext* opCtx,
-    const NamespaceString& nss,
-    const std::vector<ClusterClientCursorParams::RemoteCursor>& cursors,
-    const BSONObj mergeCmdObj,
-    const boost::optional<ShardId> primaryShard) {
-    // Run merging command on random shard, unless we need to run on the primary shard.
-    auto& prng = opCtx->getClient()->getPrng();
-    const auto mergingShardId =
-        primaryShard ? primaryShard.get() : cursors[prng.nextInt32(cursors.size())].shardId;
+Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx,
+                                                   const NamespaceString& nss,
+                                                   const BSONObj mergeCmdObj,
+                                                   const ShardId& mergingShardId) {
     const auto mergingShard =
         uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, mergingShardId));
 
-    auto shardCmdResponse = uassertStatusOK(
+    return uassertStatusOK(
         mergingShard->runCommandWithFixedRetryAttempts(opCtx,
                                                        ReadPreferenceSetting::get(opCtx),
                                                        nss.db().toString(),
                                                        mergeCmdObj,
                                                        Shard::RetryPolicy::kIdempotent));
-
-    return {std::move(mergingShardId), std::move(shardCmdResponse)};
 }
 
 BSONObj establishMergingMongosCursor(OperationContext* opCtx,
@@ -634,6 +627,19 @@ BSONObj getDefaultCollationForUnshardedCollection(const Shard* primaryShard,
     return defaultCollation;
 }
 
+ShardId pickMergingShard(OperationContext* opCtx,
+                         const DispatchShardPipelineResults& dispatchResults,
+                         ShardId primaryShard) {
+    auto& prng = opCtx->getClient()->getPrng();
+    // If we cannot merge on mongoS, establish the merge cursor on a shard. Perform the merging
+    // command on a random shard, unless the pipeline dictates that it needs to be run on the
+    // primary shard for the database.
+    return dispatchResults.needsPrimaryShardMerge
+        ? primaryShard
+        : dispatchResults.remoteCursors[prng.nextInt32(dispatchResults.remoteCursors.size())]
+              .shardId;
+}
+
 }  // namespace
 
 Status ClusterAggregate::runAggregate(OperationContext* opCtx,
@@ -811,29 +817,25 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
         return getStatusFromCommandResult(result->asTempObj());
     }
 
-    // If we cannot merge on mongoS, establish the merge cursor on a shard.
-    mergingPipeline->addInitialSource(
-        DocumentSourceMergeCursors::create(parseCursors(dispatchResults.remoteCursors), mergeCtx));
+    ShardId mergingShardId =
+        pickMergingShard(opCtx, dispatchResults, executionNsRoutingInfo.primaryId());
+
+    mergingPipeline->addInitialSource(DocumentSourceMergeCursors::create(
+        std::move(dispatchResults.remoteCursors),
+        Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+        mergeCtx));
 
     auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, mergingPipeline);
 
     auto mergeResponse =
-        establishMergingShardCursor(opCtx,
-                                    namespaces.executionNss,
-                                    dispatchResults.remoteCursors,
-                                    mergeCmdObj,
-                                    boost::optional<ShardId>{dispatchResults.needsPrimaryShardMerge,
-                                                             executionNsRoutingInfo.primaryId()});
-
-    auto mergingShardId = mergeResponse.first;
-    auto response = mergeResponse.second;
+        establishMergingShardCursor(opCtx, namespaces.executionNss, mergeCmdObj, mergingShardId);
 
     // The merging shard is remote, so if a response was received, a HostAndPort must have been set.
-    invariant(response.hostAndPort);
+    invariant(mergeResponse.hostAndPort);
 
     auto mergeCursorResponse = uassertStatusOK(
         storePossibleCursor(opCtx,
                             mergingShardId,
-                            *response.hostAndPort,
-                            response.response,
+                            *mergeResponse.hostAndPort,
+                            mergeResponse.response,
                             namespaces.requestedNss,
                             Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
                             Grid::get(opCtx)->getCursorManager()));
@@ -841,19 +843,6 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
     return appendCursorResponseToCommandResult(mergingShardId, mergeCursorResponse, result);
 }
 
-std::vector<DocumentSourceMergeCursors::CursorDescriptor> ClusterAggregate::parseCursors(
-    const std::vector<ClusterClientCursorParams::RemoteCursor>& responses) {
-    std::vector<DocumentSourceMergeCursors::CursorDescriptor> cursors;
-    for (const auto& response : responses) {
-        invariant(0 != response.cursorResponse.getCursorId());
-        invariant(response.cursorResponse.getBatch().empty());
-        cursors.emplace_back(ConnectionString(response.hostAndPort),
-                             response.cursorResponse.getNSS().toString(),
-                             response.cursorResponse.getCursorId());
-    }
-    return cursors;
-}
-
 void ClusterAggregate::uassertAllShardsSupportExplain(
     const std::vector<AsyncRequestsSender::Response>& shardResults) {
     for (const auto& result : shardResults) {
diff --git a/src/mongo/s/commands/cluster_aggregate.h b/src/mongo/s/commands/cluster_aggregate.h
index 740cc62c610..d7c0071e3de 100644
--- a/src/mongo/s/commands/cluster_aggregate.h
+++ b/src/mongo/s/commands/cluster_aggregate.h
@@ -81,9 +81,6 @@ public:
                                BSONObjBuilder* result);
 
 private:
-    static std::vector<DocumentSourceMergeCursors::CursorDescriptor> parseCursors(
-        const std::vector<ClusterClientCursorParams::RemoteCursor>& cursors);
-
     static void uassertAllShardsSupportExplain(
         const std::vector<AsyncRequestsSender::Response>& shardResults);
 
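The extracted pickMergingShard() above honors a required primary-shard merge and otherwise spreads merge work uniformly over the shards that returned cursors. A hedged, self-contained stand-in for that selection logic — the types and names here are illustrative, not the real ClusterAggregate or PRNG API:

    #include <iostream>
    #include <random>
    #include <string>
    #include <vector>

    // Precondition: cursorShards is non-empty, as it is after a successful
    // dispatch to at least one shard.
    std::string pickMergingShard(bool needsPrimaryShardMerge,
                                 const std::string& primaryShard,
                                 const std::vector<std::string>& cursorShards,
                                 std::mt19937& prng) {
        if (needsPrimaryShardMerge) {
            return primaryShard;  // e.g. the pipeline ends in $out
        }
        // Uniform choice spreads the merge load across the cluster over time.
        std::uniform_int_distribution<size_t> pick(0, cursorShards.size() - 1);
        return cursorShards[pick(prng)];
    }

    int main() {
        std::mt19937 prng{std::random_device{}()};
        std::vector<std::string> shards{"shard0", "shard1", "shard2"};
        std::cout << pickMergingShard(false, "shard0", shards, prng) << '\n';
    }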
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index 43d2fc5c7e5..0c90f644c5e 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -25,6 +25,7 @@ env.Library(
         "cluster_client_cursor_impl.cpp",
     ],
     LIBDEPS=[
+        "$BUILD_DIR/mongo/db/pipeline/pipeline",
         "router_exec_stage",
     ],
 )
@@ -46,7 +47,7 @@ env.Library(
         "async_results_merger",
     ],
     LIBDEPS_PRIVATE=[
-        "$BUILD_DIR/mongo/db/pipeline/document_source_lookup",
+        "$BUILD_DIR/mongo/db/pipeline/pipeline",
    ],
 )
 
@@ -68,7 +69,6 @@ env.Library(
     target="async_results_merger",
     source=[
         "async_results_merger.cpp",
-        "cluster_client_cursor_params.cpp",
         "establish_cursors.cpp",
     ],
     LIBDEPS=[
@@ -76,10 +76,7 @@
         "$BUILD_DIR/mongo/executor/task_executor_interface",
         "$BUILD_DIR/mongo/s/async_requests_sender",
         "$BUILD_DIR/mongo/s/client/sharding_client",
-        "$BUILD_DIR/mongo/db/pipeline/pipeline",
-    ],
-    LIBDEPS_PRIVATE=[
-        "$BUILD_DIR/mongo/db/pipeline/document_source_lookup",
+        '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl',
     ],
 )
 
@@ -100,6 +97,7 @@ env.CppUnitTest(
     ],
     LIBDEPS=[
         'async_results_merger',
+        'cluster_client_cursor',
         '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
         '$BUILD_DIR/mongo/db/query/query_request',
         '$BUILD_DIR/mongo/db/service_context_noop_init',
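The LIBDEPS reshuffling above is physical design: heavyweight stage libraries drop out of the merger's dependency lists. The same idea shows up at the C++ header level as forward declarations. A hedged, compile-only sketch with hypothetical class names (not the real pipeline types):

    #include <memory>

    // Forward declaration: the header names the type but never needs its
    // layout, so including (and link-binding to) the stage library is avoided.
    class DocumentSourceMergeCursors;

    class PipelineHolder {
    public:
        // Declared here; defined in a .cpp that includes the full type.
        void addInitialSource(std::shared_ptr<DocumentSourceMergeCursors> source);

    private:
        // std::shared_ptr tolerates incomplete types: its deleter is captured
        // wherever the pointer is created, in a translation unit that does see
        // the complete definition.
        std::shared_ptr<DocumentSourceMergeCursors> _front;
    };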
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 8f80a4b64ab..e315452160f 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -34,7 +34,7 @@
 
 #include "mongo/bson/simple_bsonobj_comparator.h"
 #include "mongo/client/remote_command_targeter.h"
-#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/db/pipeline/change_stream_constants.h"
 #include "mongo/db/query/cursor_response.h"
 #include "mongo/db/query/getmore_request.h"
 #include "mongo/db/query/killcursors_request.h"
@@ -44,6 +44,10 @@
 #include "mongo/util/log.h"
 
 namespace mongo {
+
+constexpr StringData AsyncResultsMerger::kSortKeyField;
+const BSONObj AsyncResultsMerger::kWholeSortKeySortPattern = BSON(kSortKeyField << 1);
+
 namespace {
 
 // Maximum number of retries for network and replication notMaster errors (per host).
@@ -54,7 +58,7 @@ const int kMaxNumFailedHostRetryAttempts = 3;
 
 /**
  * {'': 'firstSortKey', '': 'secondSortKey', ...}.
 */
 BSONObj extractSortKey(BSONObj obj, bool compareWholeSortKey) {
-    auto key = obj[ClusterClientCursorParams::kSortKeyField];
+    auto key = obj[AsyncResultsMerger::kSortKeyField];
     invariant(key);
     if (compareWholeSortKey) {
         return key.wrap();
@@ -451,8 +455,8 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote,
     remote->cursorId = response.getCursorId();
     if (response.getLastOplogTimestamp() && !response.getLastOplogTimestamp()->isNull()) {
         // We only expect to see this for change streams.
-        invariant(SimpleBSONObjComparator::kInstance.evaluate(
-            _params->sort == DocumentSourceChangeStream::kSortSpec));
+        invariant(SimpleBSONObjComparator::kInstance.evaluate(_params->sort ==
+                                                              change_stream_constants::kSortSpec));
 
         auto newLatestTimestamp = *response.getLastOplogTimestamp();
         if (remote->promisedMinSortKey) {
@@ -482,7 +486,7 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote,
 
     remote->promisedMinSortKey =
         (compareSortKeys(
-             newPromisedMin, maxSortKeyFromResponse, DocumentSourceChangeStream::kSortSpec) < 0
+             newPromisedMin, maxSortKeyFromResponse, change_stream_constants::kSortSpec) < 0
             ? maxSortKeyFromResponse.getOwned()
             : newPromisedMin.getOwned());
 }
 
@@ -586,18 +590,18 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
     for (const auto& obj : response.getBatch()) {
        // If there's a sort, we're expecting the remote node to have given us back a sort key.
         if (!_params->sort.isEmpty()) {
-            auto key = obj[ClusterClientCursorParams::kSortKeyField];
+            auto key = obj[AsyncResultsMerger::kSortKeyField];
             if (!key) {
-                remote.status = Status(ErrorCodes::InternalError,
-                                       str::stream() << "Missing field '"
-                                                     << ClusterClientCursorParams::kSortKeyField
-                                                     << "' in document: "
-                                                     << obj);
+                remote.status =
+                    Status(ErrorCodes::InternalError,
+                           str::stream() << "Missing field '" << AsyncResultsMerger::kSortKeyField
+                                         << "' in document: "
+                                         << obj);
                 return false;
             } else if (!_params->compareWholeSortKey && key.type() != BSONType::Object) {
                 remote.status =
                     Status(ErrorCodes::InternalError,
-                           str::stream() << "Field '" << ClusterClientCursorParams::kSortKeyField
+                           str::stream() << "Field '" << AsyncResultsMerger::kSortKeyField
                                          << "' was not of type Object in document: "
                                          << obj);
                 return false;
@@ -732,4 +736,36 @@ bool AsyncResultsMerger::MergingComparator::operator()(const size_t& lhs, const
                _sort) > 0;
 }
 
+void AsyncResultsMerger::blockingKill(OperationContext* opCtx) {
+    auto killEvent = kill(opCtx);
+    if (!killEvent) {
+        // We are shutting down.
+        return;
+    }
+    _executor->waitForEvent(killEvent);
+}
+
+StatusWith<ClusterQueryResult> AsyncResultsMerger::blockingNext() {
+    while (!ready()) {
+        auto nextEventStatus = nextEvent();
+        if (!nextEventStatus.isOK()) {
+            return nextEventStatus.getStatus();
+        }
+        auto event = nextEventStatus.getValue();
+
+        // Block until there are further results to return.
+        auto status = _executor->waitForEvent(_opCtx, event);
+
+        if (!status.isOK()) {
+            return status.getStatus();
+        }
+
+        // We have not provided a deadline, so if the wait returns without interruption, we do not
+        // expect to have timed out.
+        invariant(status.getValue() == stdx::cv_status::no_timeout);
+    }
+
+    return nextReady();
+}
+
 }  // namespace mongo
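The new blockingNext() above follows a simple shape: loop "not ready yet, so schedule more remote work and block on its completion event" until a result is buffered or the stream is exhausted. A self-contained sketch of that shape where a plain condition variable stands in for the executor's event machinery — this mirrors the structure, not the actual TaskExecutor implementation:

    #include <condition_variable>
    #include <deque>
    #include <iostream>
    #include <mutex>
    #include <optional>
    #include <thread>

    class BlockingMerger {
    public:
        // Blocks until a result is buffered or the stream is exhausted (EOF).
        std::optional<int> blockingNext() {
            std::unique_lock<std::mutex> lk(_mutex);
            _cv.wait(lk, [&] { return !_buffer.empty() || _exhausted; });
            if (_buffer.empty()) {
                return std::nullopt;  // EOF: every remote cursor is exhausted
            }
            int next = _buffer.front();
            _buffer.pop_front();
            return next;
        }

        // Called by the "network" side when a remote batch arrives.
        void deliver(int result) {
            {
                std::lock_guard<std::mutex> lk(_mutex);
                _buffer.push_back(result);
            }
            _cv.notify_all();
        }

        void markExhausted() {
            {
                std::lock_guard<std::mutex> lk(_mutex);
                _exhausted = true;
            }
            _cv.notify_all();
        }

    private:
        std::mutex _mutex;
        std::condition_variable _cv;
        std::deque<int> _buffer;
        bool _exhausted = false;
    };

    int main() {
        BlockingMerger merger;
        std::thread network([&] {
            merger.deliver(1);
            merger.markExhausted();
        });
        while (auto next = merger.blockingNext()) {
            std::cout << *next << '\n';  // prints: 1
        }
        network.join();
    }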
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 0a441016060..b22584e9c27 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -75,6 +75,13 @@ class AsyncResultsMerger {
     MONGO_DISALLOW_COPYING(AsyncResultsMerger);
 
 public:
+    // When mongos has to do a merge in order to return results to the client in the correct sort
+    // order, it requests a sortKey meta-projection using this field name.
+    static constexpr StringData kSortKeyField = "$sortKey"_sd;
+
+    // The expected sort key pattern when 'compareWholeSortKey' is true.
+    static const BSONObj kWholeSortKeySortPattern;
+
     /**
      * Takes ownership of the cursors from ClusterClientCursorParams by storing their cursorIds and
      * the hosts on which they exist in _remotes.
@@ -160,6 +167,12 @@ public:
     StatusWith<ClusterQueryResult> nextReady();
 
     /**
+     * Blocks until the next result is ready, all remote cursors are exhausted, or there is an
+     * error.
+     */
+    StatusWith<ClusterQueryResult> blockingNext();
+
+    /**
      * Schedules remote work as required in order to make further results available. If there is an
      * error in scheduling this work, returns a non-ok status. On success, returns an event handle.
     * The caller can pass this event handle to 'executor' in order to be blocked until further
@@ -203,6 +216,11 @@ public:
      */
     executor::TaskExecutor::EventHandle kill(OperationContext* opCtx);
 
+    /**
+     * A blocking version of kill() that will not return until it is safe to destroy this
+     * AsyncResultsMerger.
+     */
+    void blockingKill(OperationContext*);
+
 private:
     /**
      * We instantiate one of these per remote host. It contains the buffer of results we've
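blockingKill() pairs an asynchronous kill() (which returns a completion handle, or nothing during shutdown) with a wait on that handle, so the object is guaranteed safe to destroy when the call returns. A hedged miniature of that contract with stand-in types — not the real TaskExecutor::EventHandle API:

    #include <future>
    #include <iostream>
    #include <optional>

    class Killable {
    public:
        // Asynchronous kill: hands back a handle that completes once cleanup
        // finishes. A real implementation may return nothing during shutdown.
        std::optional<std::shared_future<void>> kill() {
            return _cleanupDone.get_future().share();
        }

        void blockingKill() {
            auto killEvent = kill();
            if (!killEvent) {
                return;  // shutting down; no event to wait on
            }
            finishCleanup();    // normally performed by background callbacks
            killEvent->wait();  // do not return until cleanup has completed
        }

    private:
        void finishCleanup() {
            _cleanupDone.set_value();
        }
        std::promise<void> _cleanupDone;
    };

    int main() {
        Killable k;
        k.blockingKill();
        std::cout << "safe to destroy\n";
    }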
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index ccbce1d5517..34e70fc8333 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -1880,6 +1880,72 @@ TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulin
     executor()->waitForEvent(killEvent);
 }
 
-}  // namespace
+TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) {
+    std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+    cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+    makeCursorFromExistingCursors(std::move(cursors));
+
+    // Before any requests are scheduled, ARM is not ready to return results.
+    ASSERT_FALSE(arm->ready());
+    ASSERT_FALSE(arm->remotesExhausted());
+    // Issue a blocking wait for the next result asynchronously on a different thread.
+    auto future = launchAsync([&]() {
+        auto next = unittest::assertGet(arm->blockingNext());
+        ASSERT_FALSE(next.isEOF());
+        ASSERT_BSONOBJ_EQ(*next.getResult(), BSON("x" << 1));
+        next = unittest::assertGet(arm->blockingNext());
+        ASSERT_TRUE(next.isEOF());
+    });
+
+    // Schedule the response to the getMore which will return the next result and mark the cursor as
+    // exhausted.
+    onCommand([&](const auto& request) {
+        ASSERT(request.cmdObj["getMore"]);
+        return CursorResponse(_nss, 0LL, {BSON("x" << 1)})
+            .toBSON(CursorResponse::ResponseType::SubsequentResponse);
+    });
+
+    future.timed_get(kFutureTimeout);
+}
+
+TEST_F(AsyncResultsMergerTest, ShouldBeInterruptableDuringBlockingNext) {
+    std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+    cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+    makeCursorFromExistingCursors(std::move(cursors));
+
+    // Issue a blocking wait for the next result asynchronously on a different thread.
+    auto future = launchAsync([&]() {
+        auto nextStatus = arm->blockingNext();
+        ASSERT_EQ(nextStatus.getStatus(), ErrorCodes::Interrupted);
+    });
+
+    // Now mark the OperationContext as killed from this thread.
+    {
+        stdx::lock_guard<Client> lk(*operationContext()->getClient());
+        operationContext()->markKilled(ErrorCodes::Interrupted);
+    }
+    future.timed_get(kFutureTimeout);
+    // Be careful not to use a blocking kill here, since the main thread is in charge of running the
+    // callbacks, and we'd block on ourselves.
+    auto killEvent = arm->kill(operationContext());
+
+    assertKillCusorsCmdHasCursorId(getNthPendingRequest(0u).cmdObj, 1);
+    runReadyCallbacks();
+    executor()->waitForEvent(killEvent);
+}
+
+TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilKilled) {
+    std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+    cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+    makeCursorFromExistingCursors(std::move(cursors));
+
+    // Before any requests are scheduled, ARM is not ready to return results.
+    ASSERT_FALSE(arm->ready());
+    ASSERT_FALSE(arm->remotesExhausted());
+
+    arm->blockingKill(operationContext());
+}
+
+}  // namespace
 }  // namespace mongo
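The interruption test above depends on one contract: a waiter blocked in blockingNext() must wake when another thread marks the operation killed, and must surface an error instead of a result. A hedged reduction of that contract to a flag and a condition variable — markKilled() here stands in for OperationContext::markKilled(), and none of this is the real opCtx API:

    #include <chrono>
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <optional>
    #include <thread>

    struct InterruptibleWait {
        std::mutex mutex;
        std::condition_variable cv;
        bool hasResult = false;
        bool killed = false;

        std::optional<int> blockingNext() {
            std::unique_lock<std::mutex> lk(mutex);
            cv.wait(lk, [&] { return hasResult || killed; });
            if (killed) {
                return std::nullopt;  // plays the role of the Interrupted error
            }
            return 42;
        }

        void markKilled() {
            {
                std::lock_guard<std::mutex> lk(mutex);
                killed = true;
            }
            cv.notify_all();  // wake the waiter so it can observe the kill
        }
    };

    int main() {
        InterruptibleWait waiter;
        std::thread t([&] {
            if (!waiter.blockingNext()) {
                std::cout << "interrupted\n";
            }
        });
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        waiter.markKilled();
        t.join();
    }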
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index e555f1d32e8..10152522f59 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -79,7 +79,7 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx,
       _opCtx(opCtx) {
     dassert(!_params.compareWholeSortKey ||
             SimpleBSONObjComparator::kInstance.evaluate(
-                _params.sort == ClusterClientCursorParams::kWholeSortKeySortPattern));
+                _params.sort == AsyncResultsMerger::kWholeSortKeySortPattern));
 }
 
 ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx,
@@ -89,7 +89,7 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx,
     : _params(std::move(params)), _root(std::move(root)), _lsid(lsid), _opCtx(opCtx) {
     dassert(!_params.compareWholeSortKey ||
             SimpleBSONObjComparator::kInstance.evaluate(
-                _params.sort == ClusterClientCursorParams::kWholeSortKeySortPattern));
+                _params.sort == AsyncResultsMerger::kWholeSortKeySortPattern));
 }
 
 StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next(
@@ -281,9 +281,7 @@ std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
     if (hasSort) {
         // Strip out the sort key after sorting.
         root = stdx::make_unique<RouterStageRemoveMetadataFields>(
-            opCtx,
-            std::move(root),
-            std::vector<StringData>{ClusterClientCursorParams::kSortKeyField});
+            opCtx, std::move(root), std::vector<StringData>{AsyncResultsMerger::kSortKeyField});
     }
 
     return root;
diff --git a/src/mongo/s/query/cluster_client_cursor_params.cpp b/src/mongo/s/query/cluster_client_cursor_params.cpp
deleted file mode 100644
index 4a033135c2a..00000000000
--- a/src/mongo/s/query/cluster_client_cursor_params.cpp
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- *    Copyright (C) 2015 MongoDB Inc.
- *
- *    This program is free software: you can redistribute it and/or modify
- *    it under the terms of the GNU Affero General Public License, version 3,
- *    as published by the Free Software Foundation.
- *
- *    This program is distributed in the hope that it will be useful,
- *    but WITHOUT ANY WARRANTY; without even the implied warranty of
- *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- *    GNU Affero General Public License for more details.
- *
- *    You should have received a copy of the GNU Affero General Public License
- *    along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- *    As a special exception, the copyright holders give permission to link the
- *    code of portions of this program with the OpenSSL library under certain
- *    conditions as described in each individual source file and distribute
- *    linked combinations including the program with the OpenSSL library. You
- *    must comply with the GNU Affero General Public License in all respects for
- *    all of the code used other than as permitted herein. If you modify file(s)
- *    with this exception, you may extend this exception to your version of the
- *    file(s), but you are not obligated to do so. If you do not wish to do so,
- *    delete this exception statement from your version. If you delete this
- *    exception statement from all source files in the program, then also delete
- *    it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/query/cluster_client_cursor_params.h"
-
-namespace mongo {
-
-const char ClusterClientCursorParams::kSortKeyField[] = "$sortKey";
-const BSONObj ClusterClientCursorParams::kWholeSortKeySortPattern = BSON(kSortKeyField << 1);
-
-}  // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index 3bc5da6c9c0..d213b0ea73f 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -61,13 +61,6 @@ class RouterExecStage;
  * this cursor have been processed.
 */
 struct ClusterClientCursorParams {
-    // When mongos has to do a merge in order to return results to the client in the correct sort
-    // order, it requests a sortKey meta-projection using this field name.
-    static const char kSortKeyField[];
-
-    // The expected sort key pattern when 'compareWholeSortKey' is true.
-    static const BSONObj kWholeSortKeySortPattern;
-
     struct RemoteCursor {
         RemoteCursor(ShardId shardId, HostAndPort hostAndPort, CursorResponse cursorResponse)
             : shardId(std::move(shardId)),
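The removal of the old 'static const char kSortKeyField[]' (and the whole .cpp that defined it) pairs with the 'static constexpr StringData' declaration added to AsyncResultsMerger earlier. One wrinkle the earlier 'constexpr StringData AsyncResultsMerger::kSortKeyField;' line in the .cpp addresses: under pre-C++17 rules, an odr-used constexpr static data member still needs exactly one out-of-line definition. A hedged sketch with std::string_view standing in for mongo::StringData:

    #include <iostream>
    #include <string_view>

    struct Merger {
        // Value lives in the header and is usable in constant expressions.
        static constexpr std::string_view kSortKeyField = "$sortKey";
    };

    // Out-of-line definition, required pre-C++17 if the member is odr-used;
    // C++17's implicit inline variables make this line redundant (and it is
    // deprecated there), but it remains well-formed.
    constexpr std::string_view Merger::kSortKeyField;

    int main() {
        const std::string_view& ref = Merger::kSortKeyField;  // odr-use
        std::cout << ref << '\n';
    }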
-    if (query.getQueryRequest().getProj().hasField(ClusterClientCursorParams::kSortKeyField)) {
+    if (query.getQueryRequest().getProj().hasField(AsyncResultsMerger::kSortKeyField)) {
         uasserted(ErrorCodes::BadValue,
                   str::stream() << "Projection contains illegal field '"
-                                << ClusterClientCursorParams::kSortKeyField
+                                << AsyncResultsMerger::kSortKeyField
                                 << "': "
                                 << query.getQueryRequest().getProj());
     }
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
index 8ff0dfc991a..4f17927483b 100644
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ b/src/mongo/s/query/router_stage_merge.cpp
@@ -47,32 +47,7 @@ StatusWith<ClusterQueryResult> RouterStageMerge::next(ExecContext execCtx) {
     // cursors wait for ready() only until a specified time limit is exceeded.
     return (_params->tailableMode == TailableMode::kTailableAndAwaitData
                 ? awaitNextWithTimeout(execCtx)
-                : blockForNextNoTimeout(execCtx));
-}
-
-StatusWith<ClusterQueryResult> RouterStageMerge::blockForNextNoTimeout(ExecContext execCtx) {
-    invariant(_params->tailableMode != TailableMode::kTailableAndAwaitData);
-    invariant(getOpCtx());
-    while (!_arm.ready()) {
-        auto nextEventStatus = _arm.nextEvent();
-        if (!nextEventStatus.isOK()) {
-            return nextEventStatus.getStatus();
-        }
-        auto event = nextEventStatus.getValue();
-
-        // Block until there are further results to return.
-        auto status = _executor->waitForEvent(getOpCtx(), event);
-
-        if (!status.isOK()) {
-            return status.getStatus();
-        }
-
-        // We have not provided a deadline, so if the wait returns without interruption, we do not
-        // expect to have timed out.
-        invariant(status.getValue() == stdx::cv_status::no_timeout);
-    }
-
-    return _arm.nextReady();
+                : _arm.blockingNext());
 }
 
 StatusWith<ClusterQueryResult> RouterStageMerge::awaitNextWithTimeout(ExecContext execCtx) {
@@ -120,12 +95,7 @@ StatusWith<EventHandle> RouterStageMerge::getNextEvent() {
 }
 
 void RouterStageMerge::kill(OperationContext* opCtx) {
-    auto killEvent = _arm.kill(opCtx);
-    if (!killEvent) {
-        // Mongos is shutting down.
-        return;
-    }
-    _executor->waitForEvent(killEvent);
+    _arm.blockingKill(opCtx);
 }
 
 bool RouterStageMerge::remotesExhausted() {
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index bb7d6ed81a7..efd397b8c7e 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -75,12 +75,6 @@ protected:
 
 private:
     /**
-     * Returns the next document received by the ARM, blocking indefinitely until we either have a
-     * new result or exhaust the remote cursors.
-     */
-    StatusWith<ClusterQueryResult> blockForNextNoTimeout(ExecContext execCtx);
-
-    /**
      * Awaits the next result from the ARM up to a specified time limit. If this is the user's
     * initial find or we have already obtained at least one result for this batch, this method
     * returns EOF immediately rather than blocking.