diff options
author | Anthony Roy <anthony.roy@10gen.com> | 2018-06-19 17:53:48 -0400 |
---|---|---|
committer | Anthony Roy <anthony.roy@10gen.com> | 2018-07-05 17:43:25 -0400 |
commit | f78056a8f1f5ea6af23bd68123659b714233b370 (patch) | |
tree | ca9bf843fec69e4ba687bdee55dade490b5ac246 | |
parent | 173ac70346990e4cf9b2720f86697785b8795967 (diff) | |
download | mongo-f78056a8f1f5ea6af23bd68123659b714233b370.tar.gz |
SERVER-35460 Replaced CommandReplyBuilder with direct usage of ReplyBuilderInterface
This is to provide access to DocumentSequence returns.
23 files changed, 216 insertions, 149 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 527e2ce7af6..67d31359f66 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -594,6 +594,7 @@ env.CppUnitTest( ], LIBDEPS=[ '$BUILD_DIR/mongo/idl/idl_parser', + '$BUILD_DIR/mongo/rpc/rpc', "commands", "auth/authmocks", "repl/replmocks", diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp index 87f02341918..b020d29eee0 100644 --- a/src/mongo/db/commands.cpp +++ b/src/mongo/db/commands.cpp @@ -50,6 +50,9 @@ #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/server_parameters.h" +#include "mongo/rpc/factory.h" +#include "mongo/rpc/op_msg_rpc_impls.h" +#include "mongo/rpc/protocol.h" #include "mongo/rpc/write_concern_error_detail.h" #include "mongo/s/stale_exception.h" #include "mongo/util/invariant.h" @@ -109,22 +112,21 @@ bool checkAuthorizationImplPreParse(OperationContext* opCtx, BSONObj CommandHelpers::runCommandDirectly(OperationContext* opCtx, const OpMsgRequest& request) { auto command = globalCommandRegistry()->findCommand(request.getCommandName()); invariant(command); - BufBuilder bb; - CommandReplyBuilder crb(BSONObjBuilder{bb}); + rpc::OpMsgReplyBuilder replyBuilder; try { auto invocation = command->parse(opCtx, request); - invocation->run(opCtx, &crb); - auto body = crb.getBodyBuilder(); + invocation->run(opCtx, &replyBuilder); + auto body = replyBuilder.getBodyBuilder(); CommandHelpers::extractOrAppendOk(body); } catch (const StaleConfigException&) { // These exceptions are intended to be handled at a higher level. throw; } catch (const DBException& ex) { - auto body = crb.getBodyBuilder(); + auto body = replyBuilder.getBodyBuilder(); body.resetToEmpty(); appendCommandStatusNoThrow(body, ex.toStatus()); } - return BSONObj(bb.release()); + return replyBuilder.releaseBody(); } void CommandHelpers::auditLogAuthEvent(OperationContext* opCtx, @@ -380,24 +382,6 @@ bool CommandHelpers::uassertShouldAttemptParse(OperationContext* opCtx, constexpr StringData CommandHelpers::kHelpFieldName; ////////////////////////////////////////////////////////////// -// CommandReplyBuilder - -CommandReplyBuilder::CommandReplyBuilder(BSONObjBuilder bodyObj) - : _bodyBuf(&bodyObj.bb()), _bodyOffset(bodyObj.offset()) { - // CommandReplyBuilder requires that bodyObj build into an externally-owned buffer. - invariant(!bodyObj.owned()); - bodyObj.doneFast(); -} - -BSONObjBuilder CommandReplyBuilder::getBodyBuilder() const { - return BSONObjBuilder(BSONObjBuilder::ResumeBuildingTag{}, *_bodyBuf, _bodyOffset); -} - -void CommandReplyBuilder::reset() { - getBodyBuilder().resetToEmpty(); -} - -////////////////////////////////////////////////////////////// // CommandInvocation CommandInvocation::~CommandInvocation() = default; @@ -443,7 +427,7 @@ public: _dbName(_request->getDatabase().toString()) {} private: - void run(OperationContext* opCtx, CommandReplyBuilder* result) override { + void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) override { try { BSONObjBuilder bob = result->getBodyBuilder(); bool ok = _command->run(opCtx, _dbName, _request->body, bob); diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h index 966a0d640c7..5907fb65990 100644 --- a/src/mongo/db/commands.h +++ b/src/mongo/db/commands.h @@ -414,52 +414,6 @@ private: ServerStatusMetricField<Counter64> _commandsFailedMetric; }; -class CommandReplyBuilder { -public: - explicit CommandReplyBuilder(BSONObjBuilder bodyObj); - - CommandReplyBuilder(const CommandReplyBuilder&) = delete; - CommandReplyBuilder& operator=(const CommandReplyBuilder&) = delete; - - /** - * Returns a BSONObjBuilder that can be used to build the reply in-place. The returned - * builder (or an object into which it has been moved) must be completed before calling - * any more methods on this object. A builder is completed by a call to `done()` or by - * its destructor. Can be called repeatedly to append multiple things to the reply, as - * long as each returned builder must be completed between calls. - */ - BSONObjBuilder getBodyBuilder() const; - - void reset(); - - /** - * Appends a key:object field to this reply. - */ - template <typename T> - void append(StringData key, const T& object) { - getBodyBuilder() << key << object; - } - - /** - * The specified 'object' must be BSON-serializable. - * - * BSONSerializable 'x' means 'x.serialize(bob)' appends a representation of 'x' - * into 'BSONObjBuilder* bob'. - */ - template <typename T> - void fillFrom(const T& object) { - static_assert(!isStatusOrStatusWith<std::decay_t<T>>, - "Status and StatusWith<T> aren't supported by TypedCommand and fillFrom(). " - "Use uassertStatusOK() instead."); - auto bob = getBodyBuilder(); - object.serialize(&bob); - } - -private: - BufBuilder* const _bodyBuf; - const std::size_t _bodyOffset; -}; - /** * Represents a single invocation of a given command. */ @@ -476,7 +430,7 @@ public: * indicated either by throwing (preferred), or by calling * `CommandHelpers::extractOrAppendOk`. */ - virtual void run(OperationContext* opCtx, CommandReplyBuilder* result) = 0; + virtual void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) = 0; virtual void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, @@ -808,14 +762,14 @@ private: decltype(auto) _callTypedRun(OperationContext* opCtx) { return static_cast<Invocation*>(this)->typedRun(opCtx); } - void _runImpl(std::true_type, OperationContext* opCtx, CommandReplyBuilder*) { + void _runImpl(std::true_type, OperationContext* opCtx, rpc::ReplyBuilderInterface*) { _callTypedRun(opCtx); } - void _runImpl(std::false_type, OperationContext* opCtx, CommandReplyBuilder* reply) { + void _runImpl(std::false_type, OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) { reply->fillFrom(_callTypedRun(opCtx)); } - void run(OperationContext* opCtx, CommandReplyBuilder* reply) final { + void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) final { using VoidResultTag = std::is_void<decltype(_callTypedRun(opCtx))>; _runImpl(VoidResultTag{}, opCtx, reply); } diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 7f16897f022..908214a6e2f 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -109,6 +109,7 @@ env.Library( '$BUILD_DIR/mongo/db/mongohasher', '$BUILD_DIR/mongo/db/server_options_core', '$BUILD_DIR/mongo/logger/parse_log_component_settings', + '$BUILD_DIR/mongo/rpc/protocol', ], ) diff --git a/src/mongo/db/commands/explain_cmd.cpp b/src/mongo/db/commands/explain_cmd.cpp index 003047168db..46b0c6eac3f 100644 --- a/src/mongo/db/commands/explain_cmd.cpp +++ b/src/mongo/db/commands/explain_cmd.cpp @@ -92,7 +92,7 @@ public: _innerRequest{std::move(innerRequest)}, _innerInvocation{std::move(innerInvocation)} {} - void run(OperationContext* opCtx, CommandReplyBuilder* result) override { + void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) override { uassert(50746, "Explain's child command cannot run on this node. " "Are you explaining a write command on a secondary?", diff --git a/src/mongo/db/commands/generic.cpp b/src/mongo/db/commands/generic.cpp index 277b22aba99..a572b94747c 100644 --- a/src/mongo/db/commands/generic.cpp +++ b/src/mongo/db/commands/generic.cpp @@ -108,8 +108,16 @@ public: return NamespaceString(request().request.getDatabase()); } - void run(OperationContext* opCtx, CommandReplyBuilder* result) override { - result->append("echo", request().request.body); + void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) override { + auto sequences = request().request.sequences; + for (auto& docSeq : sequences) { + auto docBuilder = result->getDocSequenceBuilder(docSeq.name); + for (auto& bson : docSeq.objs) { + docBuilder.append(bson); + } + } + + result->getBodyBuilder().append("echo", request().request.body); } }; diff --git a/src/mongo/db/commands/mr_test.cpp b/src/mongo/db/commands/mr_test.cpp index f8c81c473a5..6a1f5d67302 100644 --- a/src/mongo/db/commands/mr_test.cpp +++ b/src/mongo/db/commands/mr_test.cpp @@ -47,6 +47,7 @@ #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/rpc/factory.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/scripting/dbdirectclient_factory.h" #include "mongo/scripting/engine.h" @@ -425,10 +426,9 @@ Status MapReduceCommandTest::_runCommand(StringData mapCode, StringData reduceCo ASSERT(command) << "Unable to look up mapReduce command"; auto request = OpMsgRequest::fromDBAndBody(inputNss.db(), _makeCmdObj(mapCode, reduceCode)); - BufBuilder bb; - CommandReplyBuilder crb(BSONObjBuilder{bb}); - command->parse(_opCtx.get(), request)->run(_opCtx.get(), &crb); - auto status = getStatusFromCommandResult(crb.getBodyBuilder().asTempObj()); + auto replyBuilder = rpc::makeReplyBuilder(rpc::Protocol::kOpMsg); + command->parse(_opCtx.get(), request)->run(_opCtx.get(), replyBuilder.get()); + auto status = getStatusFromCommandResult(replyBuilder->getBodyBuilder().asTempObj()); if (!status.isOK()) { return status.withContext(str::stream() << "mapReduce command failed: " << request.body); } diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp index edbd9a6d315..46ebb7d0e9c 100644 --- a/src/mongo/db/commands/write_commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands/write_commands.cpp @@ -225,7 +225,7 @@ private: // Customization point for 'run'. virtual void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const = 0; - void run(OperationContext* opCtx, CommandReplyBuilder* result) final { + void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) final { try { try { _transactionChecks(opCtx); diff --git a/src/mongo/db/commands_test.cpp b/src/mongo/db/commands_test.cpp index bf03dd61a19..c92f72617c3 100644 --- a/src/mongo/db/commands_test.cpp +++ b/src/mongo/db/commands_test.cpp @@ -33,6 +33,8 @@ #include "mongo/db/commands_test_example_gen.h" #include "mongo/db/dbmessage.h" #include "mongo/db/service_context_test_fixture.h" +#include "mongo/rpc/factory.h" +#include "mongo/rpc/op_msg_rpc_impls.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -203,7 +205,7 @@ public: /** * Reply with an incremented 'request.i'. */ - void run(OperationContext* opCtx, CommandReplyBuilder* reply) override { + void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override { commands_test_example::ExampleIncrementReply r; r.setIPlusOne(request().getI() + 1); reply->fillFrom(r); @@ -345,17 +347,16 @@ protected: ASSERT_EQ(invocation->ns(), ns); const BSONObj reply = [&] { - BufBuilder bb; - CommandReplyBuilder crb{BSONObjBuilder{bb}}; + rpc::OpMsgReplyBuilder replyBuilder; try { - invocation->run(opCtx.get(), &crb); - auto bob = crb.getBodyBuilder(); + invocation->run(opCtx.get(), &replyBuilder); + auto bob = replyBuilder.getBodyBuilder(); CommandHelpers::extractOrAppendOk(bob); } catch (const DBException& e) { - auto bob = crb.getBodyBuilder(); + auto bob = replyBuilder.getBodyBuilder(); CommandHelpers::appendCommandStatusNoThrow(bob, e.toStatus()); } - return BSONObj(bb.release()); + return replyBuilder.releaseBody(); }(); postAssert(i, reply); diff --git a/src/mongo/db/s/get_database_version_command.cpp b/src/mongo/db/s/get_database_version_command.cpp index 8d7f3446db6..b6925c705f2 100644 --- a/src/mongo/db/s/get_database_version_command.cpp +++ b/src/mongo/db/s/get_database_version_command.cpp @@ -72,7 +72,7 @@ public: ActionType::getDatabaseVersion)); } - void run(OperationContext* opCtx, CommandReplyBuilder* result) override { + void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) override { uassert(ErrorCodes::IllegalOperation, str::stream() << definition()->getName() << " can only be run on shard servers", serverGlobalParams.clusterRole == ClusterRole::ShardServer); @@ -83,7 +83,7 @@ public: versionObj = dbVersion->toBSON(); } } - result->append("dbVersion", versionObj); + result->getBodyBuilder().append("dbVersion", versionObj); } StringData _targetDb() const { diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 11a73d8b421..287be590179 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -460,7 +460,7 @@ void appendClusterAndOperationTime(OperationContext* opCtx, void invokeInTransaction(OperationContext* opCtx, CommandInvocation* invocation, - CommandReplyBuilder* replyBuilder) { + rpc::ReplyBuilderInterface* replyBuilder) { auto session = OperationContextSession::get(opCtx); if (!session) { // Run the command directly if we're not in a transaction. @@ -495,7 +495,6 @@ bool runCommandImpl(OperationContext* opCtx, const boost::optional<OperationSessionInfoFromClient>& sessionOptions) { const Command* command = invocation->definition(); auto bytesToReserve = command->reserveBytesForReply(); - // SERVER-22100: In Windows DEBUG builds, the CRT heap debugging overhead, in conjunction with the // additional memory pressure introduced by reply buffer pre-allocation, causes the concurrency // suite to run extremely slowly. As a workaround we do not pre-allocate in Windows DEBUG builds. @@ -503,12 +502,11 @@ bool runCommandImpl(OperationContext* opCtx, if (kDebugBuild) bytesToReserve = 0; #endif - - CommandReplyBuilder crb(replyBuilder->getInPlaceReplyBuilder(bytesToReserve)); + replyBuilder->reserveBytes(bytesToReserve); if (!invocation->supportsWriteConcern()) { behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body); - invokeInTransaction(opCtx, invocation, &crb); + invokeInTransaction(opCtx, invocation, replyBuilder); } else { auto wcResult = uassertStatusOK(extractWriteConcern(opCtx, request.body)); auto session = OperationContextSession::get(opCtx); @@ -539,13 +537,13 @@ bool runCommandImpl(OperationContext* opCtx, }; try { - invokeInTransaction(opCtx, invocation, &crb); + invokeInTransaction(opCtx, invocation, replyBuilder); } catch (const DBException&) { waitForWriteConcern(*extraFieldsBuilder); throw; } - waitForWriteConcern(crb.getBodyBuilder()); + waitForWriteConcern(replyBuilder->getBodyBuilder()); // Nothing in run() should change the writeConcern. dassert(SimpleBSONObjComparator::kInstance.evaluate(opCtx->getWriteConcern().toBSON() == @@ -555,20 +553,20 @@ bool runCommandImpl(OperationContext* opCtx, behaviors.waitForLinearizableReadConcern(opCtx); const bool ok = [&] { - auto body = crb.getBodyBuilder(); + auto body = replyBuilder->getBodyBuilder(); return CommandHelpers::extractOrAppendOk(body); }(); - behaviors.attachCurOpErrInfo(opCtx, crb.getBodyBuilder().asTempObj()); + behaviors.attachCurOpErrInfo(opCtx, replyBuilder->getBodyBuilder().asTempObj()); if (!ok) { - auto response = crb.getBodyBuilder().asTempObj(); + auto response = replyBuilder->getBodyBuilder().asTempObj(); auto codeField = response["code"]; if (codeField.isNumber()) { auto code = ErrorCodes::Error(codeField.numberInt()); // Append the error labels for transient transaction errors. auto errorLabels = getErrorLabels(sessionOptions, command->getName(), code); - crb.getBodyBuilder().appendElements(errorLabels); + replyBuilder->getBodyBuilder().appendElements(errorLabels); } } @@ -576,7 +574,7 @@ bool runCommandImpl(OperationContext* opCtx, appendReplyMetadata(opCtx, request, &metadataBob); { - auto commandBodyBob = crb.getBodyBuilder(); + auto commandBodyBob = replyBuilder->getBodyBuilder(); appendClusterAndOperationTime(opCtx, &commandBodyBob, &metadataBob, startOperationTime); } diff --git a/src/mongo/rpc/command_reply_builder.cpp b/src/mongo/rpc/command_reply_builder.cpp index 534ad779f86..ba62df3e72e 100644 --- a/src/mongo/rpc/command_reply_builder.cpp +++ b/src/mongo/rpc/command_reply_builder.cpp @@ -53,14 +53,16 @@ CommandReplyBuilder& CommandReplyBuilder::setRawCommandReply(const BSONObj& comm return *this; } -BSONObjBuilder CommandReplyBuilder::getInPlaceReplyBuilder(std::size_t reserveBytes) { +BSONObjBuilder CommandReplyBuilder::getBodyBuilder() { + if (_state == State::kMetadata) { + invariant(_bodyOffset); + return BSONObjBuilder(BSONObjBuilder::ResumeBuildingTag{}, _builder, _bodyOffset); + } invariant(_state == State::kCommandReply); - // Eagerly allocate reserveBytes bytes. - _builder.reserveBytes(reserveBytes); - // Claim our reservation immediately so we can actually write data to it. - _builder.claimReservedBytes(reserveBytes); _state = State::kMetadata; - return BSONObjBuilder(_builder); + auto bob = BSONObjBuilder(_builder); + _bodyOffset = bob.offset(); + return bob; } CommandReplyBuilder& CommandReplyBuilder::setMetadata(const BSONObj& metadata) { @@ -94,6 +96,7 @@ void CommandReplyBuilder::reset() { _builder.skip(mongo::MsgData::MsgDataHeaderSize); _message.reset(); _state = State::kCommandReply; + _bodyOffset = 0; } Message CommandReplyBuilder::done() { diff --git a/src/mongo/rpc/command_reply_builder.h b/src/mongo/rpc/command_reply_builder.h index cce27f90b01..d3d741d678f 100644 --- a/src/mongo/rpc/command_reply_builder.h +++ b/src/mongo/rpc/command_reply_builder.h @@ -57,7 +57,7 @@ public: CommandReplyBuilder& setRawCommandReply(const BSONObj& commandReply) final; - BSONObjBuilder getInPlaceReplyBuilder(std::size_t) final; + BSONObjBuilder getBodyBuilder() final; CommandReplyBuilder& setMetadata(const BSONObj& metadata) final; @@ -72,10 +72,16 @@ public: */ Message done() final; + void reserveBytes(const std::size_t bytes) final { + _builder.reserveBytes(bytes); + _builder.claimReservedBytes(bytes); + } + private: enum class State { kMetadata, kCommandReply, kOutputDocs, kDone }; // Default values are all empty. + std::size_t _bodyOffset = 0; BufBuilder _builder{}; Message _message; State _state{State::kCommandReply}; diff --git a/src/mongo/rpc/legacy_reply_builder.cpp b/src/mongo/rpc/legacy_reply_builder.cpp index 21e18cde76a..78663e9f75f 100644 --- a/src/mongo/rpc/legacy_reply_builder.cpp +++ b/src/mongo/rpc/legacy_reply_builder.cpp @@ -83,14 +83,17 @@ LegacyReplyBuilder& LegacyReplyBuilder::setRawCommandReply(const BSONObj& comman return *this; } -BSONObjBuilder LegacyReplyBuilder::getInPlaceReplyBuilder(std::size_t reserveBytes) { - invariant(_state == State::kCommandReply); - // Eagerly allocate reserveBytes bytes. - _builder.reserveBytes(reserveBytes); - // Claim our reservation immediately so we can actually write data to it. - _builder.claimReservedBytes(reserveBytes); - _state = State::kMetadata; - return BSONObjBuilder(_builder); +BSONObjBuilder LegacyReplyBuilder::getBodyBuilder() { + if (_state == State::kCommandReply) { + auto bob = BSONObjBuilder(_builder); + _bodyOffset = bob.offset(); + _state = State::kMetadata; + return bob; + } + + invariant(_state == State::kMetadata); + invariant(_bodyOffset); + return BSONObjBuilder(BSONObjBuilder::ResumeBuildingTag{}, _builder, _bodyOffset); } LegacyReplyBuilder& LegacyReplyBuilder::setMetadata(const BSONObj& metadata) { @@ -105,6 +108,11 @@ Protocol LegacyReplyBuilder::getProtocol() const { return rpc::Protocol::kOpQuery; } +void LegacyReplyBuilder::reserveBytes(const std::size_t bytes) { + _builder.reserveBytes(bytes); + _builder.claimReservedBytes(bytes); +} + void LegacyReplyBuilder::reset() { // If we are in State::kMetadata, we are already in the 'start' state, so by // immediately returning, we save a heap allocation. @@ -116,6 +124,7 @@ void LegacyReplyBuilder::reset() { _message.reset(); _state = State::kCommandReply; _staleConfigError = false; + _bodyOffset = 0; } diff --git a/src/mongo/rpc/legacy_reply_builder.h b/src/mongo/rpc/legacy_reply_builder.h index 22f40092df6..4f3128bb2be 100644 --- a/src/mongo/rpc/legacy_reply_builder.h +++ b/src/mongo/rpc/legacy_reply_builder.h @@ -52,7 +52,7 @@ public: LegacyReplyBuilder& setCommandReply(Status nonOKStatus, BSONObj extraErrorInfo) final; LegacyReplyBuilder& setRawCommandReply(const BSONObj& commandReply) final; - BSONObjBuilder getInPlaceReplyBuilder(std::size_t) final; + BSONObjBuilder getBodyBuilder() final; LegacyReplyBuilder& setMetadata(const BSONObj& metadata) final; @@ -62,10 +62,13 @@ public: Protocol getProtocol() const final; + void reserveBytes(const std::size_t bytes) final; + private: enum class State { kMetadata, kCommandReply, kOutputDocs, kDone }; BufBuilder _builder{}; + std::size_t _bodyOffset = 0; Message _message; State _state{State::kCommandReply}; // For stale config errors we need to set the correct ResultFlag. diff --git a/src/mongo/rpc/op_msg.cpp b/src/mongo/rpc/op_msg.cpp index f886027e192..9199693b0e0 100644 --- a/src/mongo/rpc/op_msg.cpp +++ b/src/mongo/rpc/op_msg.cpp @@ -247,4 +247,15 @@ Message OpMsgBuilder::finish() { return Message(_buf.release()); } +BSONObj OpMsgBuilder::releaseBody() { + invariant(_state == kBody); + invariant(_bodyStart); + invariant(_bodyStart == sizeof(MSGHEADER::Layout) + 4 /*flags*/ + 1 /*body kind byte*/); + invariant(!_openBuilder); + _state = kDone; + + auto bson = BSONObj(_buf.buf() + _bodyStart); + return bson.shareOwnershipWith(_buf.release()); +} + } // namespace mongo diff --git a/src/mongo/rpc/op_msg.h b/src/mongo/rpc/op_msg.h index ed7f1993d86..dbe6a1506ef 100644 --- a/src/mongo/rpc/op_msg.h +++ b/src/mongo/rpc/op_msg.h @@ -220,6 +220,26 @@ public: */ static AtomicBool disableDupeFieldCheck_forTest; + /** + * Similar to finish, any calls on this object after are illegal. + */ + BSONObj releaseBody(); + + /** + * Returns whether or not this builder is already building a body. + */ + bool isBuildingBody() { + return _state == kBody; + } + + /** + * Reserves and claims the bytes requested in the internal BufBuilder. + */ + void reserveBytes(const std::size_t bytes) { + _buf.reserveBytes(bytes); + _buf.claimReservedBytes(bytes); + } + private: friend class DocSequenceBuilder; diff --git a/src/mongo/rpc/op_msg_integration_test.cpp b/src/mongo/rpc/op_msg_integration_test.cpp index ab69b57b206..e0bdb01bef6 100644 --- a/src/mongo/rpc/op_msg_integration_test.cpp +++ b/src/mongo/rpc/op_msg_integration_test.cpp @@ -165,4 +165,36 @@ TEST(OpMsg, CloseConnectionOnFireAndForgetNotMasterError) { ASSERT(foundSecondary); } +TEST(OpMsg, DocumentSequenceReturnsWork) { + std::string errMsg; + auto conn = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect("integration_test", errMsg)); + uassert(ErrorCodes::SocketException, errMsg, conn); + + auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", BSON("echo" << 1)); + opMsgRequest.sequences.push_back({"example", {BSON("a" << 1), BSON("b" << 2)}}); + auto request = opMsgRequest.serialize(); + + Message reply; + ASSERT(conn->call(request, reply)); + + auto opMsgReply = OpMsg::parse(reply); + ASSERT_EQ(opMsgReply.sequences.size(), 1u); + + auto sequence = opMsgReply.getSequence("example"); + ASSERT(sequence); + ASSERT_EQ(sequence->objs.size(), 2u); + + auto checkSequence = [](auto& bson, auto key, auto val) { + ASSERT(bson.hasField(key)); + ASSERT_EQ(bson[key].Int(), val); + }; + checkSequence(sequence->objs[0], "a", 1); + checkSequence(sequence->objs[1], "b", 2); + + ASSERT_BSONOBJ_EQ(opMsgReply.body.getObjectField("echo"), + BSON("echo" << 1 << "$db" + << "admin")); +} + } // namespace mongo diff --git a/src/mongo/rpc/op_msg_rpc_impls.h b/src/mongo/rpc/op_msg_rpc_impls.h index 4725456630f..b16a9a7c0cd 100644 --- a/src/mongo/rpc/op_msg_rpc_impls.h +++ b/src/mongo/rpc/op_msg_rpc_impls.h @@ -59,13 +59,14 @@ public: _builder.beginBody().appendElements(reply); return *this; } - BSONObjBuilder getInPlaceReplyBuilder(std::size_t reserveBytes) override { - BSONObjBuilder bob = _builder.beginBody(); - // Eagerly reserve space and claim our reservation immediately so we can actually write data - // to it. - bob.bb().reserveBytes(reserveBytes); - bob.bb().claimReservedBytes(reserveBytes); - return bob; + BSONObjBuilder getBodyBuilder() override { + if (!_builder.isBuildingBody()) { + return _builder.beginBody(); + } + return _builder.resumeBody(); + } + OpMsgBuilder::DocSequenceBuilder getDocSequenceBuilder(StringData name) override { + return _builder.beginDocSequence(name); } ReplyBuilderInterface& setMetadata(const BSONObj& metadata) override { _builder.resumeBody().appendElements(metadata); @@ -80,6 +81,12 @@ public: Message done() override { return _builder.finish(); } + void reserveBytes(const std::size_t bytes) override { + _builder.reserveBytes(bytes); + } + BSONObj releaseBody() { + return _builder.releaseBody(); + } private: OpMsgBuilder _builder; diff --git a/src/mongo/rpc/reply_builder_interface.h b/src/mongo/rpc/reply_builder_interface.h index a157f41167e..70524fc3a64 100644 --- a/src/mongo/rpc/reply_builder_interface.h +++ b/src/mongo/rpc/reply_builder_interface.h @@ -33,6 +33,7 @@ #include "mongo/base/disallow_copying.h" #include "mongo/base/status.h" #include "mongo/bson/util/builder.h" +#include "mongo/rpc/op_msg.h" #include "mongo/rpc/protocol.h" namespace mongo { @@ -58,9 +59,21 @@ public: virtual ReplyBuilderInterface& setRawCommandReply(const BSONObj& reply) = 0; /** - * Returns a BSONObjBuilder for building a command reply in place. + * Returns a BSONObjBuilder that can be used to build the reply in-place. The returned + * builder (or an object into which it has been moved) must be completed before calling + * any more methods on this object. A builder is completed by a call to `done()` or by + * its destructor. Can be called repeatedly to append multiple things to the reply, as + * long as each returned builder is completed between calls. */ - virtual BSONObjBuilder getInPlaceReplyBuilder(std::size_t reserveBytes) = 0; + virtual BSONObjBuilder getBodyBuilder() = 0; + + /** + * Returns a DocSeqBuilder for building a command reply in place. This should only be called + * before the body as the body will have status types appended at the end. + */ + virtual OpMsgBuilder::DocSequenceBuilder getDocSequenceBuilder(StringData name) { + uasserted(99980, "Only OpMsg may use document sequences"); + } virtual ReplyBuilderInterface& setMetadata(const BSONObj& metadata) = 0; @@ -104,6 +117,26 @@ public: */ virtual Message done() = 0; + /** + * The specified 'object' must be BSON-serializable. + * + * BSONSerializable 'x' means 'x.serialize(bob)' appends a representation of 'x' + * into 'BSONObjBuilder* bob'. + */ + template <typename T> + void fillFrom(const T& object) { + static_assert(!isStatusOrStatusWith<std::decay_t<T>>, + "Status and StatusWith<T> aren't supported by TypedCommand and fillFrom(). " + "Use uassertStatusOK() instead."); + auto bob = getBodyBuilder(); + object.serialize(&bob); + } + + /** + * Reserves and claims bytes for the Message generated by this interface. + */ + virtual void reserveBytes(const std::size_t bytes) = 0; + protected: ReplyBuilderInterface() = default; }; diff --git a/src/mongo/s/commands/cluster_explain_cmd.cpp b/src/mongo/s/commands/cluster_explain_cmd.cpp index ff7fc38f7be..e007b7bab8c 100644 --- a/src/mongo/s/commands/cluster_explain_cmd.cpp +++ b/src/mongo/s/commands/cluster_explain_cmd.cpp @@ -95,7 +95,7 @@ public: _innerInvocation{std::move(innerInvocation)} {} private: - void run(OperationContext* opCtx, CommandReplyBuilder* result) override { + void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) override { try { auto bob = result->getBodyBuilder(); _innerInvocation->explain(opCtx, _verbosity, &bob); diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index d97f24b34bb..412f64c9885 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -302,7 +302,7 @@ private: return response.getOk(); } - void run(OperationContext* opCtx, CommandReplyBuilder* result) override { + void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) override { try { BSONObjBuilder bob = result->getBodyBuilder(); bool ok = runImpl(opCtx, *_request, _batchedRequest, bob); diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 5b82fe3a055..ddd4ce23f7e 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -62,6 +62,7 @@ #include "mongo/rpc/metadata/logical_time_metadata.h" #include "mongo/rpc/metadata/tracking_metadata.h" #include "mongo/rpc/op_msg.h" +#include "mongo/rpc/op_msg_rpc_impls.h" #include "mongo/s/cannot_implicitly_create_collection_info.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/parallel.h" @@ -151,7 +152,7 @@ void appendRequiredFieldsToResponse(OperationContext* opCtx, BSONObjBuilder* res void execCommandClient(OperationContext* opCtx, CommandInvocation* invocation, const OpMsgRequest& request, - CommandReplyBuilder* result) { + rpc::ReplyBuilderInterface* result) { const Command* c = invocation->definition(); ON_BLOCK_EXIT([opCtx, &result] { auto body = result->getBodyBuilder(); @@ -289,10 +290,11 @@ MONGO_FAIL_POINT_DEFINE(doNotRefreshShardsOnRetargettingError); void runCommand(OperationContext* opCtx, const OpMsgRequest& request, const NetworkOp opType, - BSONObjBuilder&& builder) { + rpc::ReplyBuilderInterface* replyBuilder) { auto const commandName = request.getCommandName(); auto const command = CommandHelpers::findCommand(commandName); if (!command) { + auto builder = replyBuilder->getBodyBuilder(); ON_BLOCK_EXIT([opCtx, &builder] { appendRequiredFieldsToResponse(opCtx, &builder); }); CommandHelpers::appendCommandStatusNoThrow( builder, @@ -355,12 +357,11 @@ void runCommand(OperationContext* opCtx, auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); auto readConcernParseStatus = readConcernArgs.initialize(request.body); if (!readConcernParseStatus.isOK()) { + auto builder = replyBuilder->getBodyBuilder(); CommandHelpers::appendCommandStatusNoThrow(builder, readConcernParseStatus); return; } - CommandReplyBuilder crb(std::move(builder)); - try { for (int tries = 0;; ++tries) { // Try kMaxNumStaleVersionRetries times. On the last try, exceptions are rethrown. @@ -374,9 +375,9 @@ void runCommand(OperationContext* opCtx, "unexpected change of namespace when retrying"); } - crb.reset(); + replyBuilder->reset(); try { - execCommandClient(opCtx, invocation.get(), request, &crb); + execCommandClient(opCtx, invocation.get(), request, replyBuilder); return; } catch (const ExceptionForCat<ErrorCategory::NeedRetargettingError>& ex) { const auto staleNs = [&] { @@ -421,12 +422,8 @@ void runCommand(OperationContext* opCtx, } } catch (const DBException& e) { command->incrementCommandsFailed(); - CurOp::get(opCtx)->debug().errInfo = e.toStatus(); LastError::get(opCtx->getClient()).setLastError(e.code(), e.reason()); - crb.reset(); - BSONObjBuilder bob = crb.getBodyBuilder(); - CommandHelpers::appendCommandStatusNoThrow(bob, e.toStatus()); - appendRequiredFieldsToResponse(opCtx, &bob); + throw; } } @@ -558,7 +555,7 @@ DbResponse Strategy::clientCommand(OperationContext* opCtx, const Message& m) { std::string db = request.getDatabase().toString(); try { LOG(3) << "Command begin db: " << db << " msg id: " << m.header().getId(); - runCommand(opCtx, request, m.operation(), reply->getInPlaceReplyBuilder(0)); + runCommand(opCtx, request, m.operation(), reply.get()); LOG(3) << "Command end db: " << db << " msg id: " << m.header().getId(); } catch (const DBException& ex) { LOG(1) << "Exception thrown while processing command on " << db @@ -573,7 +570,7 @@ DbResponse Strategy::clientCommand(OperationContext* opCtx, const Message& m) { throw; } reply->reset(); - auto bob = reply->getInPlaceReplyBuilder(0); + auto bob = reply->getBodyBuilder(); CommandHelpers::appendCommandStatusNoThrow(bob, ex.toStatus()); appendRequiredFieldsToResponse(opCtx, &bob); } @@ -719,11 +716,10 @@ void Strategy::killCursors(OperationContext* opCtx, DbMessage* dbm) { } void Strategy::writeOp(OperationContext* opCtx, DbMessage* dbm) { - BufBuilder bb; + const auto& msg = dbm->msg(); + rpc::OpMsgReplyBuilder reply; runCommand(opCtx, [&]() { - const auto& msg = dbm->msg(); - switch (msg.operation()) { case dbInsert: { return InsertOp::parseLegacy(msg).serialize({}); @@ -738,8 +734,8 @@ void Strategy::writeOp(OperationContext* opCtx, DbMessage* dbm) { MONGO_UNREACHABLE; } }(), - dbm->msg().operation(), - BSONObjBuilder{bb}); // built object is ignored + msg.operation(), + &reply); // built object is ignored } void Strategy::explainFind(OperationContext* opCtx, |