summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGregory Noma <gregory.noma@gmail.com>2018-07-12 14:48:46 -0400
committerGregory Noma <gregory.noma@gmail.com>2018-07-24 13:56:03 -0400
commit0136f88d1c7d3e49f0e089f826e0b19af45f3b89 (patch)
tree77306a28d4b4c7946c1158dd127a916ab146cd0f /src
parent9d31d0caa167e9661aaf0f10f260313133bd2a02 (diff)
downloadmongo-0136f88d1c7d3e49f0e089f826e0b19af45f3b89.tar.gz
SERVER-36020 Redesign CursorResponseBuilder to allow usage of DocumentSequence
Co-authored-by: Anthony Roy <anthony.roy@10gen.com>
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/commands.cpp4
-rw-r--r--src/mongo/db/commands.h4
-rw-r--r--src/mongo/db/commands/count_cmd.cpp7
-rw-r--r--src/mongo/db/commands/current_op.cpp10
-rw-r--r--src/mongo/db/commands/distinct.cpp7
-rw-r--r--src/mongo/db/commands/explain_cmd.cpp5
-rw-r--r--src/mongo/db/commands/find_and_modify.cpp8
-rw-r--r--src/mongo/db/commands/find_cmd.cpp12
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp4
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp7
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp14
-rw-r--r--src/mongo/db/commands/run_aggregate.h3
-rw-r--r--src/mongo/db/commands/user_management_commands.cpp10
-rw-r--r--src/mongo/db/commands/write_commands/write_commands.cpp10
-rw-r--r--src/mongo/db/commands_test.cpp4
-rw-r--r--src/mongo/db/query/SConscript2
-rw-r--r--src/mongo/db/query/cursor_response.cpp46
-rw-r--r--src/mongo/db/query/cursor_response.h39
-rw-r--r--src/mongo/db/query/cursor_response_test.cpp28
-rw-r--r--src/mongo/db/sessions_collection_sharded.cpp9
-rw-r--r--src/mongo/rpc/op_msg.h4
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp14
-rw-r--r--src/mongo/s/commands/cluster_count_cmd.cpp8
-rw-r--r--src/mongo/s/commands/cluster_distinct_cmd.cpp8
-rw-r--r--src/mongo/s/commands/cluster_explain_cmd.cpp5
-rw-r--r--src/mongo/s/commands/cluster_find_and_modify_cmd.cpp11
-rw-r--r--src/mongo/s/commands/cluster_find_cmd.cpp16
-rw-r--r--src/mongo/s/commands/cluster_pipeline_cmd.cpp5
-rw-r--r--src/mongo/s/commands/cluster_write_cmd.cpp5
-rw-r--r--src/mongo/s/query/cluster_find_test.cpp9
30 files changed, 213 insertions, 105 deletions
diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp
index f47cc77cf9c..57e82e49067 100644
--- a/src/mongo/db/commands.cpp
+++ b/src/mongo/db/commands.cpp
@@ -440,7 +440,7 @@ private:
void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* result) override {
+ rpc::ReplyBuilderInterface* result) override {
uassertStatusOK(_command->explain(opCtx, *_request, verbosity, result));
}
@@ -492,7 +492,7 @@ Command::Command(StringData name, StringData oldName)
Status BasicCommand::explain(OperationContext* opCtx,
const OpMsgRequest& request,
ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* out) const {
+ rpc::ReplyBuilderInterface* result) const {
return {ErrorCodes::IllegalOperation, str::stream() << "Cannot explain cmd: " << getName()};
}
diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h
index 53788b16d3d..3dfe96e5cd8 100644
--- a/src/mongo/db/commands.h
+++ b/src/mongo/db/commands.h
@@ -441,7 +441,7 @@ public:
virtual void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* result) {
+ rpc::ReplyBuilderInterface* result) {
uasserted(ErrorCodes::IllegalOperation,
str::stream() << "Cannot explain cmd: " << definition()->getName());
}
@@ -561,7 +561,7 @@ public:
virtual Status explain(OperationContext* opCtx,
const OpMsgRequest& request,
ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* out) const;
+ rpc::ReplyBuilderInterface* result) const;
/**
* Checks if the client associated with the given OperationContext is authorized to run this
diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp
index 8818ded8b59..ecff6c12986 100644
--- a/src/mongo/db/commands/count_cmd.cpp
+++ b/src/mongo/db/commands/count_cmd.cpp
@@ -107,7 +107,7 @@ public:
Status explain(OperationContext* opCtx,
const OpMsgRequest& opMsgRequest,
ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* out) const override {
+ rpc::ReplyBuilderInterface* result) const override {
std::string dbname = opMsgRequest.getDatabase().toString();
const BSONObj& cmdObj = opMsgRequest.body;
// Acquire locks and resolve possible UUID. The RAII object is optional, because in the case
@@ -143,7 +143,7 @@ public:
viewAggRequest.getValue().getNamespaceString(),
viewAggRequest.getValue(),
viewAggregation.getValue(),
- *out);
+ result);
}
Collection* const collection = ctx->getCollection();
@@ -160,7 +160,8 @@ public:
auto exec = std::move(statusWithPlanExecutor.getValue());
- Explain::explainStages(exec.get(), collection, verbosity, out);
+ auto bodyBuilder = result->getBodyBuilder();
+ Explain::explainStages(exec.get(), collection, verbosity, &bodyBuilder);
return Status::OK();
}
diff --git a/src/mongo/db/commands/current_op.cpp b/src/mongo/db/commands/current_op.cpp
index 47e22ea8296..3b0d2d1814f 100644
--- a/src/mongo/db/commands/current_op.cpp
+++ b/src/mongo/db/commands/current_op.cpp
@@ -66,18 +66,20 @@ public:
OperationContext* opCtx, const AggregationRequest& request) const final {
auto aggCmdObj = request.serializeToCommandObj().toBson();
- BSONObjBuilder responseBuilder;
+ rpc::OpMsgReplyBuilder replyBuilder;
auto status = runAggregate(
- opCtx, request.getNamespaceString(), request, std::move(aggCmdObj), responseBuilder);
+ opCtx, request.getNamespaceString(), request, std::move(aggCmdObj), &replyBuilder);
if (!status.isOK()) {
return status;
}
- CommandHelpers::appendSimpleCommandStatus(responseBuilder, true);
+ auto bodyBuilder = replyBuilder.getBodyBuilder();
+ CommandHelpers::appendSimpleCommandStatus(bodyBuilder, true);
+ bodyBuilder.doneFast();
- return CursorResponse::parseFromBSON(responseBuilder.obj());
+ return CursorResponse::parseFromBSON(replyBuilder.releaseBody());
}
virtual void appendToResponse(BSONObjBuilder* result) const final {
diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp
index 8802e4875cb..8b6098f4341 100644
--- a/src/mongo/db/commands/distinct.cpp
+++ b/src/mongo/db/commands/distinct.cpp
@@ -112,7 +112,7 @@ public:
Status explain(OperationContext* opCtx,
const OpMsgRequest& request,
ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* out) const override {
+ rpc::ReplyBuilderInterface* result) const override {
std::string dbname = request.getDatabase().toString();
const BSONObj& cmdObj = request.body;
// Acquire locks and resolve possible UUID. The RAII object is optional, because in the case
@@ -143,7 +143,7 @@ public:
}
return runAggregate(
- opCtx, nss, viewAggRequest.getValue(), viewAggregation.getValue(), *out);
+ opCtx, nss, viewAggRequest.getValue(), viewAggregation.getValue(), result);
}
Collection* const collection = ctx->getCollection();
@@ -151,7 +151,8 @@ public:
auto executor =
uassertStatusOK(getExecutorDistinct(opCtx, collection, nss.ns(), &parsedDistinct));
- Explain::explainStages(executor.get(), collection, verbosity, out);
+ auto bodyBuilder = result->getBodyBuilder();
+ Explain::explainStages(executor.get(), collection, verbosity, &bodyBuilder);
return Status::OK();
}
diff --git a/src/mongo/db/commands/explain_cmd.cpp b/src/mongo/db/commands/explain_cmd.cpp
index b20a1a7d850..c41022b5550 100644
--- a/src/mongo/db/commands/explain_cmd.cpp
+++ b/src/mongo/db/commands/explain_cmd.cpp
@@ -97,13 +97,12 @@ public:
"Explain's child command cannot run on this node. "
"Are you explaining a write command on a secondary?",
commandCanRunHere(opCtx, _dbName, _innerInvocation->definition()));
- BSONObjBuilder bob = result->getBodyBuilder();
- _innerInvocation->explain(opCtx, _verbosity, &bob);
+ _innerInvocation->explain(opCtx, _verbosity, result);
}
void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* result) override {
+ rpc::ReplyBuilderInterface* result) override {
uasserted(ErrorCodes::IllegalOperation, "Explain cannot explain itself.");
}
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index 4c70636f928..070557d4f41 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -256,7 +256,7 @@ public:
Status explain(OperationContext* opCtx,
const OpMsgRequest& request,
ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* out) const override {
+ rpc::ReplyBuilderInterface* result) const override {
std::string dbName = request.getDatabase().toString();
const BSONObj& cmdObj = request.body;
const auto args(uassertStatusOK(FindAndModifyRequest::parseFromBSON(
@@ -288,7 +288,8 @@ public:
const auto exec =
uassertStatusOK(getExecutorDelete(opCtx, opDebug, collection, &parsedDelete));
- Explain::explainStages(exec.get(), collection, verbosity, out);
+ auto bodyBuilder = result->getBodyBuilder();
+ Explain::explainStages(exec.get(), collection, verbosity, &bodyBuilder);
} else {
UpdateRequest request(nsString);
UpdateLifecycleImpl updateLifecycle(nsString);
@@ -312,7 +313,8 @@ public:
const auto exec =
uassertStatusOK(getExecutorUpdate(opCtx, opDebug, collection, &parsedUpdate));
- Explain::explainStages(exec.get(), collection, verbosity, out);
+ auto bodyBuilder = result->getBodyBuilder();
+ Explain::explainStages(exec.get(), collection, verbosity, &bodyBuilder);
}
return Status::OK();
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index e16f2473092..7c82c31da29 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -142,7 +142,7 @@ public:
void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* result) override {
+ rpc::ReplyBuilderInterface* result) override {
// Acquire locks and resolve possible UUID. The RAII object is optional, because in the
// case of a view, the locks need to be released.
boost::optional<AutoGetCollectionForReadCommand> ctx;
@@ -182,7 +182,7 @@ public:
try {
uassertStatusOK(
- runAggregate(opCtx, nss, aggRequest, viewAggregationCommand, *result));
+ runAggregate(opCtx, nss, aggRequest, viewAggregationCommand, result));
} catch (DBException& error) {
if (error.code() == ErrorCodes::InvalidPipelineOperator) {
uasserted(ErrorCodes::InvalidPipelineOperator,
@@ -201,8 +201,9 @@ public:
// We have a parsed query. Time to get the execution plan for it.
auto exec = uassertStatusOK(getExecutorFind(opCtx, collection, nss, std::move(cq)));
+ auto bodyBuilder = result->getBodyBuilder();
// Got the execution tree. Explain it.
- Explain::explainStages(exec.get(), collection, verbosity, result);
+ Explain::explainStages(exec.get(), collection, verbosity, &bodyBuilder);
}
/**
@@ -320,8 +321,9 @@ public:
const QueryRequest& originalQR = exec->getCanonicalQuery()->getQueryRequest();
// Stream query results, adding them to a BSONArray as we go.
- auto bodyBuilder = result->getBodyBuilder();
- CursorResponseBuilder firstBatch(/*isInitialResponse*/ true, &bodyBuilder);
+ CursorResponseBuilder::Options options;
+ options.isInitialResponse = true;
+ CursorResponseBuilder firstBatch(result, options);
BSONObj obj;
PlanExecutor::ExecState state = PlanExecutor::ADVANCED;
long long numResults = 0;
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index e296e4e0c97..a6fa17ab2d6 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -244,7 +244,6 @@ public:
void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override {
// Counted as a getMore, not as a command.
globalOpCounters.gotGetMore();
- auto result = reply->getBodyBuilder();
auto curOp = CurOp::get(opCtx);
curOp->debug().cursorid = _request.cursorid;
@@ -428,7 +427,8 @@ public:
}
CursorId respondWithId = 0;
- CursorResponseBuilder nextBatch(/*isInitialResponse*/ false, &result);
+
+ CursorResponseBuilder nextBatch(reply, CursorResponseBuilder::Options());
BSONObj obj;
PlanExecutor::ExecState state = PlanExecutor::ADVANCED;
long long numResults = 0;
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index a7096773104..54ecc3df1ab 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -79,7 +79,6 @@ public:
}
void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override {
- auto bob = reply->getBodyBuilder();
const auto aggregationRequest = uassertStatusOK(
AggregationRequest::parseFromBSON(_dbName, _request.body, boost::none));
@@ -87,7 +86,7 @@ public:
aggregationRequest.getNamespaceString(),
aggregationRequest,
_request.body,
- bob));
+ reply));
}
NamespaceString ns() const override {
@@ -96,7 +95,7 @@ public:
void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* out) override {
+ rpc::ReplyBuilderInterface* result) override {
const auto aggregationRequest = uassertStatusOK(
AggregationRequest::parseFromBSON(_dbName, _request.body, verbosity));
@@ -104,7 +103,7 @@ public:
aggregationRequest.getNamespaceString(),
aggregationRequest,
_request.body,
- *out));
+ result));
}
void doCheckAuthorization(OperationContext* opCtx) const override {
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 5e00d3f867c..60efa126c3c 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -91,7 +91,7 @@ bool handleCursorCommand(OperationContext* opCtx,
const NamespaceString& nsForCursor,
std::vector<ClientCursor*> cursors,
const AggregationRequest& request,
- BSONObjBuilder& result) {
+ rpc::ReplyBuilderInterface* result) {
invariant(!cursors.empty());
long long batchSize = request.getBatchSize();
@@ -121,12 +121,15 @@ bool handleCursorCommand(OperationContext* opCtx,
cursors[idx]->getExecutor()->detachFromOperationContext();
}
- result.appendArray("cursors", cursorsBuilder.obj());
+ auto bodyBuilder = result->getBodyBuilder();
+ bodyBuilder.appendArray("cursors", cursorsBuilder.obj());
return true;
}
- CursorResponseBuilder responseBuilder(true, &result);
+ CursorResponseBuilder::Options options;
+ options.isInitialResponse = true;
+ CursorResponseBuilder responseBuilder(result, options);
ClientCursor* cursor = cursors[0];
invariant(cursor);
@@ -345,7 +348,7 @@ Status runAggregate(OperationContext* opCtx,
const NamespaceString& origNss,
const AggregationRequest& request,
const BSONObj& cmdObj,
- BSONObjBuilder& result) {
+ rpc::ReplyBuilderInterface* result) {
// For operations on views, this will be the underlying namespace.
NamespaceString nss = request.getNamespaceString();
@@ -611,8 +614,9 @@ Status runAggregate(OperationContext* opCtx,
// If both explain and cursor are specified, explain wins.
if (expCtx->explain) {
+ auto bodyBuilder = result->getBodyBuilder();
Explain::explainPipelineExecutor(
- pins[0].getCursor()->getExecutor(), *(expCtx->explain), &result);
+ pins[0].getCursor()->getExecutor(), *(expCtx->explain), &bodyBuilder);
} else {
// Cursor must be specified, if explain is not.
const bool keepCursor =
diff --git a/src/mongo/db/commands/run_aggregate.h b/src/mongo/db/commands/run_aggregate.h
index 1412768ddcf..75212efc22c 100644
--- a/src/mongo/db/commands/run_aggregate.h
+++ b/src/mongo/db/commands/run_aggregate.h
@@ -33,6 +33,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/rpc/op_msg_rpc_impls.h"
namespace mongo {
@@ -48,6 +49,6 @@ Status runAggregate(OperationContext* opCtx,
const NamespaceString& nss,
const AggregationRequest& request,
const BSONObj& cmdObj,
- BSONObjBuilder& result);
+ rpc::ReplyBuilderInterface* result);
} // namespace mongo
diff --git a/src/mongo/db/commands/user_management_commands.cpp b/src/mongo/db/commands/user_management_commands.cpp
index 145cd247301..82823e3cfcc 100644
--- a/src/mongo/db/commands/user_management_commands.cpp
+++ b/src/mongo/db/commands/user_management_commands.cpp
@@ -1371,16 +1371,18 @@ public:
DBDirectClient client(opCtx);
- BSONObjBuilder responseBuilder;
+ rpc::OpMsgReplyBuilder replyBuilder;
AggregationRequest aggRequest(AuthorizationManager::usersCollectionNamespace,
std::move(pipeline));
uassertStatusOK(runAggregate(opCtx,
AuthorizationManager::usersCollectionNamespace,
aggRequest,
aggRequest.serializeToCommandObj().toBson(),
- responseBuilder));
- CommandHelpers::appendSimpleCommandStatus(responseBuilder, true);
- auto response = CursorResponse::parseFromBSONThrowing(responseBuilder.obj());
+ &replyBuilder));
+ auto bodyBuilder = replyBuilder.getBodyBuilder();
+ CommandHelpers::appendSimpleCommandStatus(bodyBuilder, true);
+ bodyBuilder.doneFast();
+ auto response = CursorResponse::parseFromBSONThrowing(replyBuilder.releaseBody());
DBClientCursor cursor(&client,
response.getNSS().toString(),
response.getCursorId(),
diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp
index 49e90ff0899..35d2e0ec53b 100644
--- a/src/mongo/db/commands/write_commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands/write_commands.cpp
@@ -350,7 +350,7 @@ private:
void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* out) override {
+ rpc::ReplyBuilderInterface* result) override {
uassert(ErrorCodes::InvalidLength,
"explained write batches must be of size 1",
_batch.getUpdates().size() == 1);
@@ -376,7 +376,8 @@ private:
auto exec = uassertStatusOK(getExecutorUpdate(
opCtx, &CurOp::get(opCtx)->debug(), collection.getCollection(), &parsedUpdate));
- Explain::explainStages(exec.get(), collection.getCollection(), verbosity, out);
+ auto bodyBuilder = result->getBodyBuilder();
+ Explain::explainStages(exec.get(), collection.getCollection(), verbosity, &bodyBuilder);
}
write_ops::Update _batch;
@@ -426,7 +427,7 @@ private:
void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* out) override {
+ rpc::ReplyBuilderInterface* result) override {
uassert(ErrorCodes::InvalidLength,
"explained write batches must be of size 1",
_batch.getDeletes().size() == 1);
@@ -448,7 +449,8 @@ private:
// Explain the plan tree.
auto exec = uassertStatusOK(getExecutorDelete(
opCtx, &CurOp::get(opCtx)->debug(), collection.getCollection(), &parsedDelete));
- Explain::explainStages(exec.get(), collection.getCollection(), verbosity, out);
+ auto bodyBuilder = result->getBodyBuilder();
+ Explain::explainStages(exec.get(), collection.getCollection(), verbosity, &bodyBuilder);
}
write_ops::Delete _batch;
diff --git a/src/mongo/db/commands_test.cpp b/src/mongo/db/commands_test.cpp
index c92f72617c3..1b25d644765 100644
--- a/src/mongo/db/commands_test.cpp
+++ b/src/mongo/db/commands_test.cpp
@@ -218,7 +218,7 @@ public:
void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* result) override {}
+ rpc::ReplyBuilderInterface* result) override {}
void doCheckAuthorization(OperationContext*) const override {}
@@ -263,7 +263,7 @@ public:
void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* result) override {}
+ rpc::ReplyBuilderInterface* result) override {}
void doCheckAuthorization(OperationContext*) const override {}
diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript
index 7247f723a02..1605b395c7d 100644
--- a/src/mongo/db/query/SConscript
+++ b/src/mongo/db/query/SConscript
@@ -147,6 +147,7 @@ env.Library(
'$BUILD_DIR/mongo/db/namespace_string',
'$BUILD_DIR/mongo/db/repl/optime',
'$BUILD_DIR/mongo/rpc/command_status',
+ '$BUILD_DIR/mongo/rpc/rpc',
'query_request',
]
)
@@ -164,6 +165,7 @@ env.CppUnitTest(
],
LIBDEPS=[
"$BUILD_DIR/mongo/db/pipeline/aggregation_request",
+ '$BUILD_DIR/mongo/rpc/rpc',
'command_request_response',
]
)
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index e449aad0814..14b8af5e8f6 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -44,34 +44,52 @@ const char kIdField[] = "id";
const char kNsField[] = "ns";
const char kBatchField[] = "nextBatch";
const char kBatchFieldInitial[] = "firstBatch";
+const char kBatchDocSequenceField[] = "cursor.nextBatch";
+const char kBatchDocSequenceFieldInitial[] = "cursor.firstBatch";
const char kInternalLatestOplogTimestampField[] = "$_internalLatestOplogTimestamp";
} // namespace
-CursorResponseBuilder::CursorResponseBuilder(bool isInitialResponse,
- BSONObjBuilder* commandResponse)
- : _responseInitialLen(commandResponse->bb().len()),
- _commandResponse(commandResponse),
- _cursorObject(commandResponse->subobjStart(kCursorField)),
- _batch(_cursorObject.subarrayStart(isInitialResponse ? kBatchFieldInitial : kBatchField)) {}
+CursorResponseBuilder::CursorResponseBuilder(rpc::ReplyBuilderInterface* replyBuilder,
+ Options options = Options())
+ : _options(options), _replyBuilder(replyBuilder) {
+ if (_options.useDocumentSequences) {
+ _docSeqBuilder.emplace(_replyBuilder->getDocSequenceBuilder(
+ _options.isInitialResponse ? kBatchDocSequenceFieldInitial : kBatchDocSequenceField));
+ } else {
+ _bodyBuilder.emplace(_replyBuilder->getBodyBuilder());
+ _cursorObject.emplace(_bodyBuilder->subobjStart(kCursorField));
+ _batch.emplace(_cursorObject->subarrayStart(_options.isInitialResponse ? kBatchFieldInitial
+ : kBatchField));
+ }
+}
void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace) {
invariant(_active);
- _batch.doneFast();
- _cursorObject.append(kIdField, cursorId);
- _cursorObject.append(kNsField, cursorNamespace);
- _cursorObject.doneFast();
+ if (_options.useDocumentSequences) {
+ _docSeqBuilder.reset();
+ _bodyBuilder.emplace(_replyBuilder->getBodyBuilder());
+ _cursorObject.emplace(_bodyBuilder->subobjStart(kCursorField));
+ } else {
+ _batch.reset();
+ }
+ _cursorObject->append(kIdField, cursorId);
+ _cursorObject->append(kNsField, cursorNamespace);
+ _cursorObject.reset();
+
if (!_latestOplogTimestamp.isNull()) {
- _commandResponse->append(kInternalLatestOplogTimestampField, _latestOplogTimestamp);
+ _bodyBuilder->append(kInternalLatestOplogTimestampField, _latestOplogTimestamp);
}
+ _bodyBuilder.reset();
_active = false;
}
void CursorResponseBuilder::abandon() {
invariant(_active);
- _batch.doneFast();
- _cursorObject.doneFast();
- _commandResponse->bb().setlen(_responseInitialLen); // Removes everything we've added.
+ _batch.reset();
+ _cursorObject.reset();
+ _bodyBuilder.reset();
+ _replyBuilder->reset();
_numDocs = 0;
_active = false;
}
diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h
index 091b64e0c73..529654118df 100644
--- a/src/mongo/db/query/cursor_response.h
+++ b/src/mongo/db/query/cursor_response.h
@@ -35,6 +35,8 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/rpc/op_msg.h"
+#include "mongo/rpc/reply_builder_interface.h"
namespace mongo {
@@ -47,14 +49,23 @@ class CursorResponseBuilder {
public:
/**
- * Once constructed, you may not use the passed-in BSONObjBuilder until you call either done()
+ * Structure used to confiugre the CursorResponseBuilder.
+ */
+ struct Options {
+ bool isInitialResponse = false;
+ bool useDocumentSequences = false;
+ };
+
+ /**
+ * Once constructed, you may not use the passed-in ReplyBuilderInterface until you call either
+ * done()
* or abandon(), or this object goes out of scope. This is the same as the rule when using a
* BSONObjBuilder to build a sub-object with subobjStart().
*
- * If the builder goes out of scope without a call to done(), any data appended to the
- * builder will be removed.
+ * If the builder goes out of scope without a call to done(), the ReplyBuilderInterface will be
+ * reset.
*/
- CursorResponseBuilder(bool isInitialResponse, BSONObjBuilder* commandResponse);
+ CursorResponseBuilder(rpc::ReplyBuilderInterface* replyBuilder, Options options);
~CursorResponseBuilder() {
if (_active)
@@ -63,12 +74,16 @@ public:
size_t bytesUsed() const {
invariant(_active);
- return _batch.len();
+ return _options.useDocumentSequences ? _docSeqBuilder->len() : _batch->len();
}
void append(const BSONObj& obj) {
invariant(_active);
- _batch.append(obj);
+ if (_options.useDocumentSequences) {
+ _docSeqBuilder->append(obj);
+ } else {
+ _batch->append(obj);
+ }
_numDocs++;
}
@@ -94,11 +109,15 @@ public:
void abandon();
private:
- const int _responseInitialLen; // Must be the first member so its initializer runs first.
+ const Options _options;
+ rpc::ReplyBuilderInterface* const _replyBuilder;
+ // Order here is important to ensure destruction in the correct order.
+ boost::optional<BSONObjBuilder> _bodyBuilder;
+ boost::optional<BSONObjBuilder> _cursorObject;
+ boost::optional<BSONArrayBuilder> _batch;
+ boost::optional<OpMsgBuilder::DocSequenceBuilder> _docSeqBuilder;
+
bool _active = true;
- BSONObjBuilder* const _commandResponse;
- BSONObjBuilder _cursorObject;
- BSONArrayBuilder _batch;
long long _numDocs = 0;
Timestamp _latestOplogTimestamp;
};
diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp
index eba5d6748fe..cfe3c8ac3cf 100644
--- a/src/mongo/db/query/cursor_response_test.cpp
+++ b/src/mongo/db/query/cursor_response_test.cpp
@@ -30,6 +30,8 @@
#include "mongo/db/query/cursor_response.h"
+#include "mongo/rpc/op_msg_rpc_impls.h"
+
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -330,6 +332,32 @@ TEST(CursorResponseTest, serializeLatestOplogEntry) {
ASSERT_EQ(*reparsedResponse.getLastOplogTimestamp(), Timestamp(1, 2));
}
+TEST(CursorResponseTest, cursorReturnDocumentSequences) {
+ CursorResponseBuilder::Options options;
+ options.isInitialResponse = true;
+ options.useDocumentSequences = true;
+ rpc::OpMsgReplyBuilder builder;
+ BSONObj expectedDoc = BSON("_id" << 1 << "test"
+ << "123");
+ BSONObj expectedBody = BSON("cursor" << BSON("id" << CursorId(123) << "ns"
+ << "db.coll"));
+
+ CursorResponseBuilder crb(&builder, options);
+ crb.append(expectedDoc);
+ ASSERT_EQ(crb.numDocs(), 1U);
+ crb.done(CursorId(123), "db.coll");
+
+ auto msg = builder.done();
+ auto opMsg = OpMsg::parse(msg);
+ const auto& docSeqs = opMsg.sequences;
+ ASSERT_EQ(docSeqs.size(), 1U);
+ const auto& documentSequence = docSeqs[0];
+ ASSERT_EQ(documentSequence.name, "cursor.firstBatch");
+ ASSERT_EQ(documentSequence.objs.size(), 1U);
+ ASSERT_BSONOBJ_EQ(documentSequence.objs[0], expectedDoc);
+ ASSERT_BSONOBJ_EQ(opMsg.body, expectedBody);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_sharded.cpp b/src/mongo/db/sessions_collection_sharded.cpp
index c7c7694daae..9a10be835e7 100644
--- a/src/mongo/db/sessions_collection_sharded.cpp
+++ b/src/mongo/db/sessions_collection_sharded.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/sessions_collection_rs.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/op_msg.h"
+#include "mongo/rpc/op_msg_rpc_impls.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_find.h"
@@ -145,14 +146,16 @@ StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions(
return ex.toStatus();
}
- BSONObjBuilder result;
- CursorResponseBuilder firstBatch(/*firstBatch*/ true, &result);
+ rpc::OpMsgReplyBuilder replyBuilder;
+ CursorResponseBuilder::Options options;
+ options.isInitialResponse = true;
+ CursorResponseBuilder firstBatch(&replyBuilder, options);
for (const auto& obj : batch) {
firstBatch.append(obj);
}
firstBatch.done(cursorId, NamespaceString::kLogicalSessionsNamespace.ns());
- return result.obj();
+ return replyBuilder.releaseBody();
};
return doFetch(NamespaceString::kLogicalSessionsNamespace, sessions, send);
diff --git a/src/mongo/rpc/op_msg.h b/src/mongo/rpc/op_msg.h
index dbe6a1506ef..75a5940ca72 100644
--- a/src/mongo/rpc/op_msg.h
+++ b/src/mongo/rpc/op_msg.h
@@ -319,6 +319,10 @@ public:
return BSONObjBuilder(*_buf);
}
+ int len() const {
+ return _buf->len();
+ }
+
private:
friend OpMsgBuilder;
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index dcbd8074319..1aa3045ea5f 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -33,6 +33,7 @@
#include "mongo/s/commands/cluster_aggregate.h"
#include <boost/intrusive_ptr.hpp>
+#include <mongo/rpc/op_msg_rpc_impls.h>
#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/auth/authorization_session.h"
@@ -543,9 +544,12 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params));
auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
- BSONObjBuilder cursorResponse;
- CursorResponseBuilder responseBuilder(true, &cursorResponse);
+ rpc::OpMsgReplyBuilder replyBuilder;
+ CursorResponseBuilder::Options options;
+ options.isInitialResponse = true;
+
+ CursorResponseBuilder responseBuilder(&replyBuilder, options);
for (long long objCount = 0; objCount < request.getBatchSize(); ++objCount) {
ClusterQueryResult next;
@@ -609,9 +613,11 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
responseBuilder.done(clusterCursorId, requestedNss.ns());
- CommandHelpers::appendSimpleCommandStatus(cursorResponse, true);
+ auto bodyBuilder = replyBuilder.getBodyBuilder();
+ CommandHelpers::appendSimpleCommandStatus(bodyBuilder, true);
+ bodyBuilder.doneFast();
- return cursorResponse.obj();
+ return replyBuilder.releaseBody();
}
/**
diff --git a/src/mongo/s/commands/cluster_count_cmd.cpp b/src/mongo/s/commands/cluster_count_cmd.cpp
index f443c297905..6fdd4a2d0f0 100644
--- a/src/mongo/s/commands/cluster_count_cmd.cpp
+++ b/src/mongo/s/commands/cluster_count_cmd.cpp
@@ -214,7 +214,7 @@ public:
Status explain(OperationContext* opCtx,
const OpMsgRequest& request,
ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* out) const override {
+ rpc::ReplyBuilderInterface* result) const override {
std::string dbname = request.getDatabase().toString();
const BSONObj& cmdObj = request.body;
const NamespaceString nss(parseNs(dbname, cmdObj));
@@ -282,8 +282,9 @@ public:
nsStruct.requestedNss = nss;
nsStruct.executionNss = resolvedAggRequest.getNamespaceString();
+ auto bodyBuilder = result->getBodyBuilder();
return ClusterAggregate::runAggregate(
- opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, out);
+ opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, &bodyBuilder);
}
long long millisElapsed = timer.millis();
@@ -291,12 +292,13 @@ public:
const char* mongosStageName =
ClusterExplain::getStageNameForReadOp(shardResponses.size(), cmdObj);
+ auto bodyBuilder = result->getBodyBuilder();
return ClusterExplain::buildExplainResult(
opCtx,
ClusterExplain::downconvert(opCtx, shardResponses),
mongosStageName,
millisElapsed,
- out);
+ &bodyBuilder);
}
private:
diff --git a/src/mongo/s/commands/cluster_distinct_cmd.cpp b/src/mongo/s/commands/cluster_distinct_cmd.cpp
index 09ab87aa48b..cffde0ec3b0 100644
--- a/src/mongo/s/commands/cluster_distinct_cmd.cpp
+++ b/src/mongo/s/commands/cluster_distinct_cmd.cpp
@@ -86,7 +86,7 @@ public:
Status explain(OperationContext* opCtx,
const OpMsgRequest& opMsgRequest,
ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* out) const override {
+ rpc::ReplyBuilderInterface* result) const override {
std::string dbname = opMsgRequest.getDatabase().toString();
const BSONObj& cmdObj = opMsgRequest.body;
const NamespaceString nss(parseNs(dbname, cmdObj));
@@ -138,8 +138,9 @@ public:
nsStruct.requestedNss = nss;
nsStruct.executionNss = resolvedAggRequest.getNamespaceString();
+ auto bodyBuilder = result->getBodyBuilder();
return ClusterAggregate::runAggregate(
- opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, out);
+ opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, &bodyBuilder);
}
long long millisElapsed = timer.millis();
@@ -147,12 +148,13 @@ public:
const char* mongosStageName =
ClusterExplain::getStageNameForReadOp(shardResponses.size(), cmdObj);
+ auto bodyBuilder = result->getBodyBuilder();
return ClusterExplain::buildExplainResult(
opCtx,
ClusterExplain::downconvert(opCtx, shardResponses),
mongosStageName,
millisElapsed,
- out);
+ &bodyBuilder);
}
bool run(OperationContext* opCtx,
diff --git a/src/mongo/s/commands/cluster_explain_cmd.cpp b/src/mongo/s/commands/cluster_explain_cmd.cpp
index a04c138ddba..0b7f4c1f286 100644
--- a/src/mongo/s/commands/cluster_explain_cmd.cpp
+++ b/src/mongo/s/commands/cluster_explain_cmd.cpp
@@ -96,13 +96,12 @@ public:
private:
void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) override {
- auto bob = result->getBodyBuilder();
- _innerInvocation->explain(opCtx, _verbosity, &bob);
+ _innerInvocation->explain(opCtx, _verbosity, result);
}
void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* result) override {
+ rpc::ReplyBuilderInterface* result) override {
uasserted(ErrorCodes::IllegalOperation, "Explain cannot explain itself.");
}
diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
index 8393517bac7..75347473ce3 100644
--- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
@@ -105,7 +105,7 @@ public:
Status explain(OperationContext* opCtx,
const OpMsgRequest& request,
ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* out) const override {
+ rpc::ReplyBuilderInterface* result) const override {
std::string dbName = request.getDatabase().toString();
const BSONObj& cmdObj = request.body;
const NamespaceString nss(CommandHelpers::parseNsCollectionRequired(dbName, cmdObj));
@@ -134,25 +134,26 @@ public:
// Time how long it takes to run the explain command on the shard.
Timer timer;
- BSONObjBuilder result;
+ BSONObjBuilder bob;
_runCommand(opCtx,
shard->getId(),
(chunkMgr ? chunkMgr->getVersion(shard->getId()) : ChunkVersion::UNSHARDED()),
nss,
explainCmd,
- &result);
+ &bob);
const auto millisElapsed = timer.millis();
Strategy::CommandResult cmdResult;
cmdResult.shardTargetId = shard->getId();
cmdResult.target = shard->getConnString();
- cmdResult.result = result.obj();
+ cmdResult.result = bob.obj();
std::vector<Strategy::CommandResult> shardResults;
shardResults.push_back(cmdResult);
+ auto bodyBuilder = result->getBodyBuilder();
return ClusterExplain::buildExplainResult(
- opCtx, shardResults, ClusterExplain::kSingleShard, millisElapsed, out);
+ opCtx, shardResults, ClusterExplain::kSingleShard, millisElapsed, &bodyBuilder);
}
bool run(OperationContext* opCtx,
diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp
index f879e3f1b96..a69d1e4a44f 100644
--- a/src/mongo/s/commands/cluster_find_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_cmd.cpp
@@ -119,7 +119,7 @@ public:
void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* result) override {
+ rpc::ReplyBuilderInterface* result) override {
// Parse the command BSON to a QueryRequest.
bool isExplain = true;
auto qr =
@@ -150,15 +150,17 @@ public:
const char* mongosStageName =
ClusterExplain::getStageNameForReadOp(shardResponses.size(), _request.body);
+ auto bodyBuilder = result->getBodyBuilder();
uassertStatusOK(ClusterExplain::buildExplainResult(
opCtx,
ClusterExplain::downconvert(opCtx, shardResponses),
mongosStageName,
millisElapsed,
- result));
+ &bodyBuilder));
} catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) {
- result->resetToEmpty();
+ auto bodyBuilder = result->getBodyBuilder();
+ bodyBuilder.resetToEmpty();
auto aggCmdOnView = uassertStatusOK(qr->asAggregationCommand());
@@ -173,7 +175,7 @@ public:
nsStruct.executionNss = std::move(ex->getNamespace());
uassertStatusOK(ClusterAggregate::runAggregate(
- opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, result));
+ opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, &bodyBuilder));
}
}
@@ -199,9 +201,11 @@ public:
std::vector<BSONObj> batch;
auto cursorId =
ClusterFind::runQuery(opCtx, *cq, ReadPreferenceSetting::get(opCtx), &batch);
- auto bodyBuilder = result->getBodyBuilder();
+
// Build the response document.
- CursorResponseBuilder firstBatch(/*firstBatch*/ true, &bodyBuilder);
+ CursorResponseBuilder::Options options;
+ options.isInitialResponse = true;
+ CursorResponseBuilder firstBatch(result, options);
for (const auto& obj : batch) {
firstBatch.append(obj);
}
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
index 897d4b4091b..ead254ff71f 100644
--- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp
+++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
@@ -85,8 +85,9 @@ public:
void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* out) override {
- _runAggCommand(opCtx, _dbName, _request.body, verbosity, out);
+ rpc::ReplyBuilderInterface* result) override {
+ auto bodyBuilder = result->getBodyBuilder();
+ _runAggCommand(opCtx, _dbName, _request.body, verbosity, &bodyBuilder);
}
void doCheckAuthorization(OperationContext* opCtx) const override {
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index b4bbc66d893..f4f3f789038 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -311,7 +311,7 @@ private:
void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* result) override {
+ rpc::ReplyBuilderInterface* result) override {
uassert(ErrorCodes::InvalidLength,
"explained write batches must be of size 1",
_batchedRequest.sizeWriteOps() == 1U);
@@ -329,8 +329,9 @@ private:
explainCmd,
targetingBatchItem,
&shardResults));
+ auto bodyBuilder = result->getBodyBuilder();
uassertStatusOK(ClusterExplain::buildExplainResult(
- opCtx, shardResults, ClusterExplain::kWriteOnShards, timer.millis(), result));
+ opCtx, shardResults, ClusterExplain::kWriteOnShards, timer.millis(), &bodyBuilder));
}
NamespaceString ns() const override {
diff --git a/src/mongo/s/query/cluster_find_test.cpp b/src/mongo/s/query/cluster_find_test.cpp
index ccba55977a6..fb89bfa5e93 100644
--- a/src/mongo/s/query/cluster_find_test.cpp
+++ b/src/mongo/s/query/cluster_find_test.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/logical_clock.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/query_request.h"
+#include "mongo/rpc/op_msg_rpc_impls.h"
#include "mongo/s/catalog_cache_test_fixture.h"
#include "mongo/s/query/cluster_find.h"
#include "mongo/unittest/unittest.h"
@@ -122,14 +123,16 @@ protected:
auto cursorId = ClusterFind::runQuery(
operationContext(), *cq, ReadPreferenceSetting(ReadPreference::PrimaryOnly), &batch);
- BSONObjBuilder result;
- CursorResponseBuilder firstBatch(/* firstBatch */ true, &result);
+ rpc::OpMsgReplyBuilder result;
+ CursorResponseBuilder::Options options;
+ options.isInitialResponse = true;
+ CursorResponseBuilder firstBatch(&result, options);
for (const auto& obj : batch) {
firstBatch.append(obj);
}
firstBatch.done(cursorId, nss.ns());
- return result.obj();
+ return result.releaseBody();
}
void runFindCommandSuccessful(BSONObj cmd, bool isTargeted) {