diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/commands.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/commands.h | 4 | ||||
-rw-r--r-- | src/mongo/db/commands/count_cmd.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/commands/current_op.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/commands/distinct.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/commands/explain_cmd.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/commands/find_and_modify.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/commands/find_cmd.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/commands/getmore_cmd.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/commands/pipeline_command.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/commands/run_aggregate.h | 3 | ||||
-rw-r--r-- | src/mongo/db/commands/user_management_commands.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/commands/write_commands/write_commands.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/commands_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/query/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/query/cursor_response.cpp | 46 | ||||
-rw-r--r-- | src/mongo/db/query/cursor_response.h | 39 | ||||
-rw-r--r-- | src/mongo/db/query/cursor_response_test.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_sharded.cpp | 9 |
20 files changed, 159 insertions, 74 deletions
diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp index f47cc77cf9c..57e82e49067 100644 --- a/src/mongo/db/commands.cpp +++ b/src/mongo/db/commands.cpp @@ -440,7 +440,7 @@ private: void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* result) override { + rpc::ReplyBuilderInterface* result) override { uassertStatusOK(_command->explain(opCtx, *_request, verbosity, result)); } @@ -492,7 +492,7 @@ Command::Command(StringData name, StringData oldName) Status BasicCommand::explain(OperationContext* opCtx, const OpMsgRequest& request, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) const { + rpc::ReplyBuilderInterface* result) const { return {ErrorCodes::IllegalOperation, str::stream() << "Cannot explain cmd: " << getName()}; } diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h index 53788b16d3d..3dfe96e5cd8 100644 --- a/src/mongo/db/commands.h +++ b/src/mongo/db/commands.h @@ -441,7 +441,7 @@ public: virtual void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* result) { + rpc::ReplyBuilderInterface* result) { uasserted(ErrorCodes::IllegalOperation, str::stream() << "Cannot explain cmd: " << definition()->getName()); } @@ -561,7 +561,7 @@ public: virtual Status explain(OperationContext* opCtx, const OpMsgRequest& request, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) const; + rpc::ReplyBuilderInterface* result) const; /** * Checks if the client associated with the given OperationContext is authorized to run this diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp index 8818ded8b59..ecff6c12986 100644 --- a/src/mongo/db/commands/count_cmd.cpp +++ b/src/mongo/db/commands/count_cmd.cpp @@ -107,7 +107,7 @@ public: Status explain(OperationContext* opCtx, const OpMsgRequest& opMsgRequest, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) const override { + rpc::ReplyBuilderInterface* result) const override { std::string dbname = opMsgRequest.getDatabase().toString(); const BSONObj& cmdObj = opMsgRequest.body; // Acquire locks and resolve possible UUID. The RAII object is optional, because in the case @@ -143,7 +143,7 @@ public: viewAggRequest.getValue().getNamespaceString(), viewAggRequest.getValue(), viewAggregation.getValue(), - *out); + result); } Collection* const collection = ctx->getCollection(); @@ -160,7 +160,8 @@ public: auto exec = std::move(statusWithPlanExecutor.getValue()); - Explain::explainStages(exec.get(), collection, verbosity, out); + auto bodyBuilder = result->getBodyBuilder(); + Explain::explainStages(exec.get(), collection, verbosity, &bodyBuilder); return Status::OK(); } diff --git a/src/mongo/db/commands/current_op.cpp b/src/mongo/db/commands/current_op.cpp index 47e22ea8296..3b0d2d1814f 100644 --- a/src/mongo/db/commands/current_op.cpp +++ b/src/mongo/db/commands/current_op.cpp @@ -66,18 +66,20 @@ public: OperationContext* opCtx, const AggregationRequest& request) const final { auto aggCmdObj = request.serializeToCommandObj().toBson(); - BSONObjBuilder responseBuilder; + rpc::OpMsgReplyBuilder replyBuilder; auto status = runAggregate( - opCtx, request.getNamespaceString(), request, std::move(aggCmdObj), responseBuilder); + opCtx, request.getNamespaceString(), request, std::move(aggCmdObj), &replyBuilder); if (!status.isOK()) { return status; } - CommandHelpers::appendSimpleCommandStatus(responseBuilder, true); + auto bodyBuilder = replyBuilder.getBodyBuilder(); + CommandHelpers::appendSimpleCommandStatus(bodyBuilder, true); + bodyBuilder.doneFast(); - return CursorResponse::parseFromBSON(responseBuilder.obj()); + return CursorResponse::parseFromBSON(replyBuilder.releaseBody()); } virtual void appendToResponse(BSONObjBuilder* result) const final { diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp index 8802e4875cb..8b6098f4341 100644 --- a/src/mongo/db/commands/distinct.cpp +++ b/src/mongo/db/commands/distinct.cpp @@ -112,7 +112,7 @@ public: Status explain(OperationContext* opCtx, const OpMsgRequest& request, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) const override { + rpc::ReplyBuilderInterface* result) const override { std::string dbname = request.getDatabase().toString(); const BSONObj& cmdObj = request.body; // Acquire locks and resolve possible UUID. The RAII object is optional, because in the case @@ -143,7 +143,7 @@ public: } return runAggregate( - opCtx, nss, viewAggRequest.getValue(), viewAggregation.getValue(), *out); + opCtx, nss, viewAggRequest.getValue(), viewAggregation.getValue(), result); } Collection* const collection = ctx->getCollection(); @@ -151,7 +151,8 @@ public: auto executor = uassertStatusOK(getExecutorDistinct(opCtx, collection, nss.ns(), &parsedDistinct)); - Explain::explainStages(executor.get(), collection, verbosity, out); + auto bodyBuilder = result->getBodyBuilder(); + Explain::explainStages(executor.get(), collection, verbosity, &bodyBuilder); return Status::OK(); } diff --git a/src/mongo/db/commands/explain_cmd.cpp b/src/mongo/db/commands/explain_cmd.cpp index b20a1a7d850..c41022b5550 100644 --- a/src/mongo/db/commands/explain_cmd.cpp +++ b/src/mongo/db/commands/explain_cmd.cpp @@ -97,13 +97,12 @@ public: "Explain's child command cannot run on this node. " "Are you explaining a write command on a secondary?", commandCanRunHere(opCtx, _dbName, _innerInvocation->definition())); - BSONObjBuilder bob = result->getBodyBuilder(); - _innerInvocation->explain(opCtx, _verbosity, &bob); + _innerInvocation->explain(opCtx, _verbosity, result); } void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* result) override { + rpc::ReplyBuilderInterface* result) override { uasserted(ErrorCodes::IllegalOperation, "Explain cannot explain itself."); } diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index 4c70636f928..070557d4f41 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -256,7 +256,7 @@ public: Status explain(OperationContext* opCtx, const OpMsgRequest& request, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) const override { + rpc::ReplyBuilderInterface* result) const override { std::string dbName = request.getDatabase().toString(); const BSONObj& cmdObj = request.body; const auto args(uassertStatusOK(FindAndModifyRequest::parseFromBSON( @@ -288,7 +288,8 @@ public: const auto exec = uassertStatusOK(getExecutorDelete(opCtx, opDebug, collection, &parsedDelete)); - Explain::explainStages(exec.get(), collection, verbosity, out); + auto bodyBuilder = result->getBodyBuilder(); + Explain::explainStages(exec.get(), collection, verbosity, &bodyBuilder); } else { UpdateRequest request(nsString); UpdateLifecycleImpl updateLifecycle(nsString); @@ -312,7 +313,8 @@ public: const auto exec = uassertStatusOK(getExecutorUpdate(opCtx, opDebug, collection, &parsedUpdate)); - Explain::explainStages(exec.get(), collection, verbosity, out); + auto bodyBuilder = result->getBodyBuilder(); + Explain::explainStages(exec.get(), collection, verbosity, &bodyBuilder); } return Status::OK(); diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index e16f2473092..7c82c31da29 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -142,7 +142,7 @@ public: void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* result) override { + rpc::ReplyBuilderInterface* result) override { // Acquire locks and resolve possible UUID. The RAII object is optional, because in the // case of a view, the locks need to be released. boost::optional<AutoGetCollectionForReadCommand> ctx; @@ -182,7 +182,7 @@ public: try { uassertStatusOK( - runAggregate(opCtx, nss, aggRequest, viewAggregationCommand, *result)); + runAggregate(opCtx, nss, aggRequest, viewAggregationCommand, result)); } catch (DBException& error) { if (error.code() == ErrorCodes::InvalidPipelineOperator) { uasserted(ErrorCodes::InvalidPipelineOperator, @@ -201,8 +201,9 @@ public: // We have a parsed query. Time to get the execution plan for it. auto exec = uassertStatusOK(getExecutorFind(opCtx, collection, nss, std::move(cq))); + auto bodyBuilder = result->getBodyBuilder(); // Got the execution tree. Explain it. - Explain::explainStages(exec.get(), collection, verbosity, result); + Explain::explainStages(exec.get(), collection, verbosity, &bodyBuilder); } /** @@ -320,8 +321,9 @@ public: const QueryRequest& originalQR = exec->getCanonicalQuery()->getQueryRequest(); // Stream query results, adding them to a BSONArray as we go. - auto bodyBuilder = result->getBodyBuilder(); - CursorResponseBuilder firstBatch(/*isInitialResponse*/ true, &bodyBuilder); + CursorResponseBuilder::Options options; + options.isInitialResponse = true; + CursorResponseBuilder firstBatch(result, options); BSONObj obj; PlanExecutor::ExecState state = PlanExecutor::ADVANCED; long long numResults = 0; diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index e296e4e0c97..a6fa17ab2d6 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -244,7 +244,6 @@ public: void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override { // Counted as a getMore, not as a command. globalOpCounters.gotGetMore(); - auto result = reply->getBodyBuilder(); auto curOp = CurOp::get(opCtx); curOp->debug().cursorid = _request.cursorid; @@ -428,7 +427,8 @@ public: } CursorId respondWithId = 0; - CursorResponseBuilder nextBatch(/*isInitialResponse*/ false, &result); + + CursorResponseBuilder nextBatch(reply, CursorResponseBuilder::Options()); BSONObj obj; PlanExecutor::ExecState state = PlanExecutor::ADVANCED; long long numResults = 0; diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index a7096773104..54ecc3df1ab 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -79,7 +79,6 @@ public: } void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override { - auto bob = reply->getBodyBuilder(); const auto aggregationRequest = uassertStatusOK( AggregationRequest::parseFromBSON(_dbName, _request.body, boost::none)); @@ -87,7 +86,7 @@ public: aggregationRequest.getNamespaceString(), aggregationRequest, _request.body, - bob)); + reply)); } NamespaceString ns() const override { @@ -96,7 +95,7 @@ public: void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) override { + rpc::ReplyBuilderInterface* result) override { const auto aggregationRequest = uassertStatusOK( AggregationRequest::parseFromBSON(_dbName, _request.body, verbosity)); @@ -104,7 +103,7 @@ public: aggregationRequest.getNamespaceString(), aggregationRequest, _request.body, - *out)); + result)); } void doCheckAuthorization(OperationContext* opCtx) const override { diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 5e00d3f867c..60efa126c3c 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -91,7 +91,7 @@ bool handleCursorCommand(OperationContext* opCtx, const NamespaceString& nsForCursor, std::vector<ClientCursor*> cursors, const AggregationRequest& request, - BSONObjBuilder& result) { + rpc::ReplyBuilderInterface* result) { invariant(!cursors.empty()); long long batchSize = request.getBatchSize(); @@ -121,12 +121,15 @@ bool handleCursorCommand(OperationContext* opCtx, cursors[idx]->getExecutor()->detachFromOperationContext(); } - result.appendArray("cursors", cursorsBuilder.obj()); + auto bodyBuilder = result->getBodyBuilder(); + bodyBuilder.appendArray("cursors", cursorsBuilder.obj()); return true; } - CursorResponseBuilder responseBuilder(true, &result); + CursorResponseBuilder::Options options; + options.isInitialResponse = true; + CursorResponseBuilder responseBuilder(result, options); ClientCursor* cursor = cursors[0]; invariant(cursor); @@ -345,7 +348,7 @@ Status runAggregate(OperationContext* opCtx, const NamespaceString& origNss, const AggregationRequest& request, const BSONObj& cmdObj, - BSONObjBuilder& result) { + rpc::ReplyBuilderInterface* result) { // For operations on views, this will be the underlying namespace. NamespaceString nss = request.getNamespaceString(); @@ -611,8 +614,9 @@ Status runAggregate(OperationContext* opCtx, // If both explain and cursor are specified, explain wins. if (expCtx->explain) { + auto bodyBuilder = result->getBodyBuilder(); Explain::explainPipelineExecutor( - pins[0].getCursor()->getExecutor(), *(expCtx->explain), &result); + pins[0].getCursor()->getExecutor(), *(expCtx->explain), &bodyBuilder); } else { // Cursor must be specified, if explain is not. const bool keepCursor = diff --git a/src/mongo/db/commands/run_aggregate.h b/src/mongo/db/commands/run_aggregate.h index 1412768ddcf..75212efc22c 100644 --- a/src/mongo/db/commands/run_aggregate.h +++ b/src/mongo/db/commands/run_aggregate.h @@ -33,6 +33,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/pipeline/aggregation_request.h" +#include "mongo/rpc/op_msg_rpc_impls.h" namespace mongo { @@ -48,6 +49,6 @@ Status runAggregate(OperationContext* opCtx, const NamespaceString& nss, const AggregationRequest& request, const BSONObj& cmdObj, - BSONObjBuilder& result); + rpc::ReplyBuilderInterface* result); } // namespace mongo diff --git a/src/mongo/db/commands/user_management_commands.cpp b/src/mongo/db/commands/user_management_commands.cpp index 145cd247301..82823e3cfcc 100644 --- a/src/mongo/db/commands/user_management_commands.cpp +++ b/src/mongo/db/commands/user_management_commands.cpp @@ -1371,16 +1371,18 @@ public: DBDirectClient client(opCtx); - BSONObjBuilder responseBuilder; + rpc::OpMsgReplyBuilder replyBuilder; AggregationRequest aggRequest(AuthorizationManager::usersCollectionNamespace, std::move(pipeline)); uassertStatusOK(runAggregate(opCtx, AuthorizationManager::usersCollectionNamespace, aggRequest, aggRequest.serializeToCommandObj().toBson(), - responseBuilder)); - CommandHelpers::appendSimpleCommandStatus(responseBuilder, true); - auto response = CursorResponse::parseFromBSONThrowing(responseBuilder.obj()); + &replyBuilder)); + auto bodyBuilder = replyBuilder.getBodyBuilder(); + CommandHelpers::appendSimpleCommandStatus(bodyBuilder, true); + bodyBuilder.doneFast(); + auto response = CursorResponse::parseFromBSONThrowing(replyBuilder.releaseBody()); DBClientCursor cursor(&client, response.getNSS().toString(), response.getCursorId(), diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp index 49e90ff0899..35d2e0ec53b 100644 --- a/src/mongo/db/commands/write_commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands/write_commands.cpp @@ -350,7 +350,7 @@ private: void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) override { + rpc::ReplyBuilderInterface* result) override { uassert(ErrorCodes::InvalidLength, "explained write batches must be of size 1", _batch.getUpdates().size() == 1); @@ -376,7 +376,8 @@ private: auto exec = uassertStatusOK(getExecutorUpdate( opCtx, &CurOp::get(opCtx)->debug(), collection.getCollection(), &parsedUpdate)); - Explain::explainStages(exec.get(), collection.getCollection(), verbosity, out); + auto bodyBuilder = result->getBodyBuilder(); + Explain::explainStages(exec.get(), collection.getCollection(), verbosity, &bodyBuilder); } write_ops::Update _batch; @@ -426,7 +427,7 @@ private: void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) override { + rpc::ReplyBuilderInterface* result) override { uassert(ErrorCodes::InvalidLength, "explained write batches must be of size 1", _batch.getDeletes().size() == 1); @@ -448,7 +449,8 @@ private: // Explain the plan tree. auto exec = uassertStatusOK(getExecutorDelete( opCtx, &CurOp::get(opCtx)->debug(), collection.getCollection(), &parsedDelete)); - Explain::explainStages(exec.get(), collection.getCollection(), verbosity, out); + auto bodyBuilder = result->getBodyBuilder(); + Explain::explainStages(exec.get(), collection.getCollection(), verbosity, &bodyBuilder); } write_ops::Delete _batch; diff --git a/src/mongo/db/commands_test.cpp b/src/mongo/db/commands_test.cpp index c92f72617c3..1b25d644765 100644 --- a/src/mongo/db/commands_test.cpp +++ b/src/mongo/db/commands_test.cpp @@ -218,7 +218,7 @@ public: void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* result) override {} + rpc::ReplyBuilderInterface* result) override {} void doCheckAuthorization(OperationContext*) const override {} @@ -263,7 +263,7 @@ public: void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, - BSONObjBuilder* result) override {} + rpc::ReplyBuilderInterface* result) override {} void doCheckAuthorization(OperationContext*) const override {} diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript index 7247f723a02..1605b395c7d 100644 --- a/src/mongo/db/query/SConscript +++ b/src/mongo/db/query/SConscript @@ -147,6 +147,7 @@ env.Library( '$BUILD_DIR/mongo/db/namespace_string', '$BUILD_DIR/mongo/db/repl/optime', '$BUILD_DIR/mongo/rpc/command_status', + '$BUILD_DIR/mongo/rpc/rpc', 'query_request', ] ) @@ -164,6 +165,7 @@ env.CppUnitTest( ], LIBDEPS=[ "$BUILD_DIR/mongo/db/pipeline/aggregation_request", + '$BUILD_DIR/mongo/rpc/rpc', 'command_request_response', ] ) diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp index e449aad0814..14b8af5e8f6 100644 --- a/src/mongo/db/query/cursor_response.cpp +++ b/src/mongo/db/query/cursor_response.cpp @@ -44,34 +44,52 @@ const char kIdField[] = "id"; const char kNsField[] = "ns"; const char kBatchField[] = "nextBatch"; const char kBatchFieldInitial[] = "firstBatch"; +const char kBatchDocSequenceField[] = "cursor.nextBatch"; +const char kBatchDocSequenceFieldInitial[] = "cursor.firstBatch"; const char kInternalLatestOplogTimestampField[] = "$_internalLatestOplogTimestamp"; } // namespace -CursorResponseBuilder::CursorResponseBuilder(bool isInitialResponse, - BSONObjBuilder* commandResponse) - : _responseInitialLen(commandResponse->bb().len()), - _commandResponse(commandResponse), - _cursorObject(commandResponse->subobjStart(kCursorField)), - _batch(_cursorObject.subarrayStart(isInitialResponse ? kBatchFieldInitial : kBatchField)) {} +CursorResponseBuilder::CursorResponseBuilder(rpc::ReplyBuilderInterface* replyBuilder, + Options options = Options()) + : _options(options), _replyBuilder(replyBuilder) { + if (_options.useDocumentSequences) { + _docSeqBuilder.emplace(_replyBuilder->getDocSequenceBuilder( + _options.isInitialResponse ? kBatchDocSequenceFieldInitial : kBatchDocSequenceField)); + } else { + _bodyBuilder.emplace(_replyBuilder->getBodyBuilder()); + _cursorObject.emplace(_bodyBuilder->subobjStart(kCursorField)); + _batch.emplace(_cursorObject->subarrayStart(_options.isInitialResponse ? kBatchFieldInitial + : kBatchField)); + } +} void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace) { invariant(_active); - _batch.doneFast(); - _cursorObject.append(kIdField, cursorId); - _cursorObject.append(kNsField, cursorNamespace); - _cursorObject.doneFast(); + if (_options.useDocumentSequences) { + _docSeqBuilder.reset(); + _bodyBuilder.emplace(_replyBuilder->getBodyBuilder()); + _cursorObject.emplace(_bodyBuilder->subobjStart(kCursorField)); + } else { + _batch.reset(); + } + _cursorObject->append(kIdField, cursorId); + _cursorObject->append(kNsField, cursorNamespace); + _cursorObject.reset(); + if (!_latestOplogTimestamp.isNull()) { - _commandResponse->append(kInternalLatestOplogTimestampField, _latestOplogTimestamp); + _bodyBuilder->append(kInternalLatestOplogTimestampField, _latestOplogTimestamp); } + _bodyBuilder.reset(); _active = false; } void CursorResponseBuilder::abandon() { invariant(_active); - _batch.doneFast(); - _cursorObject.doneFast(); - _commandResponse->bb().setlen(_responseInitialLen); // Removes everything we've added. + _batch.reset(); + _cursorObject.reset(); + _bodyBuilder.reset(); + _replyBuilder->reset(); _numDocs = 0; _active = false; } diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h index 091b64e0c73..529654118df 100644 --- a/src/mongo/db/query/cursor_response.h +++ b/src/mongo/db/query/cursor_response.h @@ -35,6 +35,8 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/clientcursor.h" #include "mongo/db/namespace_string.h" +#include "mongo/rpc/op_msg.h" +#include "mongo/rpc/reply_builder_interface.h" namespace mongo { @@ -47,14 +49,23 @@ class CursorResponseBuilder { public: /** - * Once constructed, you may not use the passed-in BSONObjBuilder until you call either done() + * Structure used to confiugre the CursorResponseBuilder. + */ + struct Options { + bool isInitialResponse = false; + bool useDocumentSequences = false; + }; + + /** + * Once constructed, you may not use the passed-in ReplyBuilderInterface until you call either + * done() * or abandon(), or this object goes out of scope. This is the same as the rule when using a * BSONObjBuilder to build a sub-object with subobjStart(). * - * If the builder goes out of scope without a call to done(), any data appended to the - * builder will be removed. + * If the builder goes out of scope without a call to done(), the ReplyBuilderInterface will be + * reset. */ - CursorResponseBuilder(bool isInitialResponse, BSONObjBuilder* commandResponse); + CursorResponseBuilder(rpc::ReplyBuilderInterface* replyBuilder, Options options); ~CursorResponseBuilder() { if (_active) @@ -63,12 +74,16 @@ public: size_t bytesUsed() const { invariant(_active); - return _batch.len(); + return _options.useDocumentSequences ? _docSeqBuilder->len() : _batch->len(); } void append(const BSONObj& obj) { invariant(_active); - _batch.append(obj); + if (_options.useDocumentSequences) { + _docSeqBuilder->append(obj); + } else { + _batch->append(obj); + } _numDocs++; } @@ -94,11 +109,15 @@ public: void abandon(); private: - const int _responseInitialLen; // Must be the first member so its initializer runs first. + const Options _options; + rpc::ReplyBuilderInterface* const _replyBuilder; + // Order here is important to ensure destruction in the correct order. + boost::optional<BSONObjBuilder> _bodyBuilder; + boost::optional<BSONObjBuilder> _cursorObject; + boost::optional<BSONArrayBuilder> _batch; + boost::optional<OpMsgBuilder::DocSequenceBuilder> _docSeqBuilder; + bool _active = true; - BSONObjBuilder* const _commandResponse; - BSONObjBuilder _cursorObject; - BSONArrayBuilder _batch; long long _numDocs = 0; Timestamp _latestOplogTimestamp; }; diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp index eba5d6748fe..cfe3c8ac3cf 100644 --- a/src/mongo/db/query/cursor_response_test.cpp +++ b/src/mongo/db/query/cursor_response_test.cpp @@ -30,6 +30,8 @@ #include "mongo/db/query/cursor_response.h" +#include "mongo/rpc/op_msg_rpc_impls.h" + #include "mongo/unittest/unittest.h" namespace mongo { @@ -330,6 +332,32 @@ TEST(CursorResponseTest, serializeLatestOplogEntry) { ASSERT_EQ(*reparsedResponse.getLastOplogTimestamp(), Timestamp(1, 2)); } +TEST(CursorResponseTest, cursorReturnDocumentSequences) { + CursorResponseBuilder::Options options; + options.isInitialResponse = true; + options.useDocumentSequences = true; + rpc::OpMsgReplyBuilder builder; + BSONObj expectedDoc = BSON("_id" << 1 << "test" + << "123"); + BSONObj expectedBody = BSON("cursor" << BSON("id" << CursorId(123) << "ns" + << "db.coll")); + + CursorResponseBuilder crb(&builder, options); + crb.append(expectedDoc); + ASSERT_EQ(crb.numDocs(), 1U); + crb.done(CursorId(123), "db.coll"); + + auto msg = builder.done(); + auto opMsg = OpMsg::parse(msg); + const auto& docSeqs = opMsg.sequences; + ASSERT_EQ(docSeqs.size(), 1U); + const auto& documentSequence = docSeqs[0]; + ASSERT_EQ(documentSequence.name, "cursor.firstBatch"); + ASSERT_EQ(documentSequence.objs.size(), 1U); + ASSERT_BSONOBJ_EQ(documentSequence.objs[0], expectedDoc); + ASSERT_BSONOBJ_EQ(opMsg.body, expectedBody); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/sessions_collection_sharded.cpp b/src/mongo/db/sessions_collection_sharded.cpp index c7c7694daae..9a10be835e7 100644 --- a/src/mongo/db/sessions_collection_sharded.cpp +++ b/src/mongo/db/sessions_collection_sharded.cpp @@ -37,6 +37,7 @@ #include "mongo/db/sessions_collection_rs.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/op_msg.h" +#include "mongo/rpc/op_msg_rpc_impls.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_find.h" @@ -145,14 +146,16 @@ StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions( return ex.toStatus(); } - BSONObjBuilder result; - CursorResponseBuilder firstBatch(/*firstBatch*/ true, &result); + rpc::OpMsgReplyBuilder replyBuilder; + CursorResponseBuilder::Options options; + options.isInitialResponse = true; + CursorResponseBuilder firstBatch(&replyBuilder, options); for (const auto& obj : batch) { firstBatch.append(obj); } firstBatch.done(cursorId, NamespaceString::kLogicalSessionsNamespace.ns()); - return result.obj(); + return replyBuilder.releaseBody(); }; return doFetch(NamespaceString::kLogicalSessionsNamespace, sessions, send); |