diff options
author | Ted Tuckman <ted.tuckman@mongodb.com> | 2022-03-02 22:20:32 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-02 23:11:24 +0000 |
commit | f25675cc8ea9d89672ce063f49dbdaa39e63ce1b (patch) | |
tree | b1867e1b1e3db4dc5083da673bd62aa4cc218767 | |
parent | 27ce39ba637159ae0be6e7734b1d7f114af7141c (diff) | |
download | mongo-f25675cc8ea9d89672ce063f49dbdaa39e63ce1b.tar.gz |
SERVER-62535 Allow sharded aggregation to return two cursors
34 files changed, 387 insertions, 21 deletions
diff --git a/buildscripts/idl/idl/ast.py b/buildscripts/idl/idl/ast.py index 1098cb606d6..5e6958a17c6 100644 --- a/buildscripts/idl/idl/ast.py +++ b/buildscripts/idl/idl/ast.py @@ -143,6 +143,7 @@ class Struct(common.SourceLocation): self.fields = [] # type: List[Field] self.allow_global_collection_name = False # type: bool self.non_const_getter = False # type: bool + self.cpp_validator_func = None # type: str super(Struct, self).__init__(file_name, line, column) diff --git a/buildscripts/idl/idl/binder.py b/buildscripts/idl/idl/binder.py index 0ef5eb966e8..14a4c23a3bb 100644 --- a/buildscripts/idl/idl/binder.py +++ b/buildscripts/idl/idl/binder.py @@ -266,6 +266,7 @@ def _bind_struct_common(ctxt, parsed_spec, struct, ast_struct): ast_struct.immutable = struct.immutable ast_struct.inline_chained_structs = struct.inline_chained_structs ast_struct.generate_comparison_operators = struct.generate_comparison_operators + ast_struct.cpp_validator_func = struct.cpp_validator_func ast_struct.cpp_name = struct.cpp_name or struct.name ast_struct.qualified_cpp_name = _get_struct_qualified_cpp_name(struct) ast_struct.allow_global_collection_name = struct.allow_global_collection_name diff --git a/buildscripts/idl/idl/generator.py b/buildscripts/idl/idl/generator.py index ff9da001069..08a3caecaf8 100644 --- a/buildscripts/idl/idl/generator.py +++ b/buildscripts/idl/idl/generator.py @@ -1784,6 +1784,9 @@ class _CppSourceFileWriter(_CppFileWriterBase): field_usage_check.add_final_checks() self._writer.write_empty_line() + if struct.cpp_validator_func is not None: + self._writer.write_line(struct.cpp_validator_func + "(this);") + self._gen_command_deserializer(struct, "bsonObject") def gen_op_msg_request_deserializer_methods(self, struct): diff --git a/buildscripts/idl/idl/parser.py b/buildscripts/idl/idl/parser.py index d624b4f4473..f7a133783fd 100644 --- a/buildscripts/idl/idl/parser.py +++ b/buildscripts/idl/idl/parser.py @@ -526,6 +526,7 @@ def _parse_struct(ctxt, spec, name, node): "immutable": _RuleDesc('bool_scalar'), "generate_comparison_operators": _RuleDesc("bool_scalar"), "non_const_getter": _RuleDesc('bool_scalar'), + "cpp_validator_func": _RuleDesc('scalar'), }) # PyLint has difficulty with some iterables: https://github.com/PyCQA/pylint/issues/3105 diff --git a/buildscripts/idl/idl/syntax.py b/buildscripts/idl/idl/syntax.py index 69be6288cbe..89af355cca4 100644 --- a/buildscripts/idl/idl/syntax.py +++ b/buildscripts/idl/idl/syntax.py @@ -529,6 +529,7 @@ class Struct(common.SourceLocation): self.fields = None # type: List[Field] self.allow_global_collection_name = False # type: bool self.non_const_getter = False # type: bool + self.cpp_validator_func = None # type: str # Command only property self.cpp_name = None # type: str diff --git a/buildscripts/idl/idl_check_compatibility.py b/buildscripts/idl/idl_check_compatibility.py index 256339ca07c..0cb6a7e25f4 100644 --- a/buildscripts/idl/idl_check_compatibility.py +++ b/buildscripts/idl/idl_check_compatibility.py @@ -166,6 +166,14 @@ IGNORE_UNSTABLE_LIST: List[str] = [ # visible. This is part of the listIndexes output when executed against system.bucket.* # collections, which users should avoid doing. 'listIndexes-reply-originalSpec', + # The 'vars' field was introduced to facilitate communication between mongot and mongod and is + # not user visible. + 'find-reply-vars', + 'aggregate-reply-vars', + # The 'cursor' field is now optional in a reply, as inter-node communication in aggregation + # can return one or more cursors. Multiple cursors are covered under the 'cursors' field. + 'find-reply-cursor', + 'aggregate-reply-cursor', ] SKIPPED_FILES = ["unittest.idl"] @@ -499,8 +507,8 @@ def check_reply_field(ctxt: IDLCompatibilityContext, old_field: syntax.Field, and old_field_type.name == "optionalBool") new_field_optional = new_field.optional or (new_field_type and new_field_type.name == "optionalBool") - if not old_field.unstable: - field_name: str = cmd_name + "-reply-" + new_field.name + field_name: str = cmd_name + "-reply-" + new_field.name + if not old_field.unstable and field_name not in IGNORE_UNSTABLE_LIST: if new_field.unstable and field_name not in IGNORE_UNSTABLE_LIST: ctxt.add_new_reply_field_unstable_error(cmd_name, new_field.name, new_idl_file_path) if new_field_optional and not old_field_optional: diff --git a/buildscripts/idl/tests/test_parser.py b/buildscripts/idl/tests/test_parser.py index 1439d58c383..8fc760caf13 100644 --- a/buildscripts/idl/tests/test_parser.py +++ b/buildscripts/idl/tests/test_parser.py @@ -286,6 +286,7 @@ class TestParser(testcase.IDLTestcase): immutable: true inline_chained_structs: true generate_comparison_operators: true + cpp_validator_func: funcName fields: foo: bar """)) @@ -300,6 +301,7 @@ class TestParser(testcase.IDLTestcase): immutable: false inline_chained_structs: false generate_comparison_operators: false + cpp_validator_func: funcName fields: foo: bar """)) diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 732233dd194..2de5d93b52f 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -591,7 +591,8 @@ public: const CursorId cursorId = 0; endQueryOp(opCtx, collection, *exec, numResults, cursorId); auto bodyBuilder = result->getBodyBuilder(); - appendCursorResponseObject(cursorId, nss.ns(), BSONArray(), &bodyBuilder); + appendCursorResponseObject( + cursorId, nss.ns(), BSONArray(), boost::none, &bodyBuilder); return; } diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 813ad0e0a3e..64907d4d7d3 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -141,8 +141,11 @@ bool handleCursorCommand(OperationContext* opCtx, invariant(cursors[idx]); BSONObjBuilder cursorResult; - appendCursorResponseObject( - cursors[idx]->cursorid(), nsForCursor.ns(), BSONArray(), &cursorResult); + appendCursorResponseObject(cursors[idx]->cursorid(), + nsForCursor.ns(), + BSONArray(), + cursors[idx]->getExecutor()->getExecutorType(), + &cursorResult); cursorResult.appendBool("ok", 1); cursorsBuilder.append(cursorResult.obj()); @@ -514,6 +517,35 @@ std::vector<std::unique_ptr<Pipeline, PipelineDeleter>> createExchangePipelinesI } /** + * Creates additional pipelines if needed to serve the aggregation. This includes additional + * pipelines for exchange optimization and search commands that generate metadata. Returns + * a vector of all pipelines needed for the query, including the original one. + * + * Takes ownership of the original, passed in, pipeline. + */ +std::vector<std::unique_ptr<Pipeline, PipelineDeleter>> createAdditionalPipelinesIfNeeded( + OperationContext* opCtx, + boost::intrusive_ptr<ExpressionContext> expCtx, + const AggregateCommandRequest& request, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + boost::optional<UUID> collUUID) { + + std::vector<std::unique_ptr<Pipeline, PipelineDeleter>> pipelines; + // Exchange is not allowed to be specified if there is a $search stage. + if (auto metadataPipe = getSearchHelpers(opCtx->getServiceContext()) + ->generateMetadataPipelineForSearch( + opCtx, expCtx, request, pipeline.get(), collUUID)) { + pipelines.push_back(std::move(pipeline)); + pipelines.push_back(std::move(metadataPipe)); + } else { + // Takes ownership of 'pipeline'. + pipelines = + createExchangePipelinesIfNeeded(opCtx, expCtx, request, std::move(pipeline), collUUID); + } + return pipelines; +} + +/** * Performs validations related to API versioning and time-series stages. * Throws UserAssertion if any of the validations fails * - validation of API versioning on each stage on the pipeline @@ -574,7 +606,7 @@ std::vector<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createLegacyEx std::move(attachExecutorCallback.second), pipeline.get()); - auto pipelines = createExchangePipelinesIfNeeded( + auto pipelines = createAdditionalPipelinesIfNeeded( expCtx->opCtx, expCtx, request, std::move(pipeline), expCtx->uuid); for (auto&& pipelineIt : pipelines) { // There are separate ExpressionContexts for each exchange pipeline, so make sure to diff --git a/src/mongo/db/exec/sbe_cmd.cpp b/src/mongo/db/exec/sbe_cmd.cpp index bba9d728375..de2d5131932 100644 --- a/src/mongo/db/exec/sbe_cmd.cpp +++ b/src/mongo/db/exec/sbe_cmd.cpp @@ -148,7 +148,7 @@ public: } if (exec->isEOF()) { - appendCursorResponseObject(0LL, nss.ns(), firstBatch.arr(), &result); + appendCursorResponseObject(0LL, nss.ns(), firstBatch.arr(), boost::none, &result); return true; } @@ -167,7 +167,7 @@ public: {}}); appendCursorResponseObject( - pinnedCursor.getCursor()->cursorid(), nss.ns(), firstBatch.arr(), &result); + pinnedCursor.getCursor()->cursorid(), nss.ns(), firstBatch.arr(), boost::none, &result); return true; } diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index aa47aef4bb0..c87ca855fd8 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -354,6 +354,7 @@ pipelineEnv.Library( '$BUILD_DIR/mongo/db/pipeline/lite_parsed_document_source', '$BUILD_DIR/mongo/db/query/collation/collator_factory_interface', '$BUILD_DIR/mongo/db/query/collation/collator_interface', + '$BUILD_DIR/mongo/db/query/cursor_response_idl', '$BUILD_DIR/mongo/db/query/datetime/date_time_support', '$BUILD_DIR/mongo/db/query/query_knobs', '$BUILD_DIR/mongo/db/query/sort_pattern', diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index e1ef7b6a0a4..6cdf67181b9 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -41,6 +41,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/sharded_agg_helpers_targeting_policy.h" +#include "mongo/db/query/cursor_response_gen.h" #include "mongo/db/query/explain_options.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/executor/task_executor.h" @@ -382,6 +383,21 @@ public: */ friend class PipelineD; + /** + * For commands that return multiple pipelines, this value will contain the type of pipeline. + * This can be populated to the cursor so consumers do not have to depend on order or guess + * which pipeline is which. Default to a regular result pipeline. + */ + CursorTypeEnum pipelineType = CursorTypeEnum::DocumentResult; + + /** + * Get a string representation of the pipeline type. + */ + auto getTypeString() { + return CursorType_serializer(pipelineType); + } + + private: friend class PipelineDeleter; @@ -450,4 +466,15 @@ private: bool _dismissed = false; }; +/** + * A 'ServiceContext' decorator that by default does nothing but can be set to generate a + * complimentary, metadata pipeline to the one passed in. + */ +extern ServiceContext::Decoration<std::unique_ptr<Pipeline, PipelineDeleter> (*)( + OperationContext* opCtx, + boost::intrusive_ptr<ExpressionContext> expCtx, + const AggregateCommandRequest& request, + Pipeline* origPipeline, + boost::optional<UUID> uuid)> + generateMetadataPipelineFunc; } // namespace mongo diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.h b/src/mongo/db/pipeline/plan_executor_pipeline.h index efef0a7bb2d..d320a63062a 100644 --- a/src/mongo/db/pipeline/plan_executor_pipeline.h +++ b/src/mongo/db/pipeline/plan_executor_pipeline.h @@ -163,6 +163,11 @@ public: return false; } + boost::optional<StringData> getExecutorType() const override { + tassert(6253504, "Can't get type string without pipeline", _pipeline); + return _pipeline->getTypeString(); + } + private: /** * Obtains the next document from the underlying Pipeline, and does change streams-related diff --git a/src/mongo/db/pipeline/search_helper.h b/src/mongo/db/pipeline/search_helper.h index fb38cd88690..25a224565a8 100644 --- a/src/mongo/db/pipeline/search_helper.h +++ b/src/mongo/db/pipeline/search_helper.h @@ -46,6 +46,22 @@ public: // invoked for inner collection in $lookup, for instance, only when expanded pipeline is passed // to the specific shard. virtual void injectSearchShardFiltererIfNeeded(Pipeline* pipeline){}; + + /** + * Check to see if in the current environment an additional pipeline needs to be run by the + * aggregation command to generate metadata results. Either returns the additional pipeline + * or nullptr if no pipeline is necessary. + * + * This can modify the passed in pipeline but does not take ownership of it. + */ + virtual std::unique_ptr<Pipeline, PipelineDeleter> generateMetadataPipelineForSearch( + OperationContext* opCtx, + boost::intrusive_ptr<ExpressionContext> expCtx, + const AggregateCommandRequest& request, + Pipeline* origPipeline, + boost::optional<UUID> uuid) { + return nullptr; + } }; /** diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index f2b7073f5a3..a7ba134a672 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -963,7 +963,15 @@ DispatchShardPipelineResults dispatchShardPipeline( "needsPrimaryShardMerge"_attr = needsPrimaryShardMerge); splitPipelines = splitPipeline(std::move(pipeline)); - exchangeSpec = checkIfEligibleForExchange(opCtx, splitPipelines->mergePipeline.get()); + // If the first stage of the pipeline is a $search stage, exchange optimization isn't + // possible. + // TODO SERVER-62537 Investigate relaxing this restriction. + if (!splitPipelines || !splitPipelines->shardsPipeline || + !splitPipelines->shardsPipeline->peekFront() || + splitPipelines->shardsPipeline->peekFront()->getSourceName() != + "$_internalSearchMongotRemote"_sd) { + exchangeSpec = checkIfEligibleForExchange(opCtx, splitPipelines->mergePipeline.get()); + } } // Generate the command object for the targeted shards. diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript index 7977f23f239..797009e9df7 100644 --- a/src/mongo/db/query/SConscript +++ b/src/mongo/db/query/SConscript @@ -183,6 +183,7 @@ env.Library( env.Library( target='cursor_response_idl', source=[ + 'cursor_idl_validator.cpp', 'cursor_response.idl', ], LIBDEPS_PRIVATE=[ diff --git a/src/mongo/db/query/cursor_idl_validator.cpp b/src/mongo/db/query/cursor_idl_validator.cpp new file mode 100644 index 00000000000..c66964a7e7e --- /dev/null +++ b/src/mongo/db/query/cursor_idl_validator.cpp @@ -0,0 +1,44 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/query/cursor_idl_validator.h" + +namespace mongo { + +/** + * Function used by the IDL parser to validate that a response has exactly one cursor type field. + */ +void validateIDLParsedCursorResponse(const CursorInitialReply* idlParsedObj) { + bool hasCursor = idlParsedObj->getCursor() != boost::none; + bool hasCursors = idlParsedObj->getCursors() != boost::none; + uassert(6253507, + "MultiResponseInitialCursor must have exactly one of 'cursor' or 'cursors' fields", + hasCursor != hasCursors); +} +} // namespace mongo diff --git a/src/mongo/db/query/cursor_idl_validator.h b/src/mongo/db/query/cursor_idl_validator.h new file mode 100644 index 00000000000..31617d7a428 --- /dev/null +++ b/src/mongo/db/query/cursor_idl_validator.h @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once +#include "mongo/db/query/cursor_response_gen.h" + +namespace mongo { + +class CursorInitialReply; +/** + * Function used by the IDL parser to validate that a response has exactly one cursor type field. + */ +void validateIDLParsedCursorResponse(const CursorInitialReply* idlParsedObj); +} // namespace mongo diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp index b4042f559b1..81bbe7ed1b9 100644 --- a/src/mongo/db/query/cursor_response.cpp +++ b/src/mongo/db/query/cursor_response.cpp @@ -45,6 +45,7 @@ const char kCursorField[] = "cursor"; const char kIdField[] = "id"; const char kNsField[] = "ns"; const char kVarsField[] = "vars"; +const char kTypeField[] = "type"; const char kAtClusterTimeField[] = "atClusterTime"; const char kBatchField[] = "nextBatch"; const char kBatchFieldInitial[] = "firstBatch"; @@ -104,11 +105,15 @@ void CursorResponseBuilder::abandon() { void appendCursorResponseObject(long long cursorId, StringData cursorNamespace, BSONArray firstBatch, + boost::optional<StringData> cursorType, BSONObjBuilder* builder) { BSONObjBuilder cursorObj(builder->subobjStart(kCursorField)); cursorObj.append(kIdField, cursorId); cursorObj.append(kNsField, cursorNamespace); cursorObj.append(kBatchFieldInitial, firstBatch); + if (cursorType) { + cursorObj.append(kTypeField, cursorType.get()); + } cursorObj.done(); } @@ -130,6 +135,7 @@ CursorResponse::CursorResponse(NamespaceString nss, boost::optional<BSONObj> postBatchResumeToken, boost::optional<BSONObj> writeConcernError, boost::optional<BSONObj> varsField, + boost::optional<std::string> cursorType, bool partialResultsReturned, bool invalidated) : _nss(std::move(nss)), @@ -139,6 +145,7 @@ CursorResponse::CursorResponse(NamespaceString nss, _postBatchResumeToken(std::move(postBatchResumeToken)), _writeConcernError(std::move(writeConcernError)), _varsField(std::move(varsField)), + _cursorType(std::move(cursorType)), _partialResultsReturned(partialResultsReturned), _invalidated(invalidated) {} @@ -209,6 +216,13 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo << "' must be of type object in: " << cmdResponse}; } + BSONElement typeElt = cursorObj[kTypeField]; + if (!typeElt.eoo() && typeElt.type() != BSONType::String) { + return {ErrorCodes::TypeMismatch, + str::stream() << "Field '" << kTypeField << "' must be of type string but got " + << typeElt.type() << " in: " << cmdResponse}; + } + BSONElement batchElt = cursorObj[kBatchField]; if (batchElt.eoo()) { batchElt = cursorObj[kBatchFieldInitial]; @@ -297,6 +311,7 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo : boost::optional<BSONObj>{}, writeConcernError ? writeConcernError.Obj().getOwned() : boost::optional<BSONObj>{}, varsElt ? varsElt.Obj().getOwned() : boost::optional<BSONObj>{}, + typeElt ? boost::make_optional<std::string>(typeElt.String()) : boost::none, partialResultsReturned.trueValue(), invalidatedElem.trueValue()}}; } diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h index ca81974e78b..a8151d7709a 100644 --- a/src/mongo/db/query/cursor_response.h +++ b/src/mongo/db/query/cursor_response.h @@ -142,13 +142,17 @@ private: * and appends the response object to the provided builder under the field name "cursor". * * The response object has the following format: - * { id: <NumberLong>, ns: <String>, firstBatch: <Array> }. + * { id: <NumberLong>, ns: <String>, firstBatch: <Array> , type: <String>}. + * + * The type field is optional, but can be used to differentiate cursors if multiple are returned + * at once. * * This function is deprecated. Prefer CursorResponseBuilder or CursorResponse::toBSON() instead. */ void appendCursorResponseObject(long long cursorId, StringData cursorNamespace, BSONArray firstBatch, + boost::optional<StringData> cursorType, BSONObjBuilder* builder); /** @@ -207,6 +211,7 @@ public: boost::optional<BSONObj> postBatchResumeToken = boost::none, boost::optional<BSONObj> writeConcernError = boost::none, boost::optional<BSONObj> varsField = boost::none, + boost::optional<std::string> cursorType = boost::none, bool partialResultsReturned = false, bool invalidated = false); @@ -249,6 +254,10 @@ public: return _varsField; } + auto getCursorType() const { + return _cursorType; + } + bool getPartialResultsReturned() const { return _partialResultsReturned; } @@ -274,6 +283,7 @@ private: boost::optional<BSONObj> _postBatchResumeToken; boost::optional<BSONObj> _writeConcernError; boost::optional<BSONObj> _varsField; + boost::optional<std::string> _cursorType; bool _partialResultsReturned = false; bool _invalidated = false; }; diff --git a/src/mongo/db/query/cursor_response.idl b/src/mongo/db/query/cursor_response.idl index 16f79c4be4d..8233f906c75 100644 --- a/src/mongo/db/query/cursor_response.idl +++ b/src/mongo/db/query/cursor_response.idl @@ -33,10 +33,19 @@ global: cpp_includes: - "mongo/db/namespace_string.h" - "mongo/idl/basic_types.h" + - "mongo/db/query/cursor_idl_validator.h" imports: - "mongo/idl/basic_types.idl" +enums: + CursorType: + description: "The type of a single cursor if a response has multiple cursors" + type: string + values: + SearchMetaResult: "meta" + DocumentResult: "results" + structs: ResponseCursorBase: description: "Common fields of initial and subsequent cursor responses." @@ -80,18 +89,59 @@ structs: type: array<object> unstable: false + MultiResponseInitialResponseCursor: + description: "A struct representing an initial response cursor if multiple cursors are returned." + inline_chained_structs: true + chained_structs: + ResponseCursorBase: ResponseCursorBase + fields: + firstBatch: + description: "The first batch of the cursor." + type: array<object> + unstable: false + type: + description: "Optional disambiguation string of a cursor." + type: CursorType + cpp_name: cursorType + unstable: true + optional: true + + MultiResponseCursor: + description: "A struct representing a cursor object inside an array of cursors" + fields: + cursor: + description: "The actual cursor object." + type: MultiResponseInitialResponseCursor + unstable: true + ok: + type: bool + unstable: true + CursorInitialReply: description: "A struct representing a initial cursor reply." + cpp_validator_func: "validateIDLParsedCursorResponse" fields: cursor: description: "A response cursor object." type: InitialResponseCursor unstable: false + optional: true + cursors: + description: "An array of cursor objects." + type: array<MultiResponseCursor> + unstable: true + optional: true vars: description: "An optional field containing additional response information for the query." type: object optional: true - unstable: false + unstable: true + type: + description: "An optional field containing disambiguation information if a reply contains multiple cursors." + type: CursorType + cpp_name: cursorType + optional: true + unstable: true GetMoreResponseCursor: description: "A struct representing a subsequent response cursor." diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp index 6cedc76ccb0..183c27e05d6 100644 --- a/src/mongo/db/query/cursor_response_test.cpp +++ b/src/mongo/db/query/cursor_response_test.cpp @@ -348,6 +348,7 @@ TEST(CursorResponseTest, toBSONPartialResultsReturned) { boost::none, boost::none, boost::none, + boost::none, true); BSONObj responseObj = response.toBSON(CursorResponse::ResponseType::InitialResponse); BSONObj expectedResponse = BSON( diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h index 9748e3de303..80857984d3e 100644 --- a/src/mongo/db/query/plan_executor.h +++ b/src/mongo/db/query/plan_executor.h @@ -359,6 +359,13 @@ public: */ virtual void enableSaveRecoveryUnitAcrossCommandsIfSupported() = 0; virtual bool isSaveRecoveryUnitAcrossCommandsEnabled() const = 0; + + /** + * For queries that have multiple executors, this can be used to differentiate between them. + */ + virtual boost::optional<StringData> getExecutorType() const { + return boost::none; + } }; } // namespace mongo diff --git a/src/mongo/db/query/query_request_test.cpp b/src/mongo/db/query/query_request_test.cpp index 8c58484990a..20e618eff17 100644 --- a/src/mongo/db/query/query_request_test.cpp +++ b/src/mongo/db/query/query_request_test.cpp @@ -1551,7 +1551,7 @@ TEST(QueryRequestTest, ConvertToFindWithAllowDiskUseFalseSucceeds) { TEST(QueryRequestHelperTest, ValidateResponseMissingFields) { BSONObjBuilder builder; ASSERT_THROWS_CODE( - query_request_helper::validateCursorResponse(builder.asTempObj()), DBException, 40414); + query_request_helper::validateCursorResponse(builder.asTempObj()), DBException, 6253507); } TEST(QueryRequestHelperTest, ValidateResponseWrongDataType) { diff --git a/src/mongo/executor/network_test_env.cpp b/src/mongo/executor/network_test_env.cpp index a52c3ddf9b3..9a128705054 100644 --- a/src/mongo/executor/network_test_env.cpp +++ b/src/mongo/executor/network_test_env.cpp @@ -113,7 +113,7 @@ void NetworkTestEnv::onFindCommand(OnFindCommandFunction func) { const NamespaceString nss = NamespaceString(request.dbname, request.cmdObj.firstElement().String()); BSONObjBuilder result; - appendCursorResponseObject(0LL, nss.toString(), arr.arr(), &result); + appendCursorResponseObject(0LL, nss.toString(), arr.arr(), boost::none, &result); return result.obj(); }); @@ -139,7 +139,7 @@ void NetworkTestEnv::onFindWithMetadataCommand(OnFindCommandWithMetadataFunction const NamespaceString nss = NamespaceString(request.dbname, request.cmdObj.firstElement().String()); BSONObjBuilder resultBuilder(std::move(metadata)); - appendCursorResponseObject(0LL, nss.toString(), arr.arr(), &resultBuilder); + appendCursorResponseObject(0LL, nss.toString(), arr.arr(), boost::none, &resultBuilder); return RemoteCommandResponse(resultBuilder.obj(), Milliseconds(1)); }); diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index ad8cab93436..2680fa3d916 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -410,7 +410,8 @@ public: * * NOTE: Do not call from a callback running in the executor. * - * Prefer the version that takes an OperationContext* to this version. + * Prefer passing an OperationContext* or other interruptible as the second argument to leaving + * as not interruptible. */ virtual void wait(const CallbackHandle& cbHandle, Interruptible* interruptible = Interruptible::notInterruptible()) = 0; diff --git a/src/mongo/executor/task_executor_cursor.cpp b/src/mongo/executor/task_executor_cursor.cpp index 9bc01121b38..46d1bd846b4 100644 --- a/src/mongo/executor/task_executor_cursor.cpp +++ b/src/mongo/executor/task_executor_cursor.cpp @@ -86,6 +86,9 @@ TaskExecutorCursor::TaskExecutorCursor(TaskExecutorCursor&& other) if (other._cursorVars) { _cursorVars = other._cursorVars->getOwned(); } + if (other._cursorType) { + _cursorType = other._cursorType; + } // Other is no longer responsible for this cursor id. other._cursorId = 0; // Other should not cancel the callback on destruction. @@ -127,6 +130,19 @@ boost::optional<BSONObj> TaskExecutorCursor::getNext(OperationContext* opCtx) { return std::move(*_batchIter++); } +void TaskExecutorCursor::populateCursor(OperationContext* opCtx) { + tassert(6253502, + "populateCursors should only be called before cursor is initialized", + _cursorId == kUnitializedCursorId); + tassert(6253503, + "populateCursors should only be called after a remote command has been run", + _cbHandle); + // We really only care about populating the cursor "first batch" fields, but at some point we'll + // have to do all of the work done by this function anyway. This would have been called by + // getNext() the first time it was called. + _getNextBatch(opCtx); +} + const RemoteCommandRequest& TaskExecutorCursor::_createRequest(OperationContext* opCtx, const BSONObj& cmd) { // we pull this every time for updated client metadata @@ -171,8 +187,9 @@ void TaskExecutorCursor::_processResponse(OperationContext* opCtx, CursorRespons if (_cursorId == kUnitializedCursorId) { _ns = response.getNSS(); _rcr.dbname = _ns.db().toString(); - // 'vars' are only included in the first batch. + // 'vars' and type are only included in the first batch. _cursorVars = response.getVarsField(); + _cursorType = response.getCursorType(); } _cursorId = response.getCursorId(); diff --git a/src/mongo/executor/task_executor_cursor.h b/src/mongo/executor/task_executor_cursor.h index bfc916d0e13..f7f545de2a5 100644 --- a/src/mongo/executor/task_executor_cursor.h +++ b/src/mongo/executor/task_executor_cursor.h @@ -114,6 +114,16 @@ public: */ boost::optional<BSONObj> getNext(OperationContext* opCtx); + /** + * Read the response from the remote command issued by this cursor and parse it into this + * object. Performs the same work as getNext() above does on the first call to it, and so this + * can throw any error that getNext can throw. + * + * Should not be called once getNext() has been called or the cursor has been otherwise + * initialized. + */ + void populateCursor(OperationContext* opCtx); + const CursorId getCursorId() const { return _cursorId; } @@ -128,6 +138,10 @@ public: return _cursorVars; } + auto getType() { + return _cursorType; + } + long long getBatchNum() { return _batchNum; } @@ -144,6 +158,14 @@ public: return _additionalCursors.size(); } + /** + * Return the callback that this cursor is waiting on. Can be used to block on getting a + * response to this request. Can be boost::none. + */ + auto getCallbackHandle() { + return _cbHandle; + } + private: /** * Runs a remote command and pipes the output back to this object @@ -186,6 +208,9 @@ private: // Variables sent alongside the results in the cursor. boost::optional<BSONObj> _cursorVars = boost::none; + // For commands that return multiple cursors, the type of the cursor. + boost::optional<std::string> _cursorType; + // This is a sum of the time spent waiting on remote calls. Milliseconds _millisecondsWaiting = Milliseconds(0); diff --git a/src/mongo/idl/idl_test.cpp b/src/mongo/idl/idl_test.cpp index 37133cb7566..febbe3dbc3f 100644 --- a/src/mongo/idl/idl_test.cpp +++ b/src/mongo/idl/idl_test.cpp @@ -48,6 +48,11 @@ using namespace mongo::idl::import; namespace mongo { +void mongo::idl::test::checkValuesEqual(StructWithValidator* structToValidate) { + uassert( + 6253512, "Values not equal", structToValidate->getFirst() == structToValidate->getSecond()); +} + namespace { bool isEquals(ConstDataRange left, const std::vector<uint8_t>& right) { @@ -1049,6 +1054,17 @@ TEST(IDLStructTests, WriteConcernTest) { } } +TEST(IDLStructTests, TestValidator) { + // Parser should assert that the values are equal. + IDLParserErrorContext ctxt("root"); + auto objToParse = BSON("first" << 1 << "second" << 2); + + ASSERT_THROWS_CODE(StructWithValidator::parse(ctxt, objToParse), AssertionException, 6253512); + + objToParse = BSON("first" << 1 << "second" << 1); + StructWithValidator::parse(ctxt, objToParse); +} + /// Struct default comparison tests TEST(IDLCompareTests, TestAllFields) { IDLParserErrorContext ctxt("root"); diff --git a/src/mongo/idl/idl_test.h b/src/mongo/idl/idl_test.h index 40b840a1f39..de4f26f7b42 100644 --- a/src/mongo/idl/idl_test.h +++ b/src/mongo/idl/idl_test.h @@ -37,6 +37,8 @@ namespace mongo { namespace idl { namespace test { +class StructWithValidator; + /** * Validates the given number is even */ @@ -98,6 +100,12 @@ inline Status validateOneInt(const std::vector<mongo::idl::import::One_int>& val return Status::OK(); } +/** + * Check that the two values in the struct are equal, assert otherwise. + */ +void checkValuesEqual(StructWithValidator* structToValidate); + + } // namespace test } // namespace idl } // namespace mongo diff --git a/src/mongo/idl/unittest.idl b/src/mongo/idl/unittest.idl index 5f7f816242c..022b28d739b 100644 --- a/src/mongo/idl/unittest.idl +++ b/src/mongo/idl/unittest.idl @@ -625,6 +625,22 @@ structs: ################################################################################################## # +# Test struct with cpp validator +# +################################################################################################## + + StructWithValidator: + description: mock + cpp_validator_func: checkValuesEqual + fields: + first: + type: int + optional: false + second: + type: int + optional: false +################################################################################################## +# # Test struct with variant # ################################################################################################## diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp index 22a9238fbd2..fe2a6536865 100644 --- a/src/mongo/s/cluster_commands_helpers.cpp +++ b/src/mongo/s/cluster_commands_helpers.cpp @@ -619,7 +619,7 @@ bool appendEmptyResultSet(OperationContext* opCtx, if (status == ErrorCodes::NamespaceNotFound) { // New (command) style reply - appendCursorResponseObject(0LL, ns, BSONArray(), &result); + appendCursorResponseObject(0LL, ns, BSONArray(), boost::none, &result); return true; } diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index c894d4afec4..bd186639684 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -875,6 +875,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, postBatchResumeToken, boost::none, boost::none, + boost::none, partialResultsReturned); } diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp index 57991fccb36..82ec1df2809 100644 --- a/src/mongo/s/query/establish_cursors.cpp +++ b/src/mongo/s/query/establish_cursors.cpp @@ -275,10 +275,17 @@ void CursorEstablisher::_handleFailure(const AsyncRequestsSender::Response& resp if (_allowPartialResults && isEligibleException) { // This exception is eligible to be swallowed. Add an entry with a cursorID of 0, an // empty HostAndPort, and which has the 'partialResultsReturned' flag set to true. - _remoteCursors.push_back( - {response.shardId.toString(), - {}, - {_nss, CursorId{0}, {}, boost::none, boost::none, boost::none, boost::none, true}}); + _remoteCursors.push_back({response.shardId.toString(), + {}, + {_nss, + CursorId{0}, + {}, + boost::none, + boost::none, + boost::none, + boost::none, + boost::none, + true}}); return; } |