summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Ted Tuckman <ted.tuckman@mongodb.com> 2022-03-02 22:20:32 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-03-02 23:11:24 +0000
commit: f25675cc8ea9d89672ce063f49dbdaa39e63ce1b (patch)
tree: b1867e1b1e3db4dc5083da673bd62aa4cc218767
parent: 27ce39ba637159ae0be6e7734b1d7f114af7141c (diff)
download: mongo-f25675cc8ea9d89672ce063f49dbdaa39e63ce1b.tar.gz
SERVER-62535 Allow sharded aggregation to return two cursors
-rw-r--r--buildscripts/idl/idl/ast.py1
-rw-r--r--buildscripts/idl/idl/binder.py1
-rw-r--r--buildscripts/idl/idl/generator.py3
-rw-r--r--buildscripts/idl/idl/parser.py1
-rw-r--r--buildscripts/idl/idl/syntax.py1
-rw-r--r--buildscripts/idl/idl_check_compatibility.py12
-rw-r--r--buildscripts/idl/tests/test_parser.py2
-rw-r--r--src/mongo/db/commands/find_cmd.cpp3
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp38
-rw-r--r--src/mongo/db/exec/sbe_cmd.cpp4
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/pipeline.h27
-rw-r--r--src/mongo/db/pipeline/plan_executor_pipeline.h5
-rw-r--r--src/mongo/db/pipeline/search_helper.h16
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp10
-rw-r--r--src/mongo/db/query/SConscript1
-rw-r--r--src/mongo/db/query/cursor_idl_validator.cpp44
-rw-r--r--src/mongo/db/query/cursor_idl_validator.h40
-rw-r--r--src/mongo/db/query/cursor_response.cpp15
-rw-r--r--src/mongo/db/query/cursor_response.h12
-rw-r--r--src/mongo/db/query/cursor_response.idl52
-rw-r--r--src/mongo/db/query/cursor_response_test.cpp1
-rw-r--r--src/mongo/db/query/plan_executor.h7
-rw-r--r--src/mongo/db/query/query_request_test.cpp2
-rw-r--r--src/mongo/executor/network_test_env.cpp4
-rw-r--r--src/mongo/executor/task_executor.h3
-rw-r--r--src/mongo/executor/task_executor_cursor.cpp19
-rw-r--r--src/mongo/executor/task_executor_cursor.h25
-rw-r--r--src/mongo/idl/idl_test.cpp16
-rw-r--r--src/mongo/idl/idl_test.h8
-rw-r--r--src/mongo/idl/unittest.idl16
-rw-r--r--src/mongo/s/cluster_commands_helpers.cpp2
-rw-r--r--src/mongo/s/query/cluster_find.cpp1
-rw-r--r--src/mongo/s/query/establish_cursors.cpp15
34 files changed, 387 insertions, 21 deletions
diff --git a/buildscripts/idl/idl/ast.py b/buildscripts/idl/idl/ast.py
index 1098cb606d6..5e6958a17c6 100644
--- a/buildscripts/idl/idl/ast.py
+++ b/buildscripts/idl/idl/ast.py
@@ -143,6 +143,7 @@ class Struct(common.SourceLocation):
self.fields = [] # type: List[Field]
self.allow_global_collection_name = False # type: bool
self.non_const_getter = False # type: bool
+ self.cpp_validator_func = None # type: str
super(Struct, self).__init__(file_name, line, column)
diff --git a/buildscripts/idl/idl/binder.py b/buildscripts/idl/idl/binder.py
index 0ef5eb966e8..14a4c23a3bb 100644
--- a/buildscripts/idl/idl/binder.py
+++ b/buildscripts/idl/idl/binder.py
@@ -266,6 +266,7 @@ def _bind_struct_common(ctxt, parsed_spec, struct, ast_struct):
ast_struct.immutable = struct.immutable
ast_struct.inline_chained_structs = struct.inline_chained_structs
ast_struct.generate_comparison_operators = struct.generate_comparison_operators
+ ast_struct.cpp_validator_func = struct.cpp_validator_func
ast_struct.cpp_name = struct.cpp_name or struct.name
ast_struct.qualified_cpp_name = _get_struct_qualified_cpp_name(struct)
ast_struct.allow_global_collection_name = struct.allow_global_collection_name
diff --git a/buildscripts/idl/idl/generator.py b/buildscripts/idl/idl/generator.py
index ff9da001069..08a3caecaf8 100644
--- a/buildscripts/idl/idl/generator.py
+++ b/buildscripts/idl/idl/generator.py
@@ -1784,6 +1784,9 @@ class _CppSourceFileWriter(_CppFileWriterBase):
field_usage_check.add_final_checks()
self._writer.write_empty_line()
+ if struct.cpp_validator_func is not None:
+ self._writer.write_line(struct.cpp_validator_func + "(this);")
+
self._gen_command_deserializer(struct, "bsonObject")
def gen_op_msg_request_deserializer_methods(self, struct):
diff --git a/buildscripts/idl/idl/parser.py b/buildscripts/idl/idl/parser.py
index d624b4f4473..f7a133783fd 100644
--- a/buildscripts/idl/idl/parser.py
+++ b/buildscripts/idl/idl/parser.py
@@ -526,6 +526,7 @@ def _parse_struct(ctxt, spec, name, node):
"immutable": _RuleDesc('bool_scalar'),
"generate_comparison_operators": _RuleDesc("bool_scalar"),
"non_const_getter": _RuleDesc('bool_scalar'),
+ "cpp_validator_func": _RuleDesc('scalar'),
})
# PyLint has difficulty with some iterables: https://github.com/PyCQA/pylint/issues/3105
diff --git a/buildscripts/idl/idl/syntax.py b/buildscripts/idl/idl/syntax.py
index 69be6288cbe..89af355cca4 100644
--- a/buildscripts/idl/idl/syntax.py
+++ b/buildscripts/idl/idl/syntax.py
@@ -529,6 +529,7 @@ class Struct(common.SourceLocation):
self.fields = None # type: List[Field]
self.allow_global_collection_name = False # type: bool
self.non_const_getter = False # type: bool
+ self.cpp_validator_func = None # type: str
# Command only property
self.cpp_name = None # type: str
diff --git a/buildscripts/idl/idl_check_compatibility.py b/buildscripts/idl/idl_check_compatibility.py
index 256339ca07c..0cb6a7e25f4 100644
--- a/buildscripts/idl/idl_check_compatibility.py
+++ b/buildscripts/idl/idl_check_compatibility.py
@@ -166,6 +166,14 @@ IGNORE_UNSTABLE_LIST: List[str] = [
# visible. This is part of the listIndexes output when executed against system.bucket.*
# collections, which users should avoid doing.
'listIndexes-reply-originalSpec',
+ # The 'vars' field was introduced to facilitate communication between mongot and mongod and is
+ # not user visible.
+ 'find-reply-vars',
+ 'aggregate-reply-vars',
+ # The 'cursor' field is now optional in a reply, as inter-node communication in aggregation
+ # can return one or more cursors. Multiple cursors are covered under the 'cursors' field.
+ 'find-reply-cursor',
+ 'aggregate-reply-cursor',
]
SKIPPED_FILES = ["unittest.idl"]
@@ -499,8 +507,8 @@ def check_reply_field(ctxt: IDLCompatibilityContext, old_field: syntax.Field,
and old_field_type.name == "optionalBool")
new_field_optional = new_field.optional or (new_field_type
and new_field_type.name == "optionalBool")
- if not old_field.unstable:
- field_name: str = cmd_name + "-reply-" + new_field.name
+ field_name: str = cmd_name + "-reply-" + new_field.name
+ if not old_field.unstable and field_name not in IGNORE_UNSTABLE_LIST:
if new_field.unstable and field_name not in IGNORE_UNSTABLE_LIST:
ctxt.add_new_reply_field_unstable_error(cmd_name, new_field.name, new_idl_file_path)
if new_field_optional and not old_field_optional:
diff --git a/buildscripts/idl/tests/test_parser.py b/buildscripts/idl/tests/test_parser.py
index 1439d58c383..8fc760caf13 100644
--- a/buildscripts/idl/tests/test_parser.py
+++ b/buildscripts/idl/tests/test_parser.py
@@ -286,6 +286,7 @@ class TestParser(testcase.IDLTestcase):
immutable: true
inline_chained_structs: true
generate_comparison_operators: true
+ cpp_validator_func: funcName
fields:
foo: bar
"""))
@@ -300,6 +301,7 @@ class TestParser(testcase.IDLTestcase):
immutable: false
inline_chained_structs: false
generate_comparison_operators: false
+ cpp_validator_func: funcName
fields:
foo: bar
"""))
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 732233dd194..2de5d93b52f 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -591,7 +591,8 @@ public:
const CursorId cursorId = 0;
endQueryOp(opCtx, collection, *exec, numResults, cursorId);
auto bodyBuilder = result->getBodyBuilder();
- appendCursorResponseObject(cursorId, nss.ns(), BSONArray(), &bodyBuilder);
+ appendCursorResponseObject(
+ cursorId, nss.ns(), BSONArray(), boost::none, &bodyBuilder);
return;
}
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 813ad0e0a3e..64907d4d7d3 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -141,8 +141,11 @@ bool handleCursorCommand(OperationContext* opCtx,
invariant(cursors[idx]);
BSONObjBuilder cursorResult;
- appendCursorResponseObject(
- cursors[idx]->cursorid(), nsForCursor.ns(), BSONArray(), &cursorResult);
+ appendCursorResponseObject(cursors[idx]->cursorid(),
+ nsForCursor.ns(),
+ BSONArray(),
+ cursors[idx]->getExecutor()->getExecutorType(),
+ &cursorResult);
cursorResult.appendBool("ok", 1);
cursorsBuilder.append(cursorResult.obj());
@@ -514,6 +517,35 @@ std::vector<std::unique_ptr<Pipeline, PipelineDeleter>> createExchangePipelinesI
}
/**
+ * Creates additional pipelines if needed to serve the aggregation. This includes additional
+ * pipelines for exchange optimization and search commands that generate metadata. Returns
+ * a vector of all pipelines needed for the query, including the original one.
+ *
+ * Takes ownership of the original, passed in, pipeline.
+ */
+std::vector<std::unique_ptr<Pipeline, PipelineDeleter>> createAdditionalPipelinesIfNeeded(
+    OperationContext* opCtx,
+    boost::intrusive_ptr<ExpressionContext> expCtx,
+    const AggregateCommandRequest& request,
+    std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+    boost::optional<UUID> collUUID) {
+    // Ask the search helpers for a companion metadata pipeline. The helper is handed a raw
+    // pointer only, so 'pipeline' remains owned by this function after the call.
+    auto metadataPipeline =
+        getSearchHelpers(opCtx->getServiceContext())
+            ->generateMetadataPipelineForSearch(opCtx, expCtx, request, pipeline.get(), collUUID);
+
+    if (!metadataPipeline) {
+        // No metadata pipeline was produced: defer to the exchange helper, which takes
+        // ownership of 'pipeline' and returns the full set of pipelines to run.
+        return createExchangePipelinesIfNeeded(
+            opCtx, expCtx, request, std::move(pipeline), collUUID);
+    }
+
+    // Exchange is not allowed to be specified if there is a $search stage, so the result is
+    // just the original pipeline followed by its metadata pipeline.
+    std::vector<std::unique_ptr<Pipeline, PipelineDeleter>> allPipelines;
+    allPipelines.push_back(std::move(pipeline));
+    allPipelines.push_back(std::move(metadataPipeline));
+    return allPipelines;
+}
+
+/**
* Performs validations related to API versioning and time-series stages.
* Throws UserAssertion if any of the validations fails
* - validation of API versioning on each stage on the pipeline
@@ -574,7 +606,7 @@ std::vector<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createLegacyEx
std::move(attachExecutorCallback.second),
pipeline.get());
- auto pipelines = createExchangePipelinesIfNeeded(
+ auto pipelines = createAdditionalPipelinesIfNeeded(
expCtx->opCtx, expCtx, request, std::move(pipeline), expCtx->uuid);
for (auto&& pipelineIt : pipelines) {
// There are separate ExpressionContexts for each exchange pipeline, so make sure to
diff --git a/src/mongo/db/exec/sbe_cmd.cpp b/src/mongo/db/exec/sbe_cmd.cpp
index bba9d728375..de2d5131932 100644
--- a/src/mongo/db/exec/sbe_cmd.cpp
+++ b/src/mongo/db/exec/sbe_cmd.cpp
@@ -148,7 +148,7 @@ public:
}
if (exec->isEOF()) {
- appendCursorResponseObject(0LL, nss.ns(), firstBatch.arr(), &result);
+ appendCursorResponseObject(0LL, nss.ns(), firstBatch.arr(), boost::none, &result);
return true;
}
@@ -167,7 +167,7 @@ public:
{}});
appendCursorResponseObject(
- pinnedCursor.getCursor()->cursorid(), nss.ns(), firstBatch.arr(), &result);
+ pinnedCursor.getCursor()->cursorid(), nss.ns(), firstBatch.arr(), boost::none, &result);
return true;
}
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index aa47aef4bb0..c87ca855fd8 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -354,6 +354,7 @@ pipelineEnv.Library(
'$BUILD_DIR/mongo/db/pipeline/lite_parsed_document_source',
'$BUILD_DIR/mongo/db/query/collation/collator_factory_interface',
'$BUILD_DIR/mongo/db/query/collation/collator_interface',
+ '$BUILD_DIR/mongo/db/query/cursor_response_idl',
'$BUILD_DIR/mongo/db/query/datetime/date_time_support',
'$BUILD_DIR/mongo/db/query/query_knobs',
'$BUILD_DIR/mongo/db/query/sort_pattern',
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index e1ef7b6a0a4..6cdf67181b9 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -41,6 +41,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/sharded_agg_helpers_targeting_policy.h"
+#include "mongo/db/query/cursor_response_gen.h"
#include "mongo/db/query/explain_options.h"
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/executor/task_executor.h"
@@ -382,6 +383,21 @@ public:
*/
friend class PipelineD;
+ /**
+ * For commands that return multiple pipelines, this value will contain the type of pipeline.
+ * This can be populated to the cursor so consumers do not have to depend on order or guess
+ * which pipeline is which. Default to a regular result pipeline.
+ */
+ CursorTypeEnum pipelineType = CursorTypeEnum::DocumentResult;
+
+ /**
+ * Returns the result of CursorType_serializer() applied to 'pipelineType', i.e. the
+ * serialized form of this pipeline's type. Declared const because it only reads
+ * 'pipelineType', which lets it be called through a const Pipeline.
+ */
+ auto getTypeString() const {
+ return CursorType_serializer(pipelineType);
+ }
+
+
private:
friend class PipelineDeleter;
@@ -450,4 +466,15 @@ private:
bool _dismissed = false;
};
+/**
+ * A 'ServiceContext' decorator that by default does nothing but can be set to generate a
+ * complementary metadata pipeline to the one passed in.
+ */
+extern ServiceContext::Decoration<std::unique_ptr<Pipeline, PipelineDeleter> (*)(
+ OperationContext* opCtx,
+ boost::intrusive_ptr<ExpressionContext> expCtx,
+ const AggregateCommandRequest& request,
+ Pipeline* origPipeline,
+ boost::optional<UUID> uuid)>
+ generateMetadataPipelineFunc;
} // namespace mongo
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.h b/src/mongo/db/pipeline/plan_executor_pipeline.h
index efef0a7bb2d..d320a63062a 100644
--- a/src/mongo/db/pipeline/plan_executor_pipeline.h
+++ b/src/mongo/db/pipeline/plan_executor_pipeline.h
@@ -163,6 +163,11 @@ public:
return false;
}
+ // Returns the type string reported by the underlying pipeline's getTypeString().
+ // Trips tassert 6253504 when '_pipeline' is not set.
+ boost::optional<StringData> getExecutorType() const override {
+ tassert(6253504, "Can't get type string without pipeline", _pipeline);
+ return _pipeline->getTypeString();
+ }
+
private:
/**
* Obtains the next document from the underlying Pipeline, and does change streams-related
diff --git a/src/mongo/db/pipeline/search_helper.h b/src/mongo/db/pipeline/search_helper.h
index fb38cd88690..25a224565a8 100644
--- a/src/mongo/db/pipeline/search_helper.h
+++ b/src/mongo/db/pipeline/search_helper.h
@@ -46,6 +46,22 @@ public:
// invoked for inner collection in $lookup, for instance, only when expanded pipeline is passed
// to the specific shard.
virtual void injectSearchShardFiltererIfNeeded(Pipeline* pipeline){};
+
+ /**
+ * Check to see if in the current environment an additional pipeline needs to be run by the
+ * aggregation command to generate metadata results. Either returns the additional pipeline
+ * or nullptr if no pipeline is necessary.
+ *
+ * This can modify the passed in pipeline ('origPipeline') but does not take ownership of it.
+ *
+ * This default implementation ignores all of its arguments and always returns nullptr;
+ * overriding implementations are expected to supply the real behavior.
+ */
+ virtual std::unique_ptr<Pipeline, PipelineDeleter> generateMetadataPipelineForSearch(
+ OperationContext* opCtx,
+ boost::intrusive_ptr<ExpressionContext> expCtx,
+ const AggregateCommandRequest& request,
+ Pipeline* origPipeline,
+ boost::optional<UUID> uuid) {
+ return nullptr;
+ }
};
/**
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index f2b7073f5a3..a7ba134a672 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -963,7 +963,15 @@ DispatchShardPipelineResults dispatchShardPipeline(
"needsPrimaryShardMerge"_attr = needsPrimaryShardMerge);
splitPipelines = splitPipeline(std::move(pipeline));
- exchangeSpec = checkIfEligibleForExchange(opCtx, splitPipelines->mergePipeline.get());
+ // If the first stage of the shards pipeline is a $search stage, exchange optimization isn't
+ // possible. 'splitPipelines' must be engaged here because the body dereferences it.
+ // TODO SERVER-62537 Investigate relaxing this restriction.
+ if (splitPipelines &&
+     (!splitPipelines->shardsPipeline || !splitPipelines->shardsPipeline->peekFront() ||
+      splitPipelines->shardsPipeline->peekFront()->getSourceName() !=
+          "$_internalSearchMongotRemote"_sd)) {
+     exchangeSpec = checkIfEligibleForExchange(opCtx, splitPipelines->mergePipeline.get());
+ }
}
// Generate the command object for the targeted shards.
diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript
index 7977f23f239..797009e9df7 100644
--- a/src/mongo/db/query/SConscript
+++ b/src/mongo/db/query/SConscript
@@ -183,6 +183,7 @@ env.Library(
env.Library(
target='cursor_response_idl',
source=[
+ 'cursor_idl_validator.cpp',
'cursor_response.idl',
],
LIBDEPS_PRIVATE=[
diff --git a/src/mongo/db/query/cursor_idl_validator.cpp b/src/mongo/db/query/cursor_idl_validator.cpp
new file mode 100644
index 00000000000..c66964a7e7e
--- /dev/null
+++ b/src/mongo/db/query/cursor_idl_validator.cpp
@@ -0,0 +1,44 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/query/cursor_idl_validator.h"
+
+namespace mongo {
+
+/**
+ * Validation hook invoked by the IDL-generated parser for CursorInitialReply. Throws a
+ * UserAssertion (code 6253507) unless exactly one of 'cursor' and 'cursors' is present.
+ */
+void validateIDLParsedCursorResponse(const CursorInitialReply* idlParsedObj) {
+    // Count how many of the two mutually exclusive cursor fields were supplied.
+    const int presentCursorFields = (idlParsedObj->getCursor() ? 1 : 0) +
+        (idlParsedObj->getCursors() ? 1 : 0);
+    uassert(6253507,
+            "MultiResponseInitialCursor must have exactly one of 'cursor' or 'cursors' fields",
+            presentCursorFields == 1);
+}
+} // namespace mongo
diff --git a/src/mongo/db/query/cursor_idl_validator.h b/src/mongo/db/query/cursor_idl_validator.h
new file mode 100644
index 00000000000..31617d7a428
--- /dev/null
+++ b/src/mongo/db/query/cursor_idl_validator.h
@@ -0,0 +1,40 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+#include "mongo/db/query/cursor_response_gen.h"
+
+namespace mongo {
+
+class CursorInitialReply;
+/**
+ * Function used by the IDL parser to validate that a response has exactly one cursor type field.
+ */
+void validateIDLParsedCursorResponse(const CursorInitialReply* idlParsedObj);
+} // namespace mongo
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index b4042f559b1..81bbe7ed1b9 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -45,6 +45,7 @@ const char kCursorField[] = "cursor";
const char kIdField[] = "id";
const char kNsField[] = "ns";
const char kVarsField[] = "vars";
+const char kTypeField[] = "type";
const char kAtClusterTimeField[] = "atClusterTime";
const char kBatchField[] = "nextBatch";
const char kBatchFieldInitial[] = "firstBatch";
@@ -104,11 +105,15 @@ void CursorResponseBuilder::abandon() {
void appendCursorResponseObject(long long cursorId,
StringData cursorNamespace,
BSONArray firstBatch,
+ boost::optional<StringData> cursorType,
BSONObjBuilder* builder) {
+ // Start the sub-object stored under kCursorField ("cursor") on the caller's builder.
BSONObjBuilder cursorObj(builder->subobjStart(kCursorField));
cursorObj.append(kIdField, cursorId);
cursorObj.append(kNsField, cursorNamespace);
cursorObj.append(kBatchFieldInitial, firstBatch);
+ // The "type" field is written only when the caller supplied a cursor type.
+ if (cursorType) {
+ cursorObj.append(kTypeField, cursorType.get());
+ }
cursorObj.done();
}
@@ -130,6 +135,7 @@ CursorResponse::CursorResponse(NamespaceString nss,
boost::optional<BSONObj> postBatchResumeToken,
boost::optional<BSONObj> writeConcernError,
boost::optional<BSONObj> varsField,
+ boost::optional<std::string> cursorType,
bool partialResultsReturned,
bool invalidated)
: _nss(std::move(nss)),
@@ -139,6 +145,7 @@ CursorResponse::CursorResponse(NamespaceString nss,
_postBatchResumeToken(std::move(postBatchResumeToken)),
_writeConcernError(std::move(writeConcernError)),
_varsField(std::move(varsField)),
+ _cursorType(std::move(cursorType)),
_partialResultsReturned(partialResultsReturned),
_invalidated(invalidated) {}
@@ -209,6 +216,13 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
<< "' must be of type object in: " << cmdResponse};
}
+ BSONElement typeElt = cursorObj[kTypeField];
+ if (!typeElt.eoo() && typeElt.type() != BSONType::String) {
+ return {ErrorCodes::TypeMismatch,
+ str::stream() << "Field '" << kTypeField << "' must be of type string but got "
+ << typeElt.type() << " in: " << cmdResponse};
+ }
+
BSONElement batchElt = cursorObj[kBatchField];
if (batchElt.eoo()) {
batchElt = cursorObj[kBatchFieldInitial];
@@ -297,6 +311,7 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
: boost::optional<BSONObj>{},
writeConcernError ? writeConcernError.Obj().getOwned() : boost::optional<BSONObj>{},
varsElt ? varsElt.Obj().getOwned() : boost::optional<BSONObj>{},
+ typeElt ? boost::make_optional<std::string>(typeElt.String()) : boost::none,
partialResultsReturned.trueValue(),
invalidatedElem.trueValue()}};
}
diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h
index ca81974e78b..a8151d7709a 100644
--- a/src/mongo/db/query/cursor_response.h
+++ b/src/mongo/db/query/cursor_response.h
@@ -142,13 +142,17 @@ private:
* and appends the response object to the provided builder under the field name "cursor".
*
* The response object has the following format:
- * { id: <NumberLong>, ns: <String>, firstBatch: <Array> }.
+ * { id: <NumberLong>, ns: <String>, firstBatch: <Array>, type: <String> }.
+ *
+ * The type field is optional, but can be used to differentiate cursors if multiple are returned
+ * at once.
*
* This function is deprecated. Prefer CursorResponseBuilder or CursorResponse::toBSON() instead.
*/
void appendCursorResponseObject(long long cursorId,
StringData cursorNamespace,
BSONArray firstBatch,
+ boost::optional<StringData> cursorType,
BSONObjBuilder* builder);
/**
@@ -207,6 +211,7 @@ public:
boost::optional<BSONObj> postBatchResumeToken = boost::none,
boost::optional<BSONObj> writeConcernError = boost::none,
boost::optional<BSONObj> varsField = boost::none,
+ boost::optional<std::string> cursorType = boost::none,
bool partialResultsReturned = false,
bool invalidated = false);
@@ -249,6 +254,10 @@ public:
return _varsField;
}
+ // Returns a copy of the optional cursor type stored in this response ('_cursorType').
+ auto getCursorType() const {
+ return _cursorType;
+ }
+
bool getPartialResultsReturned() const {
return _partialResultsReturned;
}
@@ -274,6 +283,7 @@ private:
boost::optional<BSONObj> _postBatchResumeToken;
boost::optional<BSONObj> _writeConcernError;
boost::optional<BSONObj> _varsField;
+ boost::optional<std::string> _cursorType;
bool _partialResultsReturned = false;
bool _invalidated = false;
};
diff --git a/src/mongo/db/query/cursor_response.idl b/src/mongo/db/query/cursor_response.idl
index 16f79c4be4d..8233f906c75 100644
--- a/src/mongo/db/query/cursor_response.idl
+++ b/src/mongo/db/query/cursor_response.idl
@@ -33,10 +33,19 @@ global:
cpp_includes:
- "mongo/db/namespace_string.h"
- "mongo/idl/basic_types.h"
+ - "mongo/db/query/cursor_idl_validator.h"
imports:
- "mongo/idl/basic_types.idl"
+enums:
+ CursorType:
+ description: "The type of a single cursor if a response has multiple cursors"
+ type: string
+ values:
+ SearchMetaResult: "meta"
+ DocumentResult: "results"
+
structs:
ResponseCursorBase:
description: "Common fields of initial and subsequent cursor responses."
@@ -80,18 +89,59 @@ structs:
type: array<object>
unstable: false
+ MultiResponseInitialResponseCursor:
+ description: "A struct representing an initial response cursor if multiple cursors are returned."
+ inline_chained_structs: true
+ chained_structs:
+ ResponseCursorBase: ResponseCursorBase
+ fields:
+ firstBatch:
+ description: "The first batch of the cursor."
+ type: array<object>
+ unstable: false
+ type:
+ description: "Optional disambiguation string of a cursor."
+ type: CursorType
+ cpp_name: cursorType
+ unstable: true
+ optional: true
+
+ MultiResponseCursor:
+ description: "A struct representing a cursor object inside an array of cursors"
+ fields:
+ cursor:
+ description: "The actual cursor object."
+ type: MultiResponseInitialResponseCursor
+ unstable: true
+ ok:
+ type: bool
+ unstable: true
+
CursorInitialReply:
description: "A struct representing a initial cursor reply."
+ cpp_validator_func: "validateIDLParsedCursorResponse"
fields:
cursor:
description: "A response cursor object."
type: InitialResponseCursor
unstable: false
+ optional: true
+ cursors:
+ description: "An array of cursor objects."
+ type: array<MultiResponseCursor>
+ unstable: true
+ optional: true
vars:
description: "An optional field containing additional response information for the query."
type: object
optional: true
- unstable: false
+ unstable: true
+ type:
+ description: "An optional field containing disambiguation information if a reply contains multiple cursors."
+ type: CursorType
+ cpp_name: cursorType
+ optional: true
+ unstable: true
GetMoreResponseCursor:
description: "A struct representing a subsequent response cursor."
diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp
index 6cedc76ccb0..183c27e05d6 100644
--- a/src/mongo/db/query/cursor_response_test.cpp
+++ b/src/mongo/db/query/cursor_response_test.cpp
@@ -348,6 +348,7 @@ TEST(CursorResponseTest, toBSONPartialResultsReturned) {
boost::none,
boost::none,
boost::none,
+ boost::none,
true);
BSONObj responseObj = response.toBSON(CursorResponse::ResponseType::InitialResponse);
BSONObj expectedResponse = BSON(
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index 9748e3de303..80857984d3e 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -359,6 +359,13 @@ public:
*/
virtual void enableSaveRecoveryUnitAcrossCommandsIfSupported() = 0;
virtual bool isSaveRecoveryUnitAcrossCommandsEnabled() const = 0;
+
+ /**
+ * For queries that have multiple executors, this can be used to differentiate between them.
+ *
+ * This base implementation always returns boost::none; subclasses may override it to report
+ * a type.
+ */
+ virtual boost::optional<StringData> getExecutorType() const {
+ return boost::none;
+ }
};
} // namespace mongo
diff --git a/src/mongo/db/query/query_request_test.cpp b/src/mongo/db/query/query_request_test.cpp
index 8c58484990a..20e618eff17 100644
--- a/src/mongo/db/query/query_request_test.cpp
+++ b/src/mongo/db/query/query_request_test.cpp
@@ -1551,7 +1551,7 @@ TEST(QueryRequestTest, ConvertToFindWithAllowDiskUseFalseSucceeds) {
TEST(QueryRequestHelperTest, ValidateResponseMissingFields) {
BSONObjBuilder builder;
ASSERT_THROWS_CODE(
- query_request_helper::validateCursorResponse(builder.asTempObj()), DBException, 40414);
+ query_request_helper::validateCursorResponse(builder.asTempObj()), DBException, 6253507);
}
TEST(QueryRequestHelperTest, ValidateResponseWrongDataType) {
diff --git a/src/mongo/executor/network_test_env.cpp b/src/mongo/executor/network_test_env.cpp
index a52c3ddf9b3..9a128705054 100644
--- a/src/mongo/executor/network_test_env.cpp
+++ b/src/mongo/executor/network_test_env.cpp
@@ -113,7 +113,7 @@ void NetworkTestEnv::onFindCommand(OnFindCommandFunction func) {
const NamespaceString nss =
NamespaceString(request.dbname, request.cmdObj.firstElement().String());
BSONObjBuilder result;
- appendCursorResponseObject(0LL, nss.toString(), arr.arr(), &result);
+ appendCursorResponseObject(0LL, nss.toString(), arr.arr(), boost::none, &result);
return result.obj();
});
@@ -139,7 +139,7 @@ void NetworkTestEnv::onFindWithMetadataCommand(OnFindCommandWithMetadataFunction
const NamespaceString nss =
NamespaceString(request.dbname, request.cmdObj.firstElement().String());
BSONObjBuilder resultBuilder(std::move(metadata));
- appendCursorResponseObject(0LL, nss.toString(), arr.arr(), &resultBuilder);
+ appendCursorResponseObject(0LL, nss.toString(), arr.arr(), boost::none, &resultBuilder);
return RemoteCommandResponse(resultBuilder.obj(), Milliseconds(1));
});
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index ad8cab93436..2680fa3d916 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -410,7 +410,8 @@ public:
*
* NOTE: Do not call from a callback running in the executor.
*
- * Prefer the version that takes an OperationContext* to this version.
+ * Prefer passing an OperationContext* or another Interruptible as the second argument
+ * rather than leaving the wait non-interruptible.
*/
virtual void wait(const CallbackHandle& cbHandle,
Interruptible* interruptible = Interruptible::notInterruptible()) = 0;
diff --git a/src/mongo/executor/task_executor_cursor.cpp b/src/mongo/executor/task_executor_cursor.cpp
index 9bc01121b38..46d1bd846b4 100644
--- a/src/mongo/executor/task_executor_cursor.cpp
+++ b/src/mongo/executor/task_executor_cursor.cpp
@@ -86,6 +86,9 @@ TaskExecutorCursor::TaskExecutorCursor(TaskExecutorCursor&& other)
if (other._cursorVars) {
_cursorVars = other._cursorVars->getOwned();
}
+ if (other._cursorType) {
+ _cursorType = other._cursorType;
+ }
// Other is no longer responsible for this cursor id.
other._cursorId = 0;
// Other should not cancel the callback on destruction.
@@ -127,6 +130,19 @@ boost::optional<BSONObj> TaskExecutorCursor::getNext(OperationContext* opCtx) {
return std::move(*_batchIter++);
}
+void TaskExecutorCursor::populateCursor(OperationContext* opCtx) {
+    // This must run before the cursor has been initialized by any earlier batch.
+    tassert(6253502,
+            "populateCursor should only be called before cursor is initialized",
+            _cursorId == kUnitializedCursorId);
+    // There must be an outstanding remote command whose response can be read.
+    tassert(6253503,
+            "populateCursor should only be called after a remote command has been run",
+            _cbHandle);
+    // We really only care about populating the cursor "first batch" fields, but at some point we'll
+    // have to do all of the work done by this function anyway. This would have been called by
+    // getNext() the first time it was called.
+    _getNextBatch(opCtx);
+}
+
const RemoteCommandRequest& TaskExecutorCursor::_createRequest(OperationContext* opCtx,
const BSONObj& cmd) {
// we pull this every time for updated client metadata
@@ -171,8 +187,9 @@ void TaskExecutorCursor::_processResponse(OperationContext* opCtx, CursorRespons
if (_cursorId == kUnitializedCursorId) {
_ns = response.getNSS();
_rcr.dbname = _ns.db().toString();
- // 'vars' are only included in the first batch.
+ // 'vars' and type are only included in the first batch.
_cursorVars = response.getVarsField();
+ _cursorType = response.getCursorType();
}
_cursorId = response.getCursorId();
diff --git a/src/mongo/executor/task_executor_cursor.h b/src/mongo/executor/task_executor_cursor.h
index bfc916d0e13..f7f545de2a5 100644
--- a/src/mongo/executor/task_executor_cursor.h
+++ b/src/mongo/executor/task_executor_cursor.h
@@ -114,6 +114,16 @@ public:
*/
boost::optional<BSONObj> getNext(OperationContext* opCtx);
+ /**
+ * Read the response from the remote command issued by this cursor and parse it into this
+ * object. Performs the same work as getNext() above does on the first call to it, and so this
+ * can throw any error that getNext can throw.
+ *
+ * Should not be called once getNext() has been called or the cursor has been otherwise
+ * initialized. That requirement, and the requirement that a remote command has already been
+ * issued, are both enforced with tassert in the implementation.
+ */
+ void populateCursor(OperationContext* opCtx);
+
const CursorId getCursorId() const {
return _cursorId;
}
@@ -128,6 +138,10 @@ public:
return _cursorVars;
}
+ /**
+ * Returns a copy of the cursor type stored on this cursor, or boost::none if no type has been
+ * stored. Const-qualified so it can be queried through a const cursor.
+ */
+ auto getType() const {
+ return _cursorType;
+ }
+
long long getBatchNum() {
return _batchNum;
}
@@ -144,6 +158,14 @@ public:
return _additionalCursors.size();
}
+ /**
+ * Return a copy of the callback handle that this cursor is waiting on. Can be used to block on
+ * getting a response to this request. Can be boost::none.
+ *
+ * Const-qualified because it only reads the stored handle.
+ */
+ auto getCallbackHandle() const {
+ return _cbHandle;
+ }
+
private:
/**
* Runs a remote command and pipes the output back to this object
@@ -186,6 +208,9 @@ private:
// Variables sent alongside the results in the cursor.
boost::optional<BSONObj> _cursorVars = boost::none;
+ // For commands that return multiple cursors, the type of the cursor.
+ boost::optional<std::string> _cursorType;
+
// This is a sum of the time spent waiting on remote calls.
Milliseconds _millisecondsWaiting = Milliseconds(0);
diff --git a/src/mongo/idl/idl_test.cpp b/src/mongo/idl/idl_test.cpp
index 37133cb7566..febbe3dbc3f 100644
--- a/src/mongo/idl/idl_test.cpp
+++ b/src/mongo/idl/idl_test.cpp
@@ -48,6 +48,11 @@ using namespace mongo::idl::import;
namespace mongo {
+// Validator for StructWithValidator: throws with code 6253512 unless the struct's 'first' and
+// 'second' values compare equal.
+void mongo::idl::test::checkValuesEqual(StructWithValidator* structToValidate) {
+ const bool valuesMatch = structToValidate->getFirst() == structToValidate->getSecond();
+ uassert(6253512, "Values not equal", valuesMatch);
+}
+
namespace {
bool isEquals(ConstDataRange left, const std::vector<uint8_t>& right) {
@@ -1049,6 +1054,17 @@ TEST(IDLStructTests, WriteConcernTest) {
}
}
+TEST(IDLStructTests, TestValidator) {
+ IDLParserErrorContext ctxt("root");
+
+ // Differing field values: parsing must fail with the validator's error code.
+ const auto mismatched = BSON("first" << 1 << "second" << 2);
+ ASSERT_THROWS_CODE(StructWithValidator::parse(ctxt, mismatched), AssertionException, 6253512);
+
+ // Identical field values: parsing must complete without throwing.
+ const auto matching = BSON("first" << 1 << "second" << 1);
+ StructWithValidator::parse(ctxt, matching);
+}
+
/// Struct default comparison tests
TEST(IDLCompareTests, TestAllFields) {
IDLParserErrorContext ctxt("root");
diff --git a/src/mongo/idl/idl_test.h b/src/mongo/idl/idl_test.h
index 40b840a1f39..de4f26f7b42 100644
--- a/src/mongo/idl/idl_test.h
+++ b/src/mongo/idl/idl_test.h
@@ -37,6 +37,8 @@ namespace mongo {
namespace idl {
namespace test {
+class StructWithValidator;
+
/**
* Validates the given number is even
*/
@@ -98,6 +100,12 @@ inline Status validateOneInt(const std::vector<mongo::idl::import::One_int>& val
return Status::OK();
}
+/**
+ * Check that the two values in the struct are equal, assert otherwise.
+ *
+ * Compares the struct's getFirst() and getSecond() values and uasserts with code 6253512 when
+ * they differ.
+ */
+void checkValuesEqual(StructWithValidator* structToValidate);
+
+
} // namespace test
} // namespace idl
} // namespace mongo
diff --git a/src/mongo/idl/unittest.idl b/src/mongo/idl/unittest.idl
index 5f7f816242c..022b28d739b 100644
--- a/src/mongo/idl/unittest.idl
+++ b/src/mongo/idl/unittest.idl
@@ -625,6 +625,22 @@ structs:
##################################################################################################
#
+# Test struct with cpp validator
+#
+##################################################################################################
+
+ # Exercises 'cpp_validator_func': checkValuesEqual (declared in idl_test.h) uasserts unless
+ # 'first' and 'second' hold the same value.
+ StructWithValidator:
+ description: mock
+ cpp_validator_func: checkValuesEqual
+ fields:
+ first:
+ type: int
+ optional: false
+ second:
+ type: int
+ optional: false
+##################################################################################################
+#
# Test struct with variant
#
##################################################################################################
diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp
index 22a9238fbd2..fe2a6536865 100644
--- a/src/mongo/s/cluster_commands_helpers.cpp
+++ b/src/mongo/s/cluster_commands_helpers.cpp
@@ -619,7 +619,7 @@ bool appendEmptyResultSet(OperationContext* opCtx,
if (status == ErrorCodes::NamespaceNotFound) {
// New (command) style reply
- appendCursorResponseObject(0LL, ns, BSONArray(), &result);
+ appendCursorResponseObject(0LL, ns, BSONArray(), boost::none, &result);
return true;
}
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index c894d4afec4..bd186639684 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -875,6 +875,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
postBatchResumeToken,
boost::none,
boost::none,
+ boost::none,
partialResultsReturned);
}
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp
index 57991fccb36..82ec1df2809 100644
--- a/src/mongo/s/query/establish_cursors.cpp
+++ b/src/mongo/s/query/establish_cursors.cpp
@@ -275,10 +275,17 @@ void CursorEstablisher::_handleFailure(const AsyncRequestsSender::Response& resp
if (_allowPartialResults && isEligibleException) {
// This exception is eligible to be swallowed. Add an entry with a cursorID of 0, an
// empty HostAndPort, and which has the 'partialResultsReturned' flag set to true.
- _remoteCursors.push_back(
- {response.shardId.toString(),
- {},
- {_nss, CursorId{0}, {}, boost::none, boost::none, boost::none, boost::none, true}});
+ _remoteCursors.push_back({response.shardId.toString(),
+ {},
+ {_nss,
+ CursorId{0},
+ {},
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none,
+ true}});
return;
}