diff options
Diffstat (limited to 'src/mongo/rpc')
-rw-r--r-- | src/mongo/rpc/command_reply_builder.cpp | 15 | ||||
-rw-r--r-- | src/mongo/rpc/command_reply_builder.h | 8 | ||||
-rw-r--r-- | src/mongo/rpc/legacy_reply_builder.cpp | 25 | ||||
-rw-r--r-- | src/mongo/rpc/legacy_reply_builder.h | 5 | ||||
-rw-r--r-- | src/mongo/rpc/op_msg.cpp | 11 | ||||
-rw-r--r-- | src/mongo/rpc/op_msg.h | 20 | ||||
-rw-r--r-- | src/mongo/rpc/op_msg_integration_test.cpp | 32 | ||||
-rw-r--r-- | src/mongo/rpc/op_msg_rpc_impls.h | 21 | ||||
-rw-r--r-- | src/mongo/rpc/reply_builder_interface.h | 37 |
9 files changed, 149 insertions, 25 deletions
diff --git a/src/mongo/rpc/command_reply_builder.cpp b/src/mongo/rpc/command_reply_builder.cpp index 534ad779f86..ba62df3e72e 100644 --- a/src/mongo/rpc/command_reply_builder.cpp +++ b/src/mongo/rpc/command_reply_builder.cpp @@ -53,14 +53,16 @@ CommandReplyBuilder& CommandReplyBuilder::setRawCommandReply(const BSONObj& comm return *this; } -BSONObjBuilder CommandReplyBuilder::getInPlaceReplyBuilder(std::size_t reserveBytes) { +BSONObjBuilder CommandReplyBuilder::getBodyBuilder() { + if (_state == State::kMetadata) { + invariant(_bodyOffset); + return BSONObjBuilder(BSONObjBuilder::ResumeBuildingTag{}, _builder, _bodyOffset); + } invariant(_state == State::kCommandReply); - // Eagerly allocate reserveBytes bytes. - _builder.reserveBytes(reserveBytes); - // Claim our reservation immediately so we can actually write data to it. - _builder.claimReservedBytes(reserveBytes); _state = State::kMetadata; - return BSONObjBuilder(_builder); + auto bob = BSONObjBuilder(_builder); + _bodyOffset = bob.offset(); + return bob; } CommandReplyBuilder& CommandReplyBuilder::setMetadata(const BSONObj& metadata) { @@ -94,6 +96,7 @@ void CommandReplyBuilder::reset() { _builder.skip(mongo::MsgData::MsgDataHeaderSize); _message.reset(); _state = State::kCommandReply; + _bodyOffset = 0; } Message CommandReplyBuilder::done() { diff --git a/src/mongo/rpc/command_reply_builder.h b/src/mongo/rpc/command_reply_builder.h index cce27f90b01..d3d741d678f 100644 --- a/src/mongo/rpc/command_reply_builder.h +++ b/src/mongo/rpc/command_reply_builder.h @@ -57,7 +57,7 @@ public: CommandReplyBuilder& setRawCommandReply(const BSONObj& commandReply) final; - BSONObjBuilder getInPlaceReplyBuilder(std::size_t) final; + BSONObjBuilder getBodyBuilder() final; CommandReplyBuilder& setMetadata(const BSONObj& metadata) final; @@ -72,10 +72,16 @@ public: */ Message done() final; + void reserveBytes(const std::size_t bytes) final { + _builder.reserveBytes(bytes); + _builder.claimReservedBytes(bytes); + } + private: enum class State { kMetadata, kCommandReply, kOutputDocs, kDone }; // Default values are all empty. + std::size_t _bodyOffset = 0; BufBuilder _builder{}; Message _message; State _state{State::kCommandReply}; diff --git a/src/mongo/rpc/legacy_reply_builder.cpp b/src/mongo/rpc/legacy_reply_builder.cpp index 21e18cde76a..78663e9f75f 100644 --- a/src/mongo/rpc/legacy_reply_builder.cpp +++ b/src/mongo/rpc/legacy_reply_builder.cpp @@ -83,14 +83,17 @@ LegacyReplyBuilder& LegacyReplyBuilder::setRawCommandReply(const BSONObj& comman return *this; } -BSONObjBuilder LegacyReplyBuilder::getInPlaceReplyBuilder(std::size_t reserveBytes) { - invariant(_state == State::kCommandReply); - // Eagerly allocate reserveBytes bytes. - _builder.reserveBytes(reserveBytes); - // Claim our reservation immediately so we can actually write data to it. - _builder.claimReservedBytes(reserveBytes); - _state = State::kMetadata; - return BSONObjBuilder(_builder); +BSONObjBuilder LegacyReplyBuilder::getBodyBuilder() { + if (_state == State::kCommandReply) { + auto bob = BSONObjBuilder(_builder); + _bodyOffset = bob.offset(); + _state = State::kMetadata; + return bob; + } + + invariant(_state == State::kMetadata); + invariant(_bodyOffset); + return BSONObjBuilder(BSONObjBuilder::ResumeBuildingTag{}, _builder, _bodyOffset); } LegacyReplyBuilder& LegacyReplyBuilder::setMetadata(const BSONObj& metadata) { @@ -105,6 +108,11 @@ Protocol LegacyReplyBuilder::getProtocol() const { return rpc::Protocol::kOpQuery; } +void LegacyReplyBuilder::reserveBytes(const std::size_t bytes) { + _builder.reserveBytes(bytes); + _builder.claimReservedBytes(bytes); +} + void LegacyReplyBuilder::reset() { // If we are in State::kMetadata, we are already in the 'start' state, so by // immediately returning, we save a heap allocation. @@ -116,6 +124,7 @@ void LegacyReplyBuilder::reset() { _message.reset(); _state = State::kCommandReply; _staleConfigError = false; + _bodyOffset = 0; } diff --git a/src/mongo/rpc/legacy_reply_builder.h b/src/mongo/rpc/legacy_reply_builder.h index 22f40092df6..4f3128bb2be 100644 --- a/src/mongo/rpc/legacy_reply_builder.h +++ b/src/mongo/rpc/legacy_reply_builder.h @@ -52,7 +52,7 @@ public: LegacyReplyBuilder& setCommandReply(Status nonOKStatus, BSONObj extraErrorInfo) final; LegacyReplyBuilder& setRawCommandReply(const BSONObj& commandReply) final; - BSONObjBuilder getInPlaceReplyBuilder(std::size_t) final; + BSONObjBuilder getBodyBuilder() final; LegacyReplyBuilder& setMetadata(const BSONObj& metadata) final; @@ -62,10 +62,13 @@ public: Protocol getProtocol() const final; + void reserveBytes(const std::size_t bytes) final; + private: enum class State { kMetadata, kCommandReply, kOutputDocs, kDone }; BufBuilder _builder{}; + std::size_t _bodyOffset = 0; Message _message; State _state{State::kCommandReply}; // For stale config errors we need to set the correct ResultFlag. diff --git a/src/mongo/rpc/op_msg.cpp b/src/mongo/rpc/op_msg.cpp index f886027e192..9199693b0e0 100644 --- a/src/mongo/rpc/op_msg.cpp +++ b/src/mongo/rpc/op_msg.cpp @@ -247,4 +247,15 @@ Message OpMsgBuilder::finish() { return Message(_buf.release()); } +BSONObj OpMsgBuilder::releaseBody() { + invariant(_state == kBody); + invariant(_bodyStart); + invariant(_bodyStart == sizeof(MSGHEADER::Layout) + 4 /*flags*/ + 1 /*body kind byte*/); + invariant(!_openBuilder); + _state = kDone; + + auto bson = BSONObj(_buf.buf() + _bodyStart); + return bson.shareOwnershipWith(_buf.release()); +} + } // namespace mongo diff --git a/src/mongo/rpc/op_msg.h b/src/mongo/rpc/op_msg.h index ed7f1993d86..dbe6a1506ef 100644 --- a/src/mongo/rpc/op_msg.h +++ b/src/mongo/rpc/op_msg.h @@ -220,6 +220,26 @@ public: */ static AtomicBool disableDupeFieldCheck_forTest; + /** + * Similar to finish, any calls on this object after are illegal. + */ + BSONObj releaseBody(); + + /** + * Returns whether or not this builder is already building a body. + */ + bool isBuildingBody() { + return _state == kBody; + } + + /** + * Reserves and claims the bytes requested in the internal BufBuilder. + */ + void reserveBytes(const std::size_t bytes) { + _buf.reserveBytes(bytes); + _buf.claimReservedBytes(bytes); + } + private: friend class DocSequenceBuilder; diff --git a/src/mongo/rpc/op_msg_integration_test.cpp b/src/mongo/rpc/op_msg_integration_test.cpp index ab69b57b206..e0bdb01bef6 100644 --- a/src/mongo/rpc/op_msg_integration_test.cpp +++ b/src/mongo/rpc/op_msg_integration_test.cpp @@ -165,4 +165,36 @@ TEST(OpMsg, CloseConnectionOnFireAndForgetNotMasterError) { ASSERT(foundSecondary); } +TEST(OpMsg, DocumentSequenceReturnsWork) { + std::string errMsg; + auto conn = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect("integration_test", errMsg)); + uassert(ErrorCodes::SocketException, errMsg, conn); + + auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", BSON("echo" << 1)); + opMsgRequest.sequences.push_back({"example", {BSON("a" << 1), BSON("b" << 2)}}); + auto request = opMsgRequest.serialize(); + + Message reply; + ASSERT(conn->call(request, reply)); + + auto opMsgReply = OpMsg::parse(reply); + ASSERT_EQ(opMsgReply.sequences.size(), 1u); + + auto sequence = opMsgReply.getSequence("example"); + ASSERT(sequence); + ASSERT_EQ(sequence->objs.size(), 2u); + + auto checkSequence = [](auto& bson, auto key, auto val) { + ASSERT(bson.hasField(key)); + ASSERT_EQ(bson[key].Int(), val); + }; + checkSequence(sequence->objs[0], "a", 1); + checkSequence(sequence->objs[1], "b", 2); + + ASSERT_BSONOBJ_EQ(opMsgReply.body.getObjectField("echo"), + BSON("echo" << 1 << "$db" + << "admin")); +} + } // namespace mongo diff --git a/src/mongo/rpc/op_msg_rpc_impls.h b/src/mongo/rpc/op_msg_rpc_impls.h index 4725456630f..b16a9a7c0cd 100644 --- a/src/mongo/rpc/op_msg_rpc_impls.h +++ b/src/mongo/rpc/op_msg_rpc_impls.h @@ -59,13 +59,14 @@ public: _builder.beginBody().appendElements(reply); return *this; } - BSONObjBuilder getInPlaceReplyBuilder(std::size_t reserveBytes) override { - BSONObjBuilder bob = _builder.beginBody(); - // Eagerly reserve space and claim our reservation immediately so we can actually write data - // to it. - bob.bb().reserveBytes(reserveBytes); - bob.bb().claimReservedBytes(reserveBytes); - return bob; + BSONObjBuilder getBodyBuilder() override { + if (!_builder.isBuildingBody()) { + return _builder.beginBody(); + } + return _builder.resumeBody(); + } + OpMsgBuilder::DocSequenceBuilder getDocSequenceBuilder(StringData name) override { + return _builder.beginDocSequence(name); } ReplyBuilderInterface& setMetadata(const BSONObj& metadata) override { _builder.resumeBody().appendElements(metadata); @@ -80,6 +81,12 @@ public: Message done() override { return _builder.finish(); } + void reserveBytes(const std::size_t bytes) override { + _builder.reserveBytes(bytes); + } + BSONObj releaseBody() { + return _builder.releaseBody(); + } private: OpMsgBuilder _builder; diff --git a/src/mongo/rpc/reply_builder_interface.h b/src/mongo/rpc/reply_builder_interface.h index a157f41167e..70524fc3a64 100644 --- a/src/mongo/rpc/reply_builder_interface.h +++ b/src/mongo/rpc/reply_builder_interface.h @@ -33,6 +33,7 @@ #include "mongo/base/disallow_copying.h" #include "mongo/base/status.h" #include "mongo/bson/util/builder.h" +#include "mongo/rpc/op_msg.h" #include "mongo/rpc/protocol.h" namespace mongo { @@ -58,9 +59,21 @@ public: virtual ReplyBuilderInterface& setRawCommandReply(const BSONObj& reply) = 0; /** - * Returns a BSONObjBuilder for building a command reply in place. + * Returns a BSONObjBuilder that can be used to build the reply in-place. The returned + * builder (or an object into which it has been moved) must be completed before calling + * any more methods on this object. A builder is completed by a call to `done()` or by + * its destructor. Can be called repeatedly to append multiple things to the reply, as + * long as each returned builder is completed between calls. */ - virtual BSONObjBuilder getInPlaceReplyBuilder(std::size_t reserveBytes) = 0; + virtual BSONObjBuilder getBodyBuilder() = 0; + + /** + * Returns a DocSeqBuilder for building a command reply in place. This should only be called + * before the body as the body will have status types appended at the end. + */ + virtual OpMsgBuilder::DocSequenceBuilder getDocSequenceBuilder(StringData name) { + uasserted(99980, "Only OpMsg may use document sequences"); + } virtual ReplyBuilderInterface& setMetadata(const BSONObj& metadata) = 0; @@ -104,6 +117,26 @@ public: */ virtual Message done() = 0; + /** + * The specified 'object' must be BSON-serializable. + * + * BSONSerializable 'x' means 'x.serialize(bob)' appends a representation of 'x' + * into 'BSONObjBuilder* bob'. + */ + template <typename T> + void fillFrom(const T& object) { + static_assert(!isStatusOrStatusWith<std::decay_t<T>>, + "Status and StatusWith<T> aren't supported by TypedCommand and fillFrom(). " + "Use uassertStatusOK() instead."); + auto bob = getBodyBuilder(); + object.serialize(&bob); + } + + /** + * Reserves and claims bytes for the Message generated by this interface. + */ + virtual void reserveBytes(const std::size_t bytes) = 0; + protected: ReplyBuilderInterface() = default; }; |