summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAnthony Roy <anthony.roy@10gen.com>2018-06-19 17:53:48 -0400
committerAnthony Roy <anthony.roy@10gen.com>2018-07-05 17:43:25 -0400
commitf78056a8f1f5ea6af23bd68123659b714233b370 (patch)
treeca9bf843fec69e4ba687bdee55dade490b5ac246 /src
parent173ac70346990e4cf9b2720f86697785b8795967 (diff)
downloadmongo-f78056a8f1f5ea6af23bd68123659b714233b370.tar.gz
SERVER-35460 Replaced CommandReplyBuilder with direct usage of ReplyBuilderInterface
This is to provide access to DocumentSequence returns.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/commands.cpp34
-rw-r--r--src/mongo/db/commands.h54
-rw-r--r--src/mongo/db/commands/SConscript1
-rw-r--r--src/mongo/db/commands/explain_cmd.cpp2
-rw-r--r--src/mongo/db/commands/generic.cpp12
-rw-r--r--src/mongo/db/commands/mr_test.cpp8
-rw-r--r--src/mongo/db/commands/write_commands/write_commands.cpp2
-rw-r--r--src/mongo/db/commands_test.cpp15
-rw-r--r--src/mongo/db/s/get_database_version_command.cpp4
-rw-r--r--src/mongo/db/service_entry_point_common.cpp22
-rw-r--r--src/mongo/rpc/command_reply_builder.cpp15
-rw-r--r--src/mongo/rpc/command_reply_builder.h8
-rw-r--r--src/mongo/rpc/legacy_reply_builder.cpp25
-rw-r--r--src/mongo/rpc/legacy_reply_builder.h5
-rw-r--r--src/mongo/rpc/op_msg.cpp11
-rw-r--r--src/mongo/rpc/op_msg.h20
-rw-r--r--src/mongo/rpc/op_msg_integration_test.cpp32
-rw-r--r--src/mongo/rpc/op_msg_rpc_impls.h21
-rw-r--r--src/mongo/rpc/reply_builder_interface.h37
-rw-r--r--src/mongo/s/commands/cluster_explain_cmd.cpp2
-rw-r--r--src/mongo/s/commands/cluster_write_cmd.cpp2
-rw-r--r--src/mongo/s/commands/strategy.cpp32
23 files changed, 216 insertions, 149 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 527e2ce7af6..67d31359f66 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -594,6 +594,7 @@ env.CppUnitTest(
],
LIBDEPS=[
'$BUILD_DIR/mongo/idl/idl_parser',
+ '$BUILD_DIR/mongo/rpc/rpc',
"commands",
"auth/authmocks",
"repl/replmocks",
diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp
index 87f02341918..b020d29eee0 100644
--- a/src/mongo/db/commands.cpp
+++ b/src/mongo/db/commands.cpp
@@ -50,6 +50,9 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/server_parameters.h"
+#include "mongo/rpc/factory.h"
+#include "mongo/rpc/op_msg_rpc_impls.h"
+#include "mongo/rpc/protocol.h"
#include "mongo/rpc/write_concern_error_detail.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/invariant.h"
@@ -109,22 +112,21 @@ bool checkAuthorizationImplPreParse(OperationContext* opCtx,
BSONObj CommandHelpers::runCommandDirectly(OperationContext* opCtx, const OpMsgRequest& request) {
auto command = globalCommandRegistry()->findCommand(request.getCommandName());
invariant(command);
- BufBuilder bb;
- CommandReplyBuilder crb(BSONObjBuilder{bb});
+ rpc::OpMsgReplyBuilder replyBuilder;
try {
auto invocation = command->parse(opCtx, request);
- invocation->run(opCtx, &crb);
- auto body = crb.getBodyBuilder();
+ invocation->run(opCtx, &replyBuilder);
+ auto body = replyBuilder.getBodyBuilder();
CommandHelpers::extractOrAppendOk(body);
} catch (const StaleConfigException&) {
// These exceptions are intended to be handled at a higher level.
throw;
} catch (const DBException& ex) {
- auto body = crb.getBodyBuilder();
+ auto body = replyBuilder.getBodyBuilder();
body.resetToEmpty();
appendCommandStatusNoThrow(body, ex.toStatus());
}
- return BSONObj(bb.release());
+ return replyBuilder.releaseBody();
}
void CommandHelpers::auditLogAuthEvent(OperationContext* opCtx,
@@ -380,24 +382,6 @@ bool CommandHelpers::uassertShouldAttemptParse(OperationContext* opCtx,
constexpr StringData CommandHelpers::kHelpFieldName;
//////////////////////////////////////////////////////////////
-// CommandReplyBuilder
-
-CommandReplyBuilder::CommandReplyBuilder(BSONObjBuilder bodyObj)
- : _bodyBuf(&bodyObj.bb()), _bodyOffset(bodyObj.offset()) {
- // CommandReplyBuilder requires that bodyObj build into an externally-owned buffer.
- invariant(!bodyObj.owned());
- bodyObj.doneFast();
-}
-
-BSONObjBuilder CommandReplyBuilder::getBodyBuilder() const {
- return BSONObjBuilder(BSONObjBuilder::ResumeBuildingTag{}, *_bodyBuf, _bodyOffset);
-}
-
-void CommandReplyBuilder::reset() {
- getBodyBuilder().resetToEmpty();
-}
-
-//////////////////////////////////////////////////////////////
// CommandInvocation
CommandInvocation::~CommandInvocation() = default;
@@ -443,7 +427,7 @@ public:
_dbName(_request->getDatabase().toString()) {}
private:
- void run(OperationContext* opCtx, CommandReplyBuilder* result) override {
+ void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) override {
try {
BSONObjBuilder bob = result->getBodyBuilder();
bool ok = _command->run(opCtx, _dbName, _request->body, bob);
diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h
index 966a0d640c7..5907fb65990 100644
--- a/src/mongo/db/commands.h
+++ b/src/mongo/db/commands.h
@@ -414,52 +414,6 @@ private:
ServerStatusMetricField<Counter64> _commandsFailedMetric;
};
-class CommandReplyBuilder {
-public:
- explicit CommandReplyBuilder(BSONObjBuilder bodyObj);
-
- CommandReplyBuilder(const CommandReplyBuilder&) = delete;
- CommandReplyBuilder& operator=(const CommandReplyBuilder&) = delete;
-
- /**
- * Returns a BSONObjBuilder that can be used to build the reply in-place. The returned
- * builder (or an object into which it has been moved) must be completed before calling
- * any more methods on this object. A builder is completed by a call to `done()` or by
- * its destructor. Can be called repeatedly to append multiple things to the reply, as
- * long as each returned builder must be completed between calls.
- */
- BSONObjBuilder getBodyBuilder() const;
-
- void reset();
-
- /**
- * Appends a key:object field to this reply.
- */
- template <typename T>
- void append(StringData key, const T& object) {
- getBodyBuilder() << key << object;
- }
-
- /**
- * The specified 'object' must be BSON-serializable.
- *
- * BSONSerializable 'x' means 'x.serialize(bob)' appends a representation of 'x'
- * into 'BSONObjBuilder* bob'.
- */
- template <typename T>
- void fillFrom(const T& object) {
- static_assert(!isStatusOrStatusWith<std::decay_t<T>>,
- "Status and StatusWith<T> aren't supported by TypedCommand and fillFrom(). "
- "Use uassertStatusOK() instead.");
- auto bob = getBodyBuilder();
- object.serialize(&bob);
- }
-
-private:
- BufBuilder* const _bodyBuf;
- const std::size_t _bodyOffset;
-};
-
/**
* Represents a single invocation of a given command.
*/
@@ -476,7 +430,7 @@ public:
* indicated either by throwing (preferred), or by calling
* `CommandHelpers::extractOrAppendOk`.
*/
- virtual void run(OperationContext* opCtx, CommandReplyBuilder* result) = 0;
+ virtual void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) = 0;
virtual void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
@@ -808,14 +762,14 @@ private:
decltype(auto) _callTypedRun(OperationContext* opCtx) {
return static_cast<Invocation*>(this)->typedRun(opCtx);
}
- void _runImpl(std::true_type, OperationContext* opCtx, CommandReplyBuilder*) {
+ void _runImpl(std::true_type, OperationContext* opCtx, rpc::ReplyBuilderInterface*) {
_callTypedRun(opCtx);
}
- void _runImpl(std::false_type, OperationContext* opCtx, CommandReplyBuilder* reply) {
+ void _runImpl(std::false_type, OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) {
reply->fillFrom(_callTypedRun(opCtx));
}
- void run(OperationContext* opCtx, CommandReplyBuilder* reply) final {
+ void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) final {
using VoidResultTag = std::is_void<decltype(_callTypedRun(opCtx))>;
_runImpl(VoidResultTag{}, opCtx, reply);
}
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 7f16897f022..908214a6e2f 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -109,6 +109,7 @@ env.Library(
'$BUILD_DIR/mongo/db/mongohasher',
'$BUILD_DIR/mongo/db/server_options_core',
'$BUILD_DIR/mongo/logger/parse_log_component_settings',
+ '$BUILD_DIR/mongo/rpc/protocol',
],
)
diff --git a/src/mongo/db/commands/explain_cmd.cpp b/src/mongo/db/commands/explain_cmd.cpp
index 003047168db..46b0c6eac3f 100644
--- a/src/mongo/db/commands/explain_cmd.cpp
+++ b/src/mongo/db/commands/explain_cmd.cpp
@@ -92,7 +92,7 @@ public:
_innerRequest{std::move(innerRequest)},
_innerInvocation{std::move(innerInvocation)} {}
- void run(OperationContext* opCtx, CommandReplyBuilder* result) override {
+ void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) override {
uassert(50746,
"Explain's child command cannot run on this node. "
"Are you explaining a write command on a secondary?",
diff --git a/src/mongo/db/commands/generic.cpp b/src/mongo/db/commands/generic.cpp
index 277b22aba99..a572b94747c 100644
--- a/src/mongo/db/commands/generic.cpp
+++ b/src/mongo/db/commands/generic.cpp
@@ -108,8 +108,16 @@ public:
return NamespaceString(request().request.getDatabase());
}
- void run(OperationContext* opCtx, CommandReplyBuilder* result) override {
- result->append("echo", request().request.body);
+ void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) override {
+ auto sequences = request().request.sequences;
+ for (auto& docSeq : sequences) {
+ auto docBuilder = result->getDocSequenceBuilder(docSeq.name);
+ for (auto& bson : docSeq.objs) {
+ docBuilder.append(bson);
+ }
+ }
+
+ result->getBodyBuilder().append("echo", request().request.body);
}
};
diff --git a/src/mongo/db/commands/mr_test.cpp b/src/mongo/db/commands/mr_test.cpp
index f8c81c473a5..6a1f5d67302 100644
--- a/src/mongo/db/commands/mr_test.cpp
+++ b/src/mongo/db/commands/mr_test.cpp
@@ -47,6 +47,7 @@
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/rpc/factory.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/scripting/dbdirectclient_factory.h"
#include "mongo/scripting/engine.h"
@@ -425,10 +426,9 @@ Status MapReduceCommandTest::_runCommand(StringData mapCode, StringData reduceCo
ASSERT(command) << "Unable to look up mapReduce command";
auto request = OpMsgRequest::fromDBAndBody(inputNss.db(), _makeCmdObj(mapCode, reduceCode));
- BufBuilder bb;
- CommandReplyBuilder crb(BSONObjBuilder{bb});
- command->parse(_opCtx.get(), request)->run(_opCtx.get(), &crb);
- auto status = getStatusFromCommandResult(crb.getBodyBuilder().asTempObj());
+ auto replyBuilder = rpc::makeReplyBuilder(rpc::Protocol::kOpMsg);
+ command->parse(_opCtx.get(), request)->run(_opCtx.get(), replyBuilder.get());
+ auto status = getStatusFromCommandResult(replyBuilder->getBodyBuilder().asTempObj());
if (!status.isOK()) {
return status.withContext(str::stream() << "mapReduce command failed: " << request.body);
}
diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp
index edbd9a6d315..46ebb7d0e9c 100644
--- a/src/mongo/db/commands/write_commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands/write_commands.cpp
@@ -225,7 +225,7 @@ private:
// Customization point for 'run'.
virtual void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const = 0;
- void run(OperationContext* opCtx, CommandReplyBuilder* result) final {
+ void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) final {
try {
try {
_transactionChecks(opCtx);
diff --git a/src/mongo/db/commands_test.cpp b/src/mongo/db/commands_test.cpp
index bf03dd61a19..c92f72617c3 100644
--- a/src/mongo/db/commands_test.cpp
+++ b/src/mongo/db/commands_test.cpp
@@ -33,6 +33,8 @@
#include "mongo/db/commands_test_example_gen.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/service_context_test_fixture.h"
+#include "mongo/rpc/factory.h"
+#include "mongo/rpc/op_msg_rpc_impls.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -203,7 +205,7 @@ public:
/**
* Reply with an incremented 'request.i'.
*/
- void run(OperationContext* opCtx, CommandReplyBuilder* reply) override {
+ void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override {
commands_test_example::ExampleIncrementReply r;
r.setIPlusOne(request().getI() + 1);
reply->fillFrom(r);
@@ -345,17 +347,16 @@ protected:
ASSERT_EQ(invocation->ns(), ns);
const BSONObj reply = [&] {
- BufBuilder bb;
- CommandReplyBuilder crb{BSONObjBuilder{bb}};
+ rpc::OpMsgReplyBuilder replyBuilder;
try {
- invocation->run(opCtx.get(), &crb);
- auto bob = crb.getBodyBuilder();
+ invocation->run(opCtx.get(), &replyBuilder);
+ auto bob = replyBuilder.getBodyBuilder();
CommandHelpers::extractOrAppendOk(bob);
} catch (const DBException& e) {
- auto bob = crb.getBodyBuilder();
+ auto bob = replyBuilder.getBodyBuilder();
CommandHelpers::appendCommandStatusNoThrow(bob, e.toStatus());
}
- return BSONObj(bb.release());
+ return replyBuilder.releaseBody();
}();
postAssert(i, reply);
diff --git a/src/mongo/db/s/get_database_version_command.cpp b/src/mongo/db/s/get_database_version_command.cpp
index 8d7f3446db6..b6925c705f2 100644
--- a/src/mongo/db/s/get_database_version_command.cpp
+++ b/src/mongo/db/s/get_database_version_command.cpp
@@ -72,7 +72,7 @@ public:
ActionType::getDatabaseVersion));
}
- void run(OperationContext* opCtx, CommandReplyBuilder* result) override {
+ void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) override {
uassert(ErrorCodes::IllegalOperation,
str::stream() << definition()->getName() << " can only be run on shard servers",
serverGlobalParams.clusterRole == ClusterRole::ShardServer);
@@ -83,7 +83,7 @@ public:
versionObj = dbVersion->toBSON();
}
}
- result->append("dbVersion", versionObj);
+ result->getBodyBuilder().append("dbVersion", versionObj);
}
StringData _targetDb() const {
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 11a73d8b421..287be590179 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -460,7 +460,7 @@ void appendClusterAndOperationTime(OperationContext* opCtx,
void invokeInTransaction(OperationContext* opCtx,
CommandInvocation* invocation,
- CommandReplyBuilder* replyBuilder) {
+ rpc::ReplyBuilderInterface* replyBuilder) {
auto session = OperationContextSession::get(opCtx);
if (!session) {
// Run the command directly if we're not in a transaction.
@@ -495,7 +495,6 @@ bool runCommandImpl(OperationContext* opCtx,
const boost::optional<OperationSessionInfoFromClient>& sessionOptions) {
const Command* command = invocation->definition();
auto bytesToReserve = command->reserveBytesForReply();
-
// SERVER-22100: In Windows DEBUG builds, the CRT heap debugging overhead, in conjunction with the
// additional memory pressure introduced by reply buffer pre-allocation, causes the concurrency
// suite to run extremely slowly. As a workaround we do not pre-allocate in Windows DEBUG builds.
@@ -503,12 +502,11 @@ bool runCommandImpl(OperationContext* opCtx,
if (kDebugBuild)
bytesToReserve = 0;
#endif
-
- CommandReplyBuilder crb(replyBuilder->getInPlaceReplyBuilder(bytesToReserve));
+ replyBuilder->reserveBytes(bytesToReserve);
if (!invocation->supportsWriteConcern()) {
behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body);
- invokeInTransaction(opCtx, invocation, &crb);
+ invokeInTransaction(opCtx, invocation, replyBuilder);
} else {
auto wcResult = uassertStatusOK(extractWriteConcern(opCtx, request.body));
auto session = OperationContextSession::get(opCtx);
@@ -539,13 +537,13 @@ bool runCommandImpl(OperationContext* opCtx,
};
try {
- invokeInTransaction(opCtx, invocation, &crb);
+ invokeInTransaction(opCtx, invocation, replyBuilder);
} catch (const DBException&) {
waitForWriteConcern(*extraFieldsBuilder);
throw;
}
- waitForWriteConcern(crb.getBodyBuilder());
+ waitForWriteConcern(replyBuilder->getBodyBuilder());
// Nothing in run() should change the writeConcern.
dassert(SimpleBSONObjComparator::kInstance.evaluate(opCtx->getWriteConcern().toBSON() ==
@@ -555,20 +553,20 @@ bool runCommandImpl(OperationContext* opCtx,
behaviors.waitForLinearizableReadConcern(opCtx);
const bool ok = [&] {
- auto body = crb.getBodyBuilder();
+ auto body = replyBuilder->getBodyBuilder();
return CommandHelpers::extractOrAppendOk(body);
}();
- behaviors.attachCurOpErrInfo(opCtx, crb.getBodyBuilder().asTempObj());
+ behaviors.attachCurOpErrInfo(opCtx, replyBuilder->getBodyBuilder().asTempObj());
if (!ok) {
- auto response = crb.getBodyBuilder().asTempObj();
+ auto response = replyBuilder->getBodyBuilder().asTempObj();
auto codeField = response["code"];
if (codeField.isNumber()) {
auto code = ErrorCodes::Error(codeField.numberInt());
// Append the error labels for transient transaction errors.
auto errorLabels = getErrorLabels(sessionOptions, command->getName(), code);
- crb.getBodyBuilder().appendElements(errorLabels);
+ replyBuilder->getBodyBuilder().appendElements(errorLabels);
}
}
@@ -576,7 +574,7 @@ bool runCommandImpl(OperationContext* opCtx,
appendReplyMetadata(opCtx, request, &metadataBob);
{
- auto commandBodyBob = crb.getBodyBuilder();
+ auto commandBodyBob = replyBuilder->getBodyBuilder();
appendClusterAndOperationTime(opCtx, &commandBodyBob, &metadataBob, startOperationTime);
}
diff --git a/src/mongo/rpc/command_reply_builder.cpp b/src/mongo/rpc/command_reply_builder.cpp
index 534ad779f86..ba62df3e72e 100644
--- a/src/mongo/rpc/command_reply_builder.cpp
+++ b/src/mongo/rpc/command_reply_builder.cpp
@@ -53,14 +53,16 @@ CommandReplyBuilder& CommandReplyBuilder::setRawCommandReply(const BSONObj& comm
return *this;
}
-BSONObjBuilder CommandReplyBuilder::getInPlaceReplyBuilder(std::size_t reserveBytes) {
+BSONObjBuilder CommandReplyBuilder::getBodyBuilder() {
+ if (_state == State::kMetadata) {
+ invariant(_bodyOffset);
+ return BSONObjBuilder(BSONObjBuilder::ResumeBuildingTag{}, _builder, _bodyOffset);
+ }
invariant(_state == State::kCommandReply);
- // Eagerly allocate reserveBytes bytes.
- _builder.reserveBytes(reserveBytes);
- // Claim our reservation immediately so we can actually write data to it.
- _builder.claimReservedBytes(reserveBytes);
_state = State::kMetadata;
- return BSONObjBuilder(_builder);
+ auto bob = BSONObjBuilder(_builder);
+ _bodyOffset = bob.offset();
+ return bob;
}
CommandReplyBuilder& CommandReplyBuilder::setMetadata(const BSONObj& metadata) {
@@ -94,6 +96,7 @@ void CommandReplyBuilder::reset() {
_builder.skip(mongo::MsgData::MsgDataHeaderSize);
_message.reset();
_state = State::kCommandReply;
+ _bodyOffset = 0;
}
Message CommandReplyBuilder::done() {
diff --git a/src/mongo/rpc/command_reply_builder.h b/src/mongo/rpc/command_reply_builder.h
index cce27f90b01..d3d741d678f 100644
--- a/src/mongo/rpc/command_reply_builder.h
+++ b/src/mongo/rpc/command_reply_builder.h
@@ -57,7 +57,7 @@ public:
CommandReplyBuilder& setRawCommandReply(const BSONObj& commandReply) final;
- BSONObjBuilder getInPlaceReplyBuilder(std::size_t) final;
+ BSONObjBuilder getBodyBuilder() final;
CommandReplyBuilder& setMetadata(const BSONObj& metadata) final;
@@ -72,10 +72,16 @@ public:
*/
Message done() final;
+ void reserveBytes(const std::size_t bytes) final {
+ _builder.reserveBytes(bytes);
+ _builder.claimReservedBytes(bytes);
+ }
+
private:
enum class State { kMetadata, kCommandReply, kOutputDocs, kDone };
// Default values are all empty.
+ std::size_t _bodyOffset = 0;
BufBuilder _builder{};
Message _message;
State _state{State::kCommandReply};
diff --git a/src/mongo/rpc/legacy_reply_builder.cpp b/src/mongo/rpc/legacy_reply_builder.cpp
index 21e18cde76a..78663e9f75f 100644
--- a/src/mongo/rpc/legacy_reply_builder.cpp
+++ b/src/mongo/rpc/legacy_reply_builder.cpp
@@ -83,14 +83,17 @@ LegacyReplyBuilder& LegacyReplyBuilder::setRawCommandReply(const BSONObj& comman
return *this;
}
-BSONObjBuilder LegacyReplyBuilder::getInPlaceReplyBuilder(std::size_t reserveBytes) {
- invariant(_state == State::kCommandReply);
- // Eagerly allocate reserveBytes bytes.
- _builder.reserveBytes(reserveBytes);
- // Claim our reservation immediately so we can actually write data to it.
- _builder.claimReservedBytes(reserveBytes);
- _state = State::kMetadata;
- return BSONObjBuilder(_builder);
+BSONObjBuilder LegacyReplyBuilder::getBodyBuilder() {
+ if (_state == State::kCommandReply) {
+ auto bob = BSONObjBuilder(_builder);
+ _bodyOffset = bob.offset();
+ _state = State::kMetadata;
+ return bob;
+ }
+
+ invariant(_state == State::kMetadata);
+ invariant(_bodyOffset);
+ return BSONObjBuilder(BSONObjBuilder::ResumeBuildingTag{}, _builder, _bodyOffset);
}
LegacyReplyBuilder& LegacyReplyBuilder::setMetadata(const BSONObj& metadata) {
@@ -105,6 +108,11 @@ Protocol LegacyReplyBuilder::getProtocol() const {
return rpc::Protocol::kOpQuery;
}
+void LegacyReplyBuilder::reserveBytes(const std::size_t bytes) {
+ _builder.reserveBytes(bytes);
+ _builder.claimReservedBytes(bytes);
+}
+
void LegacyReplyBuilder::reset() {
// If we are in State::kMetadata, we are already in the 'start' state, so by
// immediately returning, we save a heap allocation.
@@ -116,6 +124,7 @@ void LegacyReplyBuilder::reset() {
_message.reset();
_state = State::kCommandReply;
_staleConfigError = false;
+ _bodyOffset = 0;
}
diff --git a/src/mongo/rpc/legacy_reply_builder.h b/src/mongo/rpc/legacy_reply_builder.h
index 22f40092df6..4f3128bb2be 100644
--- a/src/mongo/rpc/legacy_reply_builder.h
+++ b/src/mongo/rpc/legacy_reply_builder.h
@@ -52,7 +52,7 @@ public:
LegacyReplyBuilder& setCommandReply(Status nonOKStatus, BSONObj extraErrorInfo) final;
LegacyReplyBuilder& setRawCommandReply(const BSONObj& commandReply) final;
- BSONObjBuilder getInPlaceReplyBuilder(std::size_t) final;
+ BSONObjBuilder getBodyBuilder() final;
LegacyReplyBuilder& setMetadata(const BSONObj& metadata) final;
@@ -62,10 +62,13 @@ public:
Protocol getProtocol() const final;
+ void reserveBytes(const std::size_t bytes) final;
+
private:
enum class State { kMetadata, kCommandReply, kOutputDocs, kDone };
BufBuilder _builder{};
+ std::size_t _bodyOffset = 0;
Message _message;
State _state{State::kCommandReply};
// For stale config errors we need to set the correct ResultFlag.
diff --git a/src/mongo/rpc/op_msg.cpp b/src/mongo/rpc/op_msg.cpp
index f886027e192..9199693b0e0 100644
--- a/src/mongo/rpc/op_msg.cpp
+++ b/src/mongo/rpc/op_msg.cpp
@@ -247,4 +247,15 @@ Message OpMsgBuilder::finish() {
return Message(_buf.release());
}
+BSONObj OpMsgBuilder::releaseBody() {
+ invariant(_state == kBody);
+ invariant(_bodyStart);
+ invariant(_bodyStart == sizeof(MSGHEADER::Layout) + 4 /*flags*/ + 1 /*body kind byte*/);
+ invariant(!_openBuilder);
+ _state = kDone;
+
+ auto bson = BSONObj(_buf.buf() + _bodyStart);
+ return bson.shareOwnershipWith(_buf.release());
+}
+
} // namespace mongo
diff --git a/src/mongo/rpc/op_msg.h b/src/mongo/rpc/op_msg.h
index ed7f1993d86..dbe6a1506ef 100644
--- a/src/mongo/rpc/op_msg.h
+++ b/src/mongo/rpc/op_msg.h
@@ -220,6 +220,26 @@ public:
*/
static AtomicBool disableDupeFieldCheck_forTest;
+ /**
+ * Similar to finish, any calls on this object after are illegal.
+ */
+ BSONObj releaseBody();
+
+ /**
+ * Returns whether or not this builder is already building a body.
+ */
+ bool isBuildingBody() {
+ return _state == kBody;
+ }
+
+ /**
+ * Reserves and claims the bytes requested in the internal BufBuilder.
+ */
+ void reserveBytes(const std::size_t bytes) {
+ _buf.reserveBytes(bytes);
+ _buf.claimReservedBytes(bytes);
+ }
+
private:
friend class DocSequenceBuilder;
diff --git a/src/mongo/rpc/op_msg_integration_test.cpp b/src/mongo/rpc/op_msg_integration_test.cpp
index ab69b57b206..e0bdb01bef6 100644
--- a/src/mongo/rpc/op_msg_integration_test.cpp
+++ b/src/mongo/rpc/op_msg_integration_test.cpp
@@ -165,4 +165,36 @@ TEST(OpMsg, CloseConnectionOnFireAndForgetNotMasterError) {
ASSERT(foundSecondary);
}
+TEST(OpMsg, DocumentSequenceReturnsWork) {
+ std::string errMsg;
+ auto conn = std::unique_ptr<DBClientBase>(
+ unittest::getFixtureConnectionString().connect("integration_test", errMsg));
+ uassert(ErrorCodes::SocketException, errMsg, conn);
+
+ auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", BSON("echo" << 1));
+ opMsgRequest.sequences.push_back({"example", {BSON("a" << 1), BSON("b" << 2)}});
+ auto request = opMsgRequest.serialize();
+
+ Message reply;
+ ASSERT(conn->call(request, reply));
+
+ auto opMsgReply = OpMsg::parse(reply);
+ ASSERT_EQ(opMsgReply.sequences.size(), 1u);
+
+ auto sequence = opMsgReply.getSequence("example");
+ ASSERT(sequence);
+ ASSERT_EQ(sequence->objs.size(), 2u);
+
+ auto checkSequence = [](auto& bson, auto key, auto val) {
+ ASSERT(bson.hasField(key));
+ ASSERT_EQ(bson[key].Int(), val);
+ };
+ checkSequence(sequence->objs[0], "a", 1);
+ checkSequence(sequence->objs[1], "b", 2);
+
+ ASSERT_BSONOBJ_EQ(opMsgReply.body.getObjectField("echo"),
+ BSON("echo" << 1 << "$db"
+ << "admin"));
+}
+
} // namespace mongo
diff --git a/src/mongo/rpc/op_msg_rpc_impls.h b/src/mongo/rpc/op_msg_rpc_impls.h
index 4725456630f..b16a9a7c0cd 100644
--- a/src/mongo/rpc/op_msg_rpc_impls.h
+++ b/src/mongo/rpc/op_msg_rpc_impls.h
@@ -59,13 +59,14 @@ public:
_builder.beginBody().appendElements(reply);
return *this;
}
- BSONObjBuilder getInPlaceReplyBuilder(std::size_t reserveBytes) override {
- BSONObjBuilder bob = _builder.beginBody();
- // Eagerly reserve space and claim our reservation immediately so we can actually write data
- // to it.
- bob.bb().reserveBytes(reserveBytes);
- bob.bb().claimReservedBytes(reserveBytes);
- return bob;
+ BSONObjBuilder getBodyBuilder() override {
+ if (!_builder.isBuildingBody()) {
+ return _builder.beginBody();
+ }
+ return _builder.resumeBody();
+ }
+ OpMsgBuilder::DocSequenceBuilder getDocSequenceBuilder(StringData name) override {
+ return _builder.beginDocSequence(name);
}
ReplyBuilderInterface& setMetadata(const BSONObj& metadata) override {
_builder.resumeBody().appendElements(metadata);
@@ -80,6 +81,12 @@ public:
Message done() override {
return _builder.finish();
}
+ void reserveBytes(const std::size_t bytes) override {
+ _builder.reserveBytes(bytes);
+ }
+ BSONObj releaseBody() {
+ return _builder.releaseBody();
+ }
private:
OpMsgBuilder _builder;
diff --git a/src/mongo/rpc/reply_builder_interface.h b/src/mongo/rpc/reply_builder_interface.h
index a157f41167e..70524fc3a64 100644
--- a/src/mongo/rpc/reply_builder_interface.h
+++ b/src/mongo/rpc/reply_builder_interface.h
@@ -33,6 +33,7 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/base/status.h"
#include "mongo/bson/util/builder.h"
+#include "mongo/rpc/op_msg.h"
#include "mongo/rpc/protocol.h"
namespace mongo {
@@ -58,9 +59,21 @@ public:
virtual ReplyBuilderInterface& setRawCommandReply(const BSONObj& reply) = 0;
/**
- * Returns a BSONObjBuilder for building a command reply in place.
+ * Returns a BSONObjBuilder that can be used to build the reply in-place. The returned
+ * builder (or an object into which it has been moved) must be completed before calling
+ * any more methods on this object. A builder is completed by a call to `done()` or by
+ * its destructor. Can be called repeatedly to append multiple things to the reply, as
+ * long as each returned builder is completed between calls.
*/
- virtual BSONObjBuilder getInPlaceReplyBuilder(std::size_t reserveBytes) = 0;
+ virtual BSONObjBuilder getBodyBuilder() = 0;
+
+ /**
+ * Returns a DocSeqBuilder for building a command reply in place. This should only be called
+ * before the body as the body will have status types appended at the end.
+ */
+ virtual OpMsgBuilder::DocSequenceBuilder getDocSequenceBuilder(StringData name) {
+ uasserted(99980, "Only OpMsg may use document sequences");
+ }
virtual ReplyBuilderInterface& setMetadata(const BSONObj& metadata) = 0;
@@ -104,6 +117,26 @@ public:
*/
virtual Message done() = 0;
+ /**
+ * The specified 'object' must be BSON-serializable.
+ *
+ * BSONSerializable 'x' means 'x.serialize(bob)' appends a representation of 'x'
+ * into 'BSONObjBuilder* bob'.
+ */
+ template <typename T>
+ void fillFrom(const T& object) {
+ static_assert(!isStatusOrStatusWith<std::decay_t<T>>,
+ "Status and StatusWith<T> aren't supported by TypedCommand and fillFrom(). "
+ "Use uassertStatusOK() instead.");
+ auto bob = getBodyBuilder();
+ object.serialize(&bob);
+ }
+
+ /**
+ * Reserves and claims bytes for the Message generated by this interface.
+ */
+ virtual void reserveBytes(const std::size_t bytes) = 0;
+
protected:
ReplyBuilderInterface() = default;
};
diff --git a/src/mongo/s/commands/cluster_explain_cmd.cpp b/src/mongo/s/commands/cluster_explain_cmd.cpp
index ff7fc38f7be..e007b7bab8c 100644
--- a/src/mongo/s/commands/cluster_explain_cmd.cpp
+++ b/src/mongo/s/commands/cluster_explain_cmd.cpp
@@ -95,7 +95,7 @@ public:
_innerInvocation{std::move(innerInvocation)} {}
private:
- void run(OperationContext* opCtx, CommandReplyBuilder* result) override {
+ void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) override {
try {
auto bob = result->getBodyBuilder();
_innerInvocation->explain(opCtx, _verbosity, &bob);
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index d97f24b34bb..412f64c9885 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -302,7 +302,7 @@ private:
return response.getOk();
}
- void run(OperationContext* opCtx, CommandReplyBuilder* result) override {
+ void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) override {
try {
BSONObjBuilder bob = result->getBodyBuilder();
bool ok = runImpl(opCtx, *_request, _batchedRequest, bob);
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 5b82fe3a055..ddd4ce23f7e 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -62,6 +62,7 @@
#include "mongo/rpc/metadata/logical_time_metadata.h"
#include "mongo/rpc/metadata/tracking_metadata.h"
#include "mongo/rpc/op_msg.h"
+#include "mongo/rpc/op_msg_rpc_impls.h"
#include "mongo/s/cannot_implicitly_create_collection_info.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/parallel.h"
@@ -151,7 +152,7 @@ void appendRequiredFieldsToResponse(OperationContext* opCtx, BSONObjBuilder* res
void execCommandClient(OperationContext* opCtx,
CommandInvocation* invocation,
const OpMsgRequest& request,
- CommandReplyBuilder* result) {
+ rpc::ReplyBuilderInterface* result) {
const Command* c = invocation->definition();
ON_BLOCK_EXIT([opCtx, &result] {
auto body = result->getBodyBuilder();
@@ -289,10 +290,11 @@ MONGO_FAIL_POINT_DEFINE(doNotRefreshShardsOnRetargettingError);
void runCommand(OperationContext* opCtx,
const OpMsgRequest& request,
const NetworkOp opType,
- BSONObjBuilder&& builder) {
+ rpc::ReplyBuilderInterface* replyBuilder) {
auto const commandName = request.getCommandName();
auto const command = CommandHelpers::findCommand(commandName);
if (!command) {
+ auto builder = replyBuilder->getBodyBuilder();
ON_BLOCK_EXIT([opCtx, &builder] { appendRequiredFieldsToResponse(opCtx, &builder); });
CommandHelpers::appendCommandStatusNoThrow(
builder,
@@ -355,12 +357,11 @@ void runCommand(OperationContext* opCtx,
auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
auto readConcernParseStatus = readConcernArgs.initialize(request.body);
if (!readConcernParseStatus.isOK()) {
+ auto builder = replyBuilder->getBodyBuilder();
CommandHelpers::appendCommandStatusNoThrow(builder, readConcernParseStatus);
return;
}
- CommandReplyBuilder crb(std::move(builder));
-
try {
for (int tries = 0;; ++tries) {
// Try kMaxNumStaleVersionRetries times. On the last try, exceptions are rethrown.
@@ -374,9 +375,9 @@ void runCommand(OperationContext* opCtx,
"unexpected change of namespace when retrying");
}
- crb.reset();
+ replyBuilder->reset();
try {
- execCommandClient(opCtx, invocation.get(), request, &crb);
+ execCommandClient(opCtx, invocation.get(), request, replyBuilder);
return;
} catch (const ExceptionForCat<ErrorCategory::NeedRetargettingError>& ex) {
const auto staleNs = [&] {
@@ -421,12 +422,8 @@ void runCommand(OperationContext* opCtx,
}
} catch (const DBException& e) {
command->incrementCommandsFailed();
- CurOp::get(opCtx)->debug().errInfo = e.toStatus();
LastError::get(opCtx->getClient()).setLastError(e.code(), e.reason());
- crb.reset();
- BSONObjBuilder bob = crb.getBodyBuilder();
- CommandHelpers::appendCommandStatusNoThrow(bob, e.toStatus());
- appendRequiredFieldsToResponse(opCtx, &bob);
+ throw;
}
}
@@ -558,7 +555,7 @@ DbResponse Strategy::clientCommand(OperationContext* opCtx, const Message& m) {
std::string db = request.getDatabase().toString();
try {
LOG(3) << "Command begin db: " << db << " msg id: " << m.header().getId();
- runCommand(opCtx, request, m.operation(), reply->getInPlaceReplyBuilder(0));
+ runCommand(opCtx, request, m.operation(), reply.get());
LOG(3) << "Command end db: " << db << " msg id: " << m.header().getId();
} catch (const DBException& ex) {
LOG(1) << "Exception thrown while processing command on " << db
@@ -573,7 +570,7 @@ DbResponse Strategy::clientCommand(OperationContext* opCtx, const Message& m) {
throw;
}
reply->reset();
- auto bob = reply->getInPlaceReplyBuilder(0);
+ auto bob = reply->getBodyBuilder();
CommandHelpers::appendCommandStatusNoThrow(bob, ex.toStatus());
appendRequiredFieldsToResponse(opCtx, &bob);
}
@@ -719,11 +716,10 @@ void Strategy::killCursors(OperationContext* opCtx, DbMessage* dbm) {
}
void Strategy::writeOp(OperationContext* opCtx, DbMessage* dbm) {
- BufBuilder bb;
+ const auto& msg = dbm->msg();
+ rpc::OpMsgReplyBuilder reply;
runCommand(opCtx,
[&]() {
- const auto& msg = dbm->msg();
-
switch (msg.operation()) {
case dbInsert: {
return InsertOp::parseLegacy(msg).serialize({});
@@ -738,8 +734,8 @@ void Strategy::writeOp(OperationContext* opCtx, DbMessage* dbm) {
MONGO_UNREACHABLE;
}
}(),
- dbm->msg().operation(),
- BSONObjBuilder{bb}); // built object is ignored
+ msg.operation(),
+ &reply); // built object is ignored
}
void Strategy::explainFind(OperationContext* opCtx,