diff options
30 files changed, 213 insertions, 105 deletions
diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp index f47cc77cf9c..57e82e49067 100644 --- a/src/mongo/db/commands.cpp +++ b/src/mongo/db/commands.cpp @@ -440,7 +440,7 @@ private: void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* result) override { + rpc::ReplyBuilderInterface* result) override { uassertStatusOK(_command->explain(opCtx, *_request, verbosity, result)); } @@ -492,7 +492,7 @@ Command::Command(StringData name, StringData oldName) Status BasicCommand::explain(OperationContext* opCtx, const OpMsgRequest& request, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) const { + rpc::ReplyBuilderInterface* result) const { return {ErrorCodes::IllegalOperation, str::stream() << "Cannot explain cmd: " << getName()}; } diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h index 53788b16d3d..3dfe96e5cd8 100644 --- a/src/mongo/db/commands.h +++ b/src/mongo/db/commands.h @@ -441,7 +441,7 @@ public: virtual void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* result) { + rpc::ReplyBuilderInterface* result) { uasserted(ErrorCodes::IllegalOperation, str::stream() << "Cannot explain cmd: " << definition()->getName()); } @@ -561,7 +561,7 @@ public: virtual Status explain(OperationContext* opCtx, const OpMsgRequest& request, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) const; + rpc::ReplyBuilderInterface* result) const; /** * Checks if the client associated with the given OperationContext is authorized to run this diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp index 8818ded8b59..ecff6c12986 100644 --- a/src/mongo/db/commands/count_cmd.cpp +++ b/src/mongo/db/commands/count_cmd.cpp @@ -107,7 +107,7 @@ public: Status explain(OperationContext* opCtx, const OpMsgRequest& opMsgRequest, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) const override { + rpc::ReplyBuilderInterface* result) const override { std::string dbname = opMsgRequest.getDatabase().toString(); const BSONObj& cmdObj = opMsgRequest.body; // Acquire locks and resolve possible UUID. The RAII object is optional, because in the case @@ -143,7 +143,7 @@ public: viewAggRequest.getValue().getNamespaceString(), viewAggRequest.getValue(), viewAggregation.getValue(), - *out); + result); } Collection* const collection = ctx->getCollection(); @@ -160,7 +160,8 @@ public: auto exec = std::move(statusWithPlanExecutor.getValue()); - Explain::explainStages(exec.get(), collection, verbosity, out); + auto bodyBuilder = result->getBodyBuilder(); + Explain::explainStages(exec.get(), collection, verbosity, &bodyBuilder); return Status::OK(); } diff --git a/src/mongo/db/commands/current_op.cpp b/src/mongo/db/commands/current_op.cpp index 47e22ea8296..3b0d2d1814f 100644 --- a/src/mongo/db/commands/current_op.cpp +++ b/src/mongo/db/commands/current_op.cpp @@ -66,18 +66,20 @@ public: OperationContext* opCtx, const AggregationRequest& request) const final { auto aggCmdObj = request.serializeToCommandObj().toBson(); - BSONObjBuilder responseBuilder; + rpc::OpMsgReplyBuilder replyBuilder; auto status = runAggregate( - opCtx, request.getNamespaceString(), request, std::move(aggCmdObj), responseBuilder); + opCtx, request.getNamespaceString(), request, std::move(aggCmdObj), &replyBuilder); if (!status.isOK()) { return status; } - CommandHelpers::appendSimpleCommandStatus(responseBuilder, true); + auto bodyBuilder = replyBuilder.getBodyBuilder(); + CommandHelpers::appendSimpleCommandStatus(bodyBuilder, true); + bodyBuilder.doneFast(); - return CursorResponse::parseFromBSON(responseBuilder.obj()); + return CursorResponse::parseFromBSON(replyBuilder.releaseBody()); } virtual void appendToResponse(BSONObjBuilder* result) const final { diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp index 8802e4875cb..8b6098f4341 100644 --- a/src/mongo/db/commands/distinct.cpp +++ b/src/mongo/db/commands/distinct.cpp @@ -112,7 +112,7 @@ public: Status explain(OperationContext* opCtx, const OpMsgRequest& request, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) const override { + rpc::ReplyBuilderInterface* result) const override { std::string dbname = request.getDatabase().toString(); const BSONObj& cmdObj = request.body; // Acquire locks and resolve possible UUID. The RAII object is optional, because in the case @@ -143,7 +143,7 @@ public: } return runAggregate( - opCtx, nss, viewAggRequest.getValue(), viewAggregation.getValue(), *out); + opCtx, nss, viewAggRequest.getValue(), viewAggregation.getValue(), result); } Collection* const collection = ctx->getCollection(); @@ -151,7 +151,8 @@ public: auto executor = uassertStatusOK(getExecutorDistinct(opCtx, collection, nss.ns(), &parsedDistinct)); - Explain::explainStages(executor.get(), collection, verbosity, out); + auto bodyBuilder = result->getBodyBuilder(); + Explain::explainStages(executor.get(), collection, verbosity, &bodyBuilder); return Status::OK(); } diff --git a/src/mongo/db/commands/explain_cmd.cpp b/src/mongo/db/commands/explain_cmd.cpp index b20a1a7d850..c41022b5550 100644 --- a/src/mongo/db/commands/explain_cmd.cpp +++ b/src/mongo/db/commands/explain_cmd.cpp @@ -97,13 +97,12 @@ public: "Explain's child command cannot run on this node. " "Are you explaining a write command on a secondary?", commandCanRunHere(opCtx, _dbName, _innerInvocation->definition())); - BSONObjBuilder bob = result->getBodyBuilder(); - _innerInvocation->explain(opCtx, _verbosity, &bob); + _innerInvocation->explain(opCtx, _verbosity, result); } void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* result) override { + rpc::ReplyBuilderInterface* result) override { uasserted(ErrorCodes::IllegalOperation, "Explain cannot explain itself."); } diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index 4c70636f928..070557d4f41 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -256,7 +256,7 @@ public: Status explain(OperationContext* opCtx, const OpMsgRequest& request, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) const override { + rpc::ReplyBuilderInterface* result) const override { std::string dbName = request.getDatabase().toString(); const BSONObj& cmdObj = request.body; const auto args(uassertStatusOK(FindAndModifyRequest::parseFromBSON( @@ -288,7 +288,8 @@ public: const auto exec = uassertStatusOK(getExecutorDelete(opCtx, opDebug, collection, &parsedDelete)); - Explain::explainStages(exec.get(), collection, verbosity, out); + auto bodyBuilder = result->getBodyBuilder(); + Explain::explainStages(exec.get(), collection, verbosity, &bodyBuilder); } else { UpdateRequest request(nsString); UpdateLifecycleImpl updateLifecycle(nsString); @@ -312,7 +313,8 @@ public: const auto exec = uassertStatusOK(getExecutorUpdate(opCtx, opDebug, collection, &parsedUpdate)); - Explain::explainStages(exec.get(), collection, verbosity, out); + auto bodyBuilder = result->getBodyBuilder(); + Explain::explainStages(exec.get(), collection, verbosity, &bodyBuilder); } return Status::OK(); diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index e16f2473092..7c82c31da29 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -142,7 +142,7 @@ public: void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* result) override { + rpc::ReplyBuilderInterface* result) override { // Acquire locks and resolve possible UUID. The RAII object is optional, because in the // case of a view, the locks need to be released. boost::optional<AutoGetCollectionForReadCommand> ctx; @@ -182,7 +182,7 @@ public: try { uassertStatusOK( - runAggregate(opCtx, nss, aggRequest, viewAggregationCommand, *result)); + runAggregate(opCtx, nss, aggRequest, viewAggregationCommand, result)); } catch (DBException& error) { if (error.code() == ErrorCodes::InvalidPipelineOperator) { uasserted(ErrorCodes::InvalidPipelineOperator, @@ -201,8 +201,9 @@ public: // We have a parsed query. Time to get the execution plan for it. auto exec = uassertStatusOK(getExecutorFind(opCtx, collection, nss, std::move(cq))); + auto bodyBuilder = result->getBodyBuilder(); // Got the execution tree. Explain it. - Explain::explainStages(exec.get(), collection, verbosity, result); + Explain::explainStages(exec.get(), collection, verbosity, &bodyBuilder); } /** @@ -320,8 +321,9 @@ public: const QueryRequest& originalQR = exec->getCanonicalQuery()->getQueryRequest(); // Stream query results, adding them to a BSONArray as we go. - auto bodyBuilder = result->getBodyBuilder(); - CursorResponseBuilder firstBatch(/*isInitialResponse*/ true, &bodyBuilder); + CursorResponseBuilder::Options options; + options.isInitialResponse = true; + CursorResponseBuilder firstBatch(result, options); BSONObj obj; PlanExecutor::ExecState state = PlanExecutor::ADVANCED; long long numResults = 0; diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index e296e4e0c97..a6fa17ab2d6 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -244,7 +244,6 @@ public: void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override { // Counted as a getMore, not as a command. globalOpCounters.gotGetMore(); - auto result = reply->getBodyBuilder(); auto curOp = CurOp::get(opCtx); curOp->debug().cursorid = _request.cursorid; @@ -428,7 +427,8 @@ public: } CursorId respondWithId = 0; - CursorResponseBuilder nextBatch(/*isInitialResponse*/ false, &result); + + CursorResponseBuilder nextBatch(reply, CursorResponseBuilder::Options()); BSONObj obj; PlanExecutor::ExecState state = PlanExecutor::ADVANCED; long long numResults = 0; diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index a7096773104..54ecc3df1ab 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -79,7 +79,6 @@ public: } void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override { - auto bob = reply->getBodyBuilder(); const auto aggregationRequest = uassertStatusOK( AggregationRequest::parseFromBSON(_dbName, _request.body, boost::none)); @@ -87,7 +86,7 @@ public: aggregationRequest.getNamespaceString(), aggregationRequest, _request.body, - bob)); + reply)); } NamespaceString ns() const override { @@ -96,7 +95,7 @@ public: void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) override { + rpc::ReplyBuilderInterface* result) override { const auto aggregationRequest = uassertStatusOK( AggregationRequest::parseFromBSON(_dbName, _request.body, verbosity)); @@ -104,7 +103,7 @@ public: aggregationRequest.getNamespaceString(), aggregationRequest, _request.body, - *out)); + result)); } void doCheckAuthorization(OperationContext* opCtx) const override { diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 5e00d3f867c..60efa126c3c 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -91,7 +91,7 @@ bool handleCursorCommand(OperationContext* opCtx, const NamespaceString& nsForCursor, std::vector<ClientCursor*> cursors, const AggregationRequest& request, - BSONObjBuilder& result) { + rpc::ReplyBuilderInterface* result) { invariant(!cursors.empty()); long long batchSize = request.getBatchSize(); @@ -121,12 +121,15 @@ bool handleCursorCommand(OperationContext* opCtx, cursors[idx]->getExecutor()->detachFromOperationContext(); } - result.appendArray("cursors", cursorsBuilder.obj()); + auto bodyBuilder = result->getBodyBuilder(); + bodyBuilder.appendArray("cursors", cursorsBuilder.obj()); return true; } - CursorResponseBuilder responseBuilder(true, &result); + CursorResponseBuilder::Options options; + options.isInitialResponse = true; + CursorResponseBuilder responseBuilder(result, options); ClientCursor* cursor = cursors[0]; invariant(cursor); @@ -345,7 +348,7 @@ Status runAggregate(OperationContext* opCtx, const NamespaceString& origNss, const AggregationRequest& request, const BSONObj& cmdObj, - BSONObjBuilder& result) { + rpc::ReplyBuilderInterface* result) { // For operations on views, this will be the underlying namespace. NamespaceString nss = request.getNamespaceString(); @@ -611,8 +614,9 @@ Status runAggregate(OperationContext* opCtx, // If both explain and cursor are specified, explain wins. if (expCtx->explain) { + auto bodyBuilder = result->getBodyBuilder(); Explain::explainPipelineExecutor( - pins[0].getCursor()->getExecutor(), *(expCtx->explain), &result); + pins[0].getCursor()->getExecutor(), *(expCtx->explain), &bodyBuilder); } else { // Cursor must be specified, if explain is not. const bool keepCursor = diff --git a/src/mongo/db/commands/run_aggregate.h b/src/mongo/db/commands/run_aggregate.h index 1412768ddcf..75212efc22c 100644 --- a/src/mongo/db/commands/run_aggregate.h +++ b/src/mongo/db/commands/run_aggregate.h @@ -33,6 +33,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/pipeline/aggregation_request.h" +#include "mongo/rpc/op_msg_rpc_impls.h" namespace mongo { @@ -48,6 +49,6 @@ Status runAggregate(OperationContext* opCtx, const NamespaceString& nss, const AggregationRequest& request, const BSONObj& cmdObj, - BSONObjBuilder& result); + rpc::ReplyBuilderInterface* result); } // namespace mongo diff --git a/src/mongo/db/commands/user_management_commands.cpp b/src/mongo/db/commands/user_management_commands.cpp index 145cd247301..82823e3cfcc 100644 --- a/src/mongo/db/commands/user_management_commands.cpp +++ b/src/mongo/db/commands/user_management_commands.cpp @@ -1371,16 +1371,18 @@ public: DBDirectClient client(opCtx); - BSONObjBuilder responseBuilder; + rpc::OpMsgReplyBuilder replyBuilder; AggregationRequest aggRequest(AuthorizationManager::usersCollectionNamespace, std::move(pipeline)); uassertStatusOK(runAggregate(opCtx, AuthorizationManager::usersCollectionNamespace, aggRequest, aggRequest.serializeToCommandObj().toBson(), - responseBuilder)); - CommandHelpers::appendSimpleCommandStatus(responseBuilder, true); - auto response = CursorResponse::parseFromBSONThrowing(responseBuilder.obj()); + &replyBuilder)); + auto bodyBuilder = replyBuilder.getBodyBuilder(); + CommandHelpers::appendSimpleCommandStatus(bodyBuilder, true); + bodyBuilder.doneFast(); + auto response = CursorResponse::parseFromBSONThrowing(replyBuilder.releaseBody()); DBClientCursor cursor(&client, response.getNSS().toString(), response.getCursorId(), diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp index 49e90ff0899..35d2e0ec53b 100644 --- a/src/mongo/db/commands/write_commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands/write_commands.cpp @@ -350,7 +350,7 @@ private: void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) override { + rpc::ReplyBuilderInterface* result) override { uassert(ErrorCodes::InvalidLength, "explained write batches must be of size 1", _batch.getUpdates().size() == 1); @@ -376,7 +376,8 @@ private: auto exec = uassertStatusOK(getExecutorUpdate( opCtx, &CurOp::get(opCtx)->debug(), collection.getCollection(), &parsedUpdate)); - Explain::explainStages(exec.get(), collection.getCollection(), verbosity, out); + auto bodyBuilder = result->getBodyBuilder(); + Explain::explainStages(exec.get(), collection.getCollection(), verbosity, &bodyBuilder); } write_ops::Update _batch; @@ -426,7 +427,7 @@ private: void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) override { + rpc::ReplyBuilderInterface* result) override { uassert(ErrorCodes::InvalidLength, "explained write batches must be of size 1", _batch.getDeletes().size() == 1); @@ -448,7 +449,8 @@ private: // Explain the plan tree. auto exec = uassertStatusOK(getExecutorDelete( opCtx, &CurOp::get(opCtx)->debug(), collection.getCollection(), &parsedDelete)); - Explain::explainStages(exec.get(), collection.getCollection(), verbosity, out); + auto bodyBuilder = result->getBodyBuilder(); + Explain::explainStages(exec.get(), collection.getCollection(), verbosity, &bodyBuilder); } write_ops::Delete _batch; diff --git a/src/mongo/db/commands_test.cpp b/src/mongo/db/commands_test.cpp index c92f72617c3..1b25d644765 100644 --- a/src/mongo/db/commands_test.cpp +++ b/src/mongo/db/commands_test.cpp @@ -218,7 +218,7 @@ public: void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* result) override {} + rpc::ReplyBuilderInterface* result) override {} void doCheckAuthorization(OperationContext*) const override {} @@ -263,7 +263,7 @@ public: void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* result) override {} + rpc::ReplyBuilderInterface* result) override {} void doCheckAuthorization(OperationContext*) const override {} diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript index 7247f723a02..1605b395c7d 100644 --- a/src/mongo/db/query/SConscript +++ b/src/mongo/db/query/SConscript @@ -147,6 +147,7 @@ env.Library( '$BUILD_DIR/mongo/db/namespace_string', '$BUILD_DIR/mongo/db/repl/optime', '$BUILD_DIR/mongo/rpc/command_status', + '$BUILD_DIR/mongo/rpc/rpc', 'query_request', ] ) @@ -164,6 +165,7 @@ env.CppUnitTest( ], LIBDEPS=[ "$BUILD_DIR/mongo/db/pipeline/aggregation_request", + '$BUILD_DIR/mongo/rpc/rpc', 'command_request_response', ] ) diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp index e449aad0814..14b8af5e8f6 100644 --- a/src/mongo/db/query/cursor_response.cpp +++ b/src/mongo/db/query/cursor_response.cpp @@ -44,34 +44,52 @@ const char kIdField[] = "id"; const char kNsField[] = "ns"; const char kBatchField[] = "nextBatch"; const char kBatchFieldInitial[] = "firstBatch"; +const char kBatchDocSequenceField[] = "cursor.nextBatch"; +const char kBatchDocSequenceFieldInitial[] = "cursor.firstBatch"; const char kInternalLatestOplogTimestampField[] = "$_internalLatestOplogTimestamp"; } // namespace -CursorResponseBuilder::CursorResponseBuilder(bool isInitialResponse, - BSONObjBuilder* commandResponse) - : _responseInitialLen(commandResponse->bb().len()), - _commandResponse(commandResponse), - _cursorObject(commandResponse->subobjStart(kCursorField)), - _batch(_cursorObject.subarrayStart(isInitialResponse ? kBatchFieldInitial : kBatchField)) {} +CursorResponseBuilder::CursorResponseBuilder(rpc::ReplyBuilderInterface* replyBuilder, + Options options = Options()) + : _options(options), _replyBuilder(replyBuilder) { + if (_options.useDocumentSequences) { + _docSeqBuilder.emplace(_replyBuilder->getDocSequenceBuilder( + _options.isInitialResponse ? kBatchDocSequenceFieldInitial : kBatchDocSequenceField)); + } else { + _bodyBuilder.emplace(_replyBuilder->getBodyBuilder()); + _cursorObject.emplace(_bodyBuilder->subobjStart(kCursorField)); + _batch.emplace(_cursorObject->subarrayStart(_options.isInitialResponse ? kBatchFieldInitial + : kBatchField)); + } +} void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace) { invariant(_active); - _batch.doneFast(); - _cursorObject.append(kIdField, cursorId); - _cursorObject.append(kNsField, cursorNamespace); - _cursorObject.doneFast(); + if (_options.useDocumentSequences) { + _docSeqBuilder.reset(); + _bodyBuilder.emplace(_replyBuilder->getBodyBuilder()); + _cursorObject.emplace(_bodyBuilder->subobjStart(kCursorField)); + } else { + _batch.reset(); + } + _cursorObject->append(kIdField, cursorId); + _cursorObject->append(kNsField, cursorNamespace); + _cursorObject.reset(); + if (!_latestOplogTimestamp.isNull()) { - _commandResponse->append(kInternalLatestOplogTimestampField, _latestOplogTimestamp); + _bodyBuilder->append(kInternalLatestOplogTimestampField, _latestOplogTimestamp); } + _bodyBuilder.reset(); _active = false; } void CursorResponseBuilder::abandon() { invariant(_active); - _batch.doneFast(); - _cursorObject.doneFast(); - _commandResponse->bb().setlen(_responseInitialLen); // Removes everything we've added. + _batch.reset(); + _cursorObject.reset(); + _bodyBuilder.reset(); + _replyBuilder->reset(); _numDocs = 0; _active = false; } diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h index 091b64e0c73..529654118df 100644 --- a/src/mongo/db/query/cursor_response.h +++ b/src/mongo/db/query/cursor_response.h @@ -35,6 +35,8 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/clientcursor.h" #include "mongo/db/namespace_string.h" +#include "mongo/rpc/op_msg.h" +#include "mongo/rpc/reply_builder_interface.h" namespace mongo { @@ -47,14 +49,23 @@ class CursorResponseBuilder { public: /** - * Once constructed, you may not use the passed-in BSONObjBuilder until you call either done() + * Structure used to confiugre the CursorResponseBuilder. + */ + struct Options { + bool isInitialResponse = false; + bool useDocumentSequences = false; + }; + + /** + * Once constructed, you may not use the passed-in ReplyBuilderInterface until you call either + * done() * or abandon(), or this object goes out of scope. This is the same as the rule when using a * BSONObjBuilder to build a sub-object with subobjStart(). * - * If the builder goes out of scope without a call to done(), any data appended to the - * builder will be removed. + * If the builder goes out of scope without a call to done(), the ReplyBuilderInterface will be + * reset. */ - CursorResponseBuilder(bool isInitialResponse, BSONObjBuilder* commandResponse); + CursorResponseBuilder(rpc::ReplyBuilderInterface* replyBuilder, Options options); ~CursorResponseBuilder() { if (_active) @@ -63,12 +74,16 @@ public: size_t bytesUsed() const { invariant(_active); - return _batch.len(); + return _options.useDocumentSequences ? _docSeqBuilder->len() : _batch->len(); } void append(const BSONObj& obj) { invariant(_active); - _batch.append(obj); + if (_options.useDocumentSequences) { + _docSeqBuilder->append(obj); + } else { + _batch->append(obj); + } _numDocs++; } @@ -94,11 +109,15 @@ public: void abandon(); private: - const int _responseInitialLen; // Must be the first member so its initializer runs first. + const Options _options; + rpc::ReplyBuilderInterface* const _replyBuilder; + // Order here is important to ensure destruction in the correct order. + boost::optional<BSONObjBuilder> _bodyBuilder; + boost::optional<BSONObjBuilder> _cursorObject; + boost::optional<BSONArrayBuilder> _batch; + boost::optional<OpMsgBuilder::DocSequenceBuilder> _docSeqBuilder; + bool _active = true; - BSONObjBuilder* const _commandResponse; - BSONObjBuilder _cursorObject; - BSONArrayBuilder _batch; long long _numDocs = 0; Timestamp _latestOplogTimestamp; }; diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp index eba5d6748fe..cfe3c8ac3cf 100644 --- a/src/mongo/db/query/cursor_response_test.cpp +++ b/src/mongo/db/query/cursor_response_test.cpp @@ -30,6 +30,8 @@ #include "mongo/db/query/cursor_response.h" +#include "mongo/rpc/op_msg_rpc_impls.h" + #include "mongo/unittest/unittest.h" namespace mongo { @@ -330,6 +332,32 @@ TEST(CursorResponseTest, serializeLatestOplogEntry) { ASSERT_EQ(*reparsedResponse.getLastOplogTimestamp(), Timestamp(1, 2)); } +TEST(CursorResponseTest, cursorReturnDocumentSequences) { + CursorResponseBuilder::Options options; + options.isInitialResponse = true; + options.useDocumentSequences = true; + rpc::OpMsgReplyBuilder builder; + BSONObj expectedDoc = BSON("_id" << 1 << "test" + << "123"); + BSONObj expectedBody = BSON("cursor" << BSON("id" << CursorId(123) << "ns" + << "db.coll")); + + CursorResponseBuilder crb(&builder, options); + crb.append(expectedDoc); + ASSERT_EQ(crb.numDocs(), 1U); + crb.done(CursorId(123), "db.coll"); + + auto msg = builder.done(); + auto opMsg = OpMsg::parse(msg); + const auto& docSeqs = opMsg.sequences; + ASSERT_EQ(docSeqs.size(), 1U); + const auto& documentSequence = docSeqs[0]; + ASSERT_EQ(documentSequence.name, "cursor.firstBatch"); + ASSERT_EQ(documentSequence.objs.size(), 1U); + ASSERT_BSONOBJ_EQ(documentSequence.objs[0], expectedDoc); + ASSERT_BSONOBJ_EQ(opMsg.body, expectedBody); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/sessions_collection_sharded.cpp b/src/mongo/db/sessions_collection_sharded.cpp index c7c7694daae..9a10be835e7 100644 --- a/src/mongo/db/sessions_collection_sharded.cpp +++ b/src/mongo/db/sessions_collection_sharded.cpp @@ -37,6 +37,7 @@ #include "mongo/db/sessions_collection_rs.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/op_msg.h" +#include "mongo/rpc/op_msg_rpc_impls.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_find.h" @@ -145,14 +146,16 @@ StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions( return ex.toStatus(); } - BSONObjBuilder result; - CursorResponseBuilder firstBatch(/*firstBatch*/ true, &result); + rpc::OpMsgReplyBuilder replyBuilder; + CursorResponseBuilder::Options options; + options.isInitialResponse = true; + CursorResponseBuilder firstBatch(&replyBuilder, options); for (const auto& obj : batch) { firstBatch.append(obj); } firstBatch.done(cursorId, NamespaceString::kLogicalSessionsNamespace.ns()); - return result.obj(); + return replyBuilder.releaseBody(); }; return doFetch(NamespaceString::kLogicalSessionsNamespace, sessions, send); diff --git a/src/mongo/rpc/op_msg.h b/src/mongo/rpc/op_msg.h index dbe6a1506ef..75a5940ca72 100644 --- a/src/mongo/rpc/op_msg.h +++ b/src/mongo/rpc/op_msg.h @@ -319,6 +319,10 @@ public: return BSONObjBuilder(*_buf); } + int len() const { + return _buf->len(); + } + private: friend OpMsgBuilder; diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index dcbd8074319..1aa3045ea5f 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -33,6 +33,7 @@ #include "mongo/s/commands/cluster_aggregate.h" #include <boost/intrusive_ptr.hpp> +#include <mongo/rpc/op_msg_rpc_impls.h> #include "mongo/bson/util/bson_extract.h" #include "mongo/db/auth/authorization_session.h" @@ -543,9 +544,12 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params)); auto cursorState = ClusterCursorManager::CursorState::NotExhausted; - BSONObjBuilder cursorResponse; - CursorResponseBuilder responseBuilder(true, &cursorResponse); + rpc::OpMsgReplyBuilder replyBuilder; + CursorResponseBuilder::Options options; + options.isInitialResponse = true; + + CursorResponseBuilder responseBuilder(&replyBuilder, options); for (long long objCount = 0; objCount < request.getBatchSize(); ++objCount) { ClusterQueryResult next; @@ -609,9 +613,11 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, responseBuilder.done(clusterCursorId, requestedNss.ns()); - CommandHelpers::appendSimpleCommandStatus(cursorResponse, true); + auto bodyBuilder = replyBuilder.getBodyBuilder(); + CommandHelpers::appendSimpleCommandStatus(bodyBuilder, true); + bodyBuilder.doneFast(); - return cursorResponse.obj(); + return replyBuilder.releaseBody(); } /** diff --git a/src/mongo/s/commands/cluster_count_cmd.cpp b/src/mongo/s/commands/cluster_count_cmd.cpp index f443c297905..6fdd4a2d0f0 100644 --- a/src/mongo/s/commands/cluster_count_cmd.cpp +++ b/src/mongo/s/commands/cluster_count_cmd.cpp @@ -214,7 +214,7 @@ public: Status explain(OperationContext* opCtx, const OpMsgRequest& request, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) const override { + rpc::ReplyBuilderInterface* result) const override { std::string dbname = request.getDatabase().toString(); const BSONObj& cmdObj = request.body; const NamespaceString nss(parseNs(dbname, cmdObj)); @@ -282,8 +282,9 @@ public: nsStruct.requestedNss = nss; nsStruct.executionNss = resolvedAggRequest.getNamespaceString(); + auto bodyBuilder = result->getBodyBuilder(); return ClusterAggregate::runAggregate( - opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, out); + opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, &bodyBuilder); } long long millisElapsed = timer.millis(); @@ -291,12 +292,13 @@ public: const char* mongosStageName = ClusterExplain::getStageNameForReadOp(shardResponses.size(), cmdObj); + auto bodyBuilder = result->getBodyBuilder(); return ClusterExplain::buildExplainResult( opCtx, ClusterExplain::downconvert(opCtx, shardResponses), mongosStageName, millisElapsed, - out); + &bodyBuilder); } private: diff --git a/src/mongo/s/commands/cluster_distinct_cmd.cpp b/src/mongo/s/commands/cluster_distinct_cmd.cpp index 09ab87aa48b..cffde0ec3b0 100644 --- a/src/mongo/s/commands/cluster_distinct_cmd.cpp +++ b/src/mongo/s/commands/cluster_distinct_cmd.cpp @@ -86,7 +86,7 @@ public: Status explain(OperationContext* opCtx, const OpMsgRequest& opMsgRequest, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) const override { + rpc::ReplyBuilderInterface* result) const override { std::string dbname = opMsgRequest.getDatabase().toString(); const BSONObj& cmdObj = opMsgRequest.body; const NamespaceString nss(parseNs(dbname, cmdObj)); @@ -138,8 +138,9 @@ public: nsStruct.requestedNss = nss; nsStruct.executionNss = resolvedAggRequest.getNamespaceString(); + auto bodyBuilder = result->getBodyBuilder(); return ClusterAggregate::runAggregate( - opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, out); + opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, &bodyBuilder); } long long millisElapsed = timer.millis(); @@ -147,12 +148,13 @@ public: const char* mongosStageName = ClusterExplain::getStageNameForReadOp(shardResponses.size(), cmdObj); + auto bodyBuilder = result->getBodyBuilder(); return ClusterExplain::buildExplainResult( opCtx, ClusterExplain::downconvert(opCtx, shardResponses), mongosStageName, millisElapsed, - out); + &bodyBuilder); } bool run(OperationContext* opCtx, diff --git a/src/mongo/s/commands/cluster_explain_cmd.cpp b/src/mongo/s/commands/cluster_explain_cmd.cpp index a04c138ddba..0b7f4c1f286 100644 --- a/src/mongo/s/commands/cluster_explain_cmd.cpp +++ b/src/mongo/s/commands/cluster_explain_cmd.cpp @@ -96,13 +96,12 @@ public: private: void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) override { - auto bob = result->getBodyBuilder(); - _innerInvocation->explain(opCtx, _verbosity, &bob); + _innerInvocation->explain(opCtx, _verbosity, result); } void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* result) override { + rpc::ReplyBuilderInterface* result) override { uasserted(ErrorCodes::IllegalOperation, "Explain cannot explain itself."); } diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index 8393517bac7..75347473ce3 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -105,7 +105,7 @@ public: Status explain(OperationContext* opCtx, const OpMsgRequest& request, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) const override { + rpc::ReplyBuilderInterface* result) const override { std::string dbName = request.getDatabase().toString(); const BSONObj& cmdObj = request.body; const NamespaceString nss(CommandHelpers::parseNsCollectionRequired(dbName, cmdObj)); @@ -134,25 +134,26 @@ public: // Time how long it takes to run the explain command on the shard. Timer timer; - BSONObjBuilder result; + BSONObjBuilder bob; _runCommand(opCtx, shard->getId(), (chunkMgr ? chunkMgr->getVersion(shard->getId()) : ChunkVersion::UNSHARDED()), nss, explainCmd, - &result); + &bob); const auto millisElapsed = timer.millis(); Strategy::CommandResult cmdResult; cmdResult.shardTargetId = shard->getId(); cmdResult.target = shard->getConnString(); - cmdResult.result = result.obj(); + cmdResult.result = bob.obj(); std::vector<Strategy::CommandResult> shardResults; shardResults.push_back(cmdResult); + auto bodyBuilder = result->getBodyBuilder(); return ClusterExplain::buildExplainResult( - opCtx, shardResults, ClusterExplain::kSingleShard, millisElapsed, out); + opCtx, shardResults, ClusterExplain::kSingleShard, millisElapsed, &bodyBuilder); } bool run(OperationContext* opCtx, diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp index f879e3f1b96..a69d1e4a44f 100644 --- a/src/mongo/s/commands/cluster_find_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_cmd.cpp @@ -119,7 +119,7 @@ public: void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* result) override { + rpc::ReplyBuilderInterface* result) override { // Parse the command BSON to a QueryRequest. bool isExplain = true; auto qr = @@ -150,15 +150,17 @@ public: const char* mongosStageName = ClusterExplain::getStageNameForReadOp(shardResponses.size(), _request.body); + auto bodyBuilder = result->getBodyBuilder(); uassertStatusOK(ClusterExplain::buildExplainResult( opCtx, ClusterExplain::downconvert(opCtx, shardResponses), mongosStageName, millisElapsed, - result)); + &bodyBuilder)); } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) { - result->resetToEmpty(); + auto bodyBuilder = result->getBodyBuilder(); + bodyBuilder.resetToEmpty(); auto aggCmdOnView = uassertStatusOK(qr->asAggregationCommand()); @@ -173,7 +175,7 @@ public: nsStruct.executionNss = std::move(ex->getNamespace()); uassertStatusOK(ClusterAggregate::runAggregate( - opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, result)); + opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, &bodyBuilder)); } } @@ -199,9 +201,11 @@ public: std::vector<BSONObj> batch; auto cursorId = ClusterFind::runQuery(opCtx, *cq, ReadPreferenceSetting::get(opCtx), &batch); - auto bodyBuilder = result->getBodyBuilder(); + // Build the response document. - CursorResponseBuilder firstBatch(/*firstBatch*/ true, &bodyBuilder); + CursorResponseBuilder::Options options; + options.isInitialResponse = true; + CursorResponseBuilder firstBatch(result, options); for (const auto& obj : batch) { firstBatch.append(obj); } diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp index 897d4b4091b..ead254ff71f 100644 --- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp +++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp @@ -85,8 +85,9 @@ public: void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) override { - _runAggCommand(opCtx, _dbName, _request.body, verbosity, out); + rpc::ReplyBuilderInterface* result) override { + auto bodyBuilder = result->getBodyBuilder(); + _runAggCommand(opCtx, _dbName, _request.body, verbosity, &bodyBuilder); } void doCheckAuthorization(OperationContext* opCtx) const override { diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index b4bbc66d893..f4f3f789038 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -311,7 +311,7 @@ private: void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* result) override { + rpc::ReplyBuilderInterface* result) override { uassert(ErrorCodes::InvalidLength, "explained write batches must be of size 1", _batchedRequest.sizeWriteOps() == 1U); @@ -329,8 +329,9 @@ private: explainCmd, targetingBatchItem, &shardResults)); + auto bodyBuilder = result->getBodyBuilder(); uassertStatusOK(ClusterExplain::buildExplainResult( - opCtx, shardResults, ClusterExplain::kWriteOnShards, timer.millis(), result)); + opCtx, shardResults, ClusterExplain::kWriteOnShards, timer.millis(), &bodyBuilder)); } NamespaceString ns() const override { diff --git a/src/mongo/s/query/cluster_find_test.cpp b/src/mongo/s/query/cluster_find_test.cpp index ccba55977a6..fb89bfa5e93 100644 --- a/src/mongo/s/query/cluster_find_test.cpp +++ b/src/mongo/s/query/cluster_find_test.cpp @@ -32,6 +32,7 @@ #include "mongo/db/logical_clock.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/query_request.h" +#include "mongo/rpc/op_msg_rpc_impls.h" #include "mongo/s/catalog_cache_test_fixture.h" #include "mongo/s/query/cluster_find.h" #include "mongo/unittest/unittest.h" @@ -122,14 +123,16 @@ protected: auto cursorId = ClusterFind::runQuery( operationContext(), *cq, ReadPreferenceSetting(ReadPreference::PrimaryOnly), &batch); - BSONObjBuilder result; - CursorResponseBuilder firstBatch(/* firstBatch */ true, &result); + rpc::OpMsgReplyBuilder result; + CursorResponseBuilder::Options options; + options.isInitialResponse = true; + CursorResponseBuilder firstBatch(&result, options); for (const auto& obj : batch) { firstBatch.append(obj); } firstBatch.done(cursorId, nss.ns()); - return result.obj(); + return result.releaseBody(); } void runFindCommandSuccessful(BSONObj cmd, bool isTargeted) { |