summaryrefslogtreecommitdiff
path: root/src/mongo/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/rpc')
-rw-r--r--src/mongo/rpc/command_reply_builder.cpp15
-rw-r--r--src/mongo/rpc/command_reply_builder.h8
-rw-r--r--src/mongo/rpc/legacy_reply_builder.cpp25
-rw-r--r--src/mongo/rpc/legacy_reply_builder.h5
-rw-r--r--src/mongo/rpc/op_msg.cpp11
-rw-r--r--src/mongo/rpc/op_msg.h20
-rw-r--r--src/mongo/rpc/op_msg_integration_test.cpp32
-rw-r--r--src/mongo/rpc/op_msg_rpc_impls.h21
-rw-r--r--src/mongo/rpc/reply_builder_interface.h37
9 files changed, 149 insertions, 25 deletions
diff --git a/src/mongo/rpc/command_reply_builder.cpp b/src/mongo/rpc/command_reply_builder.cpp
index 534ad779f86..ba62df3e72e 100644
--- a/src/mongo/rpc/command_reply_builder.cpp
+++ b/src/mongo/rpc/command_reply_builder.cpp
@@ -53,14 +53,16 @@ CommandReplyBuilder& CommandReplyBuilder::setRawCommandReply(const BSONObj& comm
return *this;
}
-BSONObjBuilder CommandReplyBuilder::getInPlaceReplyBuilder(std::size_t reserveBytes) {
+BSONObjBuilder CommandReplyBuilder::getBodyBuilder() {
+ if (_state == State::kMetadata) {
+ invariant(_bodyOffset);
+ return BSONObjBuilder(BSONObjBuilder::ResumeBuildingTag{}, _builder, _bodyOffset);
+ }
invariant(_state == State::kCommandReply);
- // Eagerly allocate reserveBytes bytes.
- _builder.reserveBytes(reserveBytes);
- // Claim our reservation immediately so we can actually write data to it.
- _builder.claimReservedBytes(reserveBytes);
_state = State::kMetadata;
- return BSONObjBuilder(_builder);
+ auto bob = BSONObjBuilder(_builder);
+ _bodyOffset = bob.offset();
+ return bob;
}
CommandReplyBuilder& CommandReplyBuilder::setMetadata(const BSONObj& metadata) {
@@ -94,6 +96,7 @@ void CommandReplyBuilder::reset() {
_builder.skip(mongo::MsgData::MsgDataHeaderSize);
_message.reset();
_state = State::kCommandReply;
+ _bodyOffset = 0;
}
Message CommandReplyBuilder::done() {
diff --git a/src/mongo/rpc/command_reply_builder.h b/src/mongo/rpc/command_reply_builder.h
index cce27f90b01..d3d741d678f 100644
--- a/src/mongo/rpc/command_reply_builder.h
+++ b/src/mongo/rpc/command_reply_builder.h
@@ -57,7 +57,7 @@ public:
CommandReplyBuilder& setRawCommandReply(const BSONObj& commandReply) final;
- BSONObjBuilder getInPlaceReplyBuilder(std::size_t) final;
+ BSONObjBuilder getBodyBuilder() final;
CommandReplyBuilder& setMetadata(const BSONObj& metadata) final;
@@ -72,10 +72,16 @@ public:
*/
Message done() final;
+ void reserveBytes(const std::size_t bytes) final {
+ _builder.reserveBytes(bytes);
+ _builder.claimReservedBytes(bytes);
+ }
+
private:
enum class State { kMetadata, kCommandReply, kOutputDocs, kDone };
// Default values are all empty.
+ std::size_t _bodyOffset = 0;
BufBuilder _builder{};
Message _message;
State _state{State::kCommandReply};
diff --git a/src/mongo/rpc/legacy_reply_builder.cpp b/src/mongo/rpc/legacy_reply_builder.cpp
index 21e18cde76a..78663e9f75f 100644
--- a/src/mongo/rpc/legacy_reply_builder.cpp
+++ b/src/mongo/rpc/legacy_reply_builder.cpp
@@ -83,14 +83,17 @@ LegacyReplyBuilder& LegacyReplyBuilder::setRawCommandReply(const BSONObj& comman
return *this;
}
-BSONObjBuilder LegacyReplyBuilder::getInPlaceReplyBuilder(std::size_t reserveBytes) {
- invariant(_state == State::kCommandReply);
- // Eagerly allocate reserveBytes bytes.
- _builder.reserveBytes(reserveBytes);
- // Claim our reservation immediately so we can actually write data to it.
- _builder.claimReservedBytes(reserveBytes);
- _state = State::kMetadata;
- return BSONObjBuilder(_builder);
+BSONObjBuilder LegacyReplyBuilder::getBodyBuilder() {
+ if (_state == State::kCommandReply) {
+ auto bob = BSONObjBuilder(_builder);
+ _bodyOffset = bob.offset();
+ _state = State::kMetadata;
+ return bob;
+ }
+
+ invariant(_state == State::kMetadata);
+ invariant(_bodyOffset);
+ return BSONObjBuilder(BSONObjBuilder::ResumeBuildingTag{}, _builder, _bodyOffset);
}
LegacyReplyBuilder& LegacyReplyBuilder::setMetadata(const BSONObj& metadata) {
@@ -105,6 +108,11 @@ Protocol LegacyReplyBuilder::getProtocol() const {
return rpc::Protocol::kOpQuery;
}
+void LegacyReplyBuilder::reserveBytes(const std::size_t bytes) {
+ _builder.reserveBytes(bytes);
+ _builder.claimReservedBytes(bytes);
+}
+
void LegacyReplyBuilder::reset() {
// If we are in State::kMetadata, we are already in the 'start' state, so by
// immediately returning, we save a heap allocation.
@@ -116,6 +124,7 @@ void LegacyReplyBuilder::reset() {
_message.reset();
_state = State::kCommandReply;
_staleConfigError = false;
+ _bodyOffset = 0;
}
diff --git a/src/mongo/rpc/legacy_reply_builder.h b/src/mongo/rpc/legacy_reply_builder.h
index 22f40092df6..4f3128bb2be 100644
--- a/src/mongo/rpc/legacy_reply_builder.h
+++ b/src/mongo/rpc/legacy_reply_builder.h
@@ -52,7 +52,7 @@ public:
LegacyReplyBuilder& setCommandReply(Status nonOKStatus, BSONObj extraErrorInfo) final;
LegacyReplyBuilder& setRawCommandReply(const BSONObj& commandReply) final;
- BSONObjBuilder getInPlaceReplyBuilder(std::size_t) final;
+ BSONObjBuilder getBodyBuilder() final;
LegacyReplyBuilder& setMetadata(const BSONObj& metadata) final;
@@ -62,10 +62,13 @@ public:
Protocol getProtocol() const final;
+ void reserveBytes(const std::size_t bytes) final;
+
private:
enum class State { kMetadata, kCommandReply, kOutputDocs, kDone };
BufBuilder _builder{};
+ std::size_t _bodyOffset = 0;
Message _message;
State _state{State::kCommandReply};
// For stale config errors we need to set the correct ResultFlag.
diff --git a/src/mongo/rpc/op_msg.cpp b/src/mongo/rpc/op_msg.cpp
index f886027e192..9199693b0e0 100644
--- a/src/mongo/rpc/op_msg.cpp
+++ b/src/mongo/rpc/op_msg.cpp
@@ -247,4 +247,15 @@ Message OpMsgBuilder::finish() {
return Message(_buf.release());
}
+BSONObj OpMsgBuilder::releaseBody() {
+ invariant(_state == kBody);
+ invariant(_bodyStart);
+ invariant(_bodyStart == sizeof(MSGHEADER::Layout) + 4 /*flags*/ + 1 /*body kind byte*/);
+ invariant(!_openBuilder);
+ _state = kDone;
+
+ auto bson = BSONObj(_buf.buf() + _bodyStart);
+ return bson.shareOwnershipWith(_buf.release());
+}
+
} // namespace mongo
diff --git a/src/mongo/rpc/op_msg.h b/src/mongo/rpc/op_msg.h
index ed7f1993d86..dbe6a1506ef 100644
--- a/src/mongo/rpc/op_msg.h
+++ b/src/mongo/rpc/op_msg.h
@@ -220,6 +220,26 @@ public:
*/
static AtomicBool disableDupeFieldCheck_forTest;
+ /**
+ * Similar to finish, any calls on this object after are illegal.
+ */
+ BSONObj releaseBody();
+
+ /**
+ * Returns whether or not this builder is already building a body.
+ */
+ bool isBuildingBody() {
+ return _state == kBody;
+ }
+
+ /**
+ * Reserves and claims the bytes requested in the internal BufBuilder.
+ */
+ void reserveBytes(const std::size_t bytes) {
+ _buf.reserveBytes(bytes);
+ _buf.claimReservedBytes(bytes);
+ }
+
private:
friend class DocSequenceBuilder;
diff --git a/src/mongo/rpc/op_msg_integration_test.cpp b/src/mongo/rpc/op_msg_integration_test.cpp
index ab69b57b206..e0bdb01bef6 100644
--- a/src/mongo/rpc/op_msg_integration_test.cpp
+++ b/src/mongo/rpc/op_msg_integration_test.cpp
@@ -165,4 +165,36 @@ TEST(OpMsg, CloseConnectionOnFireAndForgetNotMasterError) {
ASSERT(foundSecondary);
}
+TEST(OpMsg, DocumentSequenceReturnsWork) {
+ std::string errMsg;
+ auto conn = std::unique_ptr<DBClientBase>(
+ unittest::getFixtureConnectionString().connect("integration_test", errMsg));
+ uassert(ErrorCodes::SocketException, errMsg, conn);
+
+ auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", BSON("echo" << 1));
+ opMsgRequest.sequences.push_back({"example", {BSON("a" << 1), BSON("b" << 2)}});
+ auto request = opMsgRequest.serialize();
+
+ Message reply;
+ ASSERT(conn->call(request, reply));
+
+ auto opMsgReply = OpMsg::parse(reply);
+ ASSERT_EQ(opMsgReply.sequences.size(), 1u);
+
+ auto sequence = opMsgReply.getSequence("example");
+ ASSERT(sequence);
+ ASSERT_EQ(sequence->objs.size(), 2u);
+
+ auto checkSequence = [](auto& bson, auto key, auto val) {
+ ASSERT(bson.hasField(key));
+ ASSERT_EQ(bson[key].Int(), val);
+ };
+ checkSequence(sequence->objs[0], "a", 1);
+ checkSequence(sequence->objs[1], "b", 2);
+
+ ASSERT_BSONOBJ_EQ(opMsgReply.body.getObjectField("echo"),
+ BSON("echo" << 1 << "$db"
+ << "admin"));
+}
+
} // namespace mongo
diff --git a/src/mongo/rpc/op_msg_rpc_impls.h b/src/mongo/rpc/op_msg_rpc_impls.h
index 4725456630f..b16a9a7c0cd 100644
--- a/src/mongo/rpc/op_msg_rpc_impls.h
+++ b/src/mongo/rpc/op_msg_rpc_impls.h
@@ -59,13 +59,14 @@ public:
_builder.beginBody().appendElements(reply);
return *this;
}
- BSONObjBuilder getInPlaceReplyBuilder(std::size_t reserveBytes) override {
- BSONObjBuilder bob = _builder.beginBody();
- // Eagerly reserve space and claim our reservation immediately so we can actually write data
- // to it.
- bob.bb().reserveBytes(reserveBytes);
- bob.bb().claimReservedBytes(reserveBytes);
- return bob;
+ BSONObjBuilder getBodyBuilder() override {
+ if (!_builder.isBuildingBody()) {
+ return _builder.beginBody();
+ }
+ return _builder.resumeBody();
+ }
+ OpMsgBuilder::DocSequenceBuilder getDocSequenceBuilder(StringData name) override {
+ return _builder.beginDocSequence(name);
}
ReplyBuilderInterface& setMetadata(const BSONObj& metadata) override {
_builder.resumeBody().appendElements(metadata);
@@ -80,6 +81,12 @@ public:
Message done() override {
return _builder.finish();
}
+ void reserveBytes(const std::size_t bytes) override {
+ _builder.reserveBytes(bytes);
+ }
+ BSONObj releaseBody() {
+ return _builder.releaseBody();
+ }
private:
OpMsgBuilder _builder;
diff --git a/src/mongo/rpc/reply_builder_interface.h b/src/mongo/rpc/reply_builder_interface.h
index a157f41167e..70524fc3a64 100644
--- a/src/mongo/rpc/reply_builder_interface.h
+++ b/src/mongo/rpc/reply_builder_interface.h
@@ -33,6 +33,7 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/base/status.h"
#include "mongo/bson/util/builder.h"
+#include "mongo/rpc/op_msg.h"
#include "mongo/rpc/protocol.h"
namespace mongo {
@@ -58,9 +59,21 @@ public:
virtual ReplyBuilderInterface& setRawCommandReply(const BSONObj& reply) = 0;
/**
- * Returns a BSONObjBuilder for building a command reply in place.
+ * Returns a BSONObjBuilder that can be used to build the reply in-place. The returned
+ * builder (or an object into which it has been moved) must be completed before calling
+ * any more methods on this object. A builder is completed by a call to `done()` or by
+ * its destructor. Can be called repeatedly to append multiple things to the reply, as
+ * long as each returned builder is completed between calls.
*/
- virtual BSONObjBuilder getInPlaceReplyBuilder(std::size_t reserveBytes) = 0;
+ virtual BSONObjBuilder getBodyBuilder() = 0;
+
+ /**
+ * Returns a DocSeqBuilder for building a command reply in place. This should only be called
+ * before the body as the body will have status types appended at the end.
+ */
+ virtual OpMsgBuilder::DocSequenceBuilder getDocSequenceBuilder(StringData name) {
+ uasserted(99980, "Only OpMsg may use document sequences");
+ }
virtual ReplyBuilderInterface& setMetadata(const BSONObj& metadata) = 0;
@@ -104,6 +117,26 @@ public:
*/
virtual Message done() = 0;
+ /**
+ * The specified 'object' must be BSON-serializable.
+ *
+ * BSONSerializable 'x' means 'x.serialize(bob)' appends a representation of 'x'
+ * into 'BSONObjBuilder* bob'.
+ */
+ template <typename T>
+ void fillFrom(const T& object) {
+ static_assert(!isStatusOrStatusWith<std::decay_t<T>>,
+ "Status and StatusWith<T> aren't supported by TypedCommand and fillFrom(). "
+ "Use uassertStatusOK() instead.");
+ auto bob = getBodyBuilder();
+ object.serialize(&bob);
+ }
+
+ /**
+ * Reserves and claims bytes for the Message generated by this interface.
+ */
+ virtual void reserveBytes(const std::size_t bytes) = 0;
+
protected:
ReplyBuilderInterface() = default;
};