summaryrefslogtreecommitdiff
path: root/src/mongo/rpc/legacy_reply_builder.cpp
diff options
context:
space:
mode:
authorAdam Midvidy <amidvidy@gmail.com>2015-11-12 00:37:49 -0500
committerAdam Midvidy <amidvidy@gmail.com>2015-11-13 10:50:32 -0500
commit289d3fdf5c9d5b232b383c6cd871de1b75cf76b4 (patch)
tree58bcb98e92fb61e805674afbfb6db775959889dd /src/mongo/rpc/legacy_reply_builder.cpp
parentd7511a9244ecab9ea894bb4e8767a289314a1e34 (diff)
downloadmongo-289d3fdf5c9d5b232b383c6cd871de1b75cf76b4.tar.gz
SERVER-20884 build command replies in-place to avoid copies
Diffstat (limited to 'src/mongo/rpc/legacy_reply_builder.cpp')
-rw-r--r--src/mongo/rpc/legacy_reply_builder.cpp179
1 files changed, 35 insertions, 144 deletions
diff --git a/src/mongo/rpc/legacy_reply_builder.cpp b/src/mongo/rpc/legacy_reply_builder.cpp
index e4830c10f2a..efa50d8f82c 100644
--- a/src/mongo/rpc/legacy_reply_builder.cpp
+++ b/src/mongo/rpc/legacy_reply_builder.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/dbmessage.h"
#include "mongo/db/jsobj.h"
#include "mongo/rpc/metadata.h"
+#include "mongo/rpc/metadata/sharding_metadata.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/mongoutils/str.h"
@@ -42,111 +43,54 @@
namespace mongo {
namespace rpc {
-namespace {
-// Margin of error for availableBytes size estimate
-std::size_t kReservedSize = 1024;
-
-bool isEmptyCommandReply(const BSONObj& bson) {
- auto cursorElem = bson[LegacyReplyBuilder::kCursorTag];
- if (cursorElem.eoo())
- return true;
-
- BSONObj cursorObj = cursorElem.Obj();
- auto firstBatchElem = cursorObj[LegacyReplyBuilder::kFirstBatchTag];
- if (firstBatchElem.eoo())
- return true;
-
- BSONObjIterator it(firstBatchElem.Obj());
-
- return !it.more();
-}
-} // namespace
-
-const char LegacyReplyBuilder::kCursorTag[] = "cursor";
-const char LegacyReplyBuilder::kFirstBatchTag[] = "firstBatch";
-
LegacyReplyBuilder::LegacyReplyBuilder() : LegacyReplyBuilder(Message()) {}
-LegacyReplyBuilder::LegacyReplyBuilder(Message&& message)
- : _currentLength{kReservedSize}, _message{std::move(message)} {}
+LegacyReplyBuilder::LegacyReplyBuilder(Message&& message) : _message{std::move(message)} {
+ _builder.skip(sizeof(QueryResult::Value));
+}
LegacyReplyBuilder::~LegacyReplyBuilder() {}
-LegacyReplyBuilder& LegacyReplyBuilder::setMetadata(const BSONObj& metadata) {
- invariant(_state == State::kMetadata);
-
- auto dataSize = static_cast<std::size_t>(metadata.objsize());
-
- _currentLength += dataSize;
- _metadata = metadata.getOwned();
- _state = State::kCommandReply;
+LegacyReplyBuilder& LegacyReplyBuilder::setRawCommandReply(const BSONObj& commandReply) {
+ invariant(_state == State::kCommandReply);
+ commandReply.appendSelfToBufBuilder(_builder);
+ _state = State::kMetadata;
return *this;
}
-LegacyReplyBuilder& LegacyReplyBuilder::setRawCommandReply(const BSONObj& commandReply) {
+BufBuilder& LegacyReplyBuilder::getInPlaceReplyBuilder() {
invariant(_state == State::kCommandReply);
+ _state = State::kMetadata;
+ return _builder;
+}
- auto dataSize = static_cast<std::size_t>(commandReply.objsize());
-
- _currentLength += dataSize;
- _commandReply = commandReply.getOwned();
- _allowAddingOutputDocs = isEmptyCommandReply(_commandReply);
-
+LegacyReplyBuilder& LegacyReplyBuilder::setMetadata(const BSONObj& metadata) {
+ invariant(_state == State::kMetadata);
+ // HACK: the only thing we need to downconvert is ShardingMetadata, which can go at the end of
+ // the object. So we do that in place to avoid copying the command reply.
+ auto shardingMetadata = rpc::ShardingMetadata::readFromMetadata(metadata);
+ invariant(shardingMetadata.isOK() || shardingMetadata.getStatus() == ErrorCodes::NoSuchKey);
+
+ if (shardingMetadata.isOK()) {
+ // Write the sharding metadata in to the end of the object. The third parameter is needed
+ // because we already have skipped some bytes for the message header.
+ BSONObjBuilder resumedBuilder(
+ BSONObjBuilder::ResumeBuildingTag(), _builder, sizeof(QueryResult::Value));
+ shardingMetadata.getValue().writeToMetadata(&resumedBuilder);
+ }
_state = State::kOutputDocs;
return *this;
}
Status LegacyReplyBuilder::addOutputDocs(DocumentRange docs) {
invariant(_state == State::kOutputDocs);
- invariant(_allowAddingOutputDocs);
-
- auto dataSize = docs.data().length();
-
- auto hasSpace = _hasSpaceFor(dataSize);
- if (!hasSpace.isOK()) {
- return hasSpace;
- }
-
- // The temporary obj is used to address the case when where is not enough space.
- // BSONArray overhead can not be estimated upfront.
- std::vector<BSONObj> docsTmp{};
- std::size_t lenTmp = 0;
- std::size_t tmpIndex(_currentIndex);
- for (auto&& it : docs) {
- docsTmp.emplace_back(it.getOwned());
- lenTmp += BSONObjBuilder::numStr(++tmpIndex).length() + 2; // space for storing array index
- }
-
- hasSpace = _hasSpaceFor(dataSize + lenTmp);
- if (!hasSpace.isOK()) {
- return hasSpace;
- }
-
- // vector::insert instead of swap allows to call addOutputDoc(s) multiple times
- _outputDocs.insert(_outputDocs.end(),
- std::make_move_iterator(docsTmp.begin()),
- std::make_move_iterator(docsTmp.end()));
-
- _currentIndex = tmpIndex;
- _currentLength += lenTmp;
- _currentLength += dataSize;
+ // no op
return Status::OK();
}
Status LegacyReplyBuilder::addOutputDoc(const BSONObj& bson) {
invariant(_state == State::kOutputDocs);
- invariant(_allowAddingOutputDocs);
-
- auto dataSize = static_cast<std::size_t>(bson.objsize());
- auto hasSpace = _hasSpaceFor(dataSize);
- if (!hasSpace.isOK()) {
- return hasSpace;
- }
-
- _outputDocs.emplace_back(bson.getOwned());
- _currentLength += dataSize;
- _currentLength += BSONObjBuilder::numStr(++_currentIndex).length() + 2; // storing array index
-
+ // no op
return Status::OK();
}
@@ -161,87 +105,34 @@ Protocol LegacyReplyBuilder::getProtocol() const {
void LegacyReplyBuilder::reset() {
// If we are in State::kMetadata, we are already in the 'start' state, so by
// immediately returning, we save a heap allocation.
- if (_state == State::kMetadata) {
+ if (_state == State::kCommandReply) {
return;
}
+ _builder.reset();
+ _builder.skip(sizeof(QueryResult::Value));
_message.reset();
- _currentLength = kReservedSize;
- _currentIndex = 0U;
- _state = State::kMetadata;
+ _state = State::kCommandReply;
}
Message LegacyReplyBuilder::done() {
invariant(_state == State::kOutputDocs);
- BSONObj reply = uassertStatusOK(rpc::downconvertReplyMetadata(_commandReply, _metadata));
-
- BufBuilder bufBuilder;
- bufBuilder.skip(sizeof(QueryResult::Value));
-
- if (_allowAddingOutputDocs) {
- BSONObjBuilder topBuilder(bufBuilder);
- for (const auto& el : reply) {
- if (kCursorTag != el.fieldNameStringData()) {
- topBuilder.append(el);
- continue;
- }
- invariant(el.isABSONObj());
- BSONObjBuilder curBuilder(topBuilder.subobjStart(kCursorTag));
- for (const auto& insideEl : el.Obj()) {
- if (kFirstBatchTag != insideEl.fieldNameStringData()) {
- curBuilder.append(insideEl);
- continue;
- }
- invariant(insideEl.isABSONObj());
- BSONArrayBuilder arrBuilder(curBuilder.subarrayStart(kFirstBatchTag));
- for (const auto& doc : _outputDocs) {
- arrBuilder.append(doc);
- }
- arrBuilder.doneFast();
- }
- curBuilder.doneFast();
- }
- topBuilder.doneFast();
- } else {
- reply.appendSelfToBufBuilder(bufBuilder);
- }
-
- auto msgHeaderSz = static_cast<std::size_t>(MsgData::MsgDataHeaderSize);
-
- invariant(static_cast<std::size_t>(bufBuilder.len()) + msgHeaderSz <=
- mongo::MaxMessageSizeBytes);
-
- QueryResult::View qr = bufBuilder.buf();
+ QueryResult::View qr = _builder.buf();
qr.setResultFlagsToOk();
- qr.msgdata().setLen(bufBuilder.len());
+ qr.msgdata().setLen(_builder.len());
qr.msgdata().setOperation(opReply);
qr.setCursorId(0);
qr.setStartingFrom(0);
qr.setNReturned(1);
_message.setData(qr.view2ptr(), true);
- bufBuilder.decouple();
+ _builder.decouple();
_state = State::kDone;
return std::move(_message);
}
-std::size_t LegacyReplyBuilder::availableBytes() const {
- std::size_t msgHeaderSz = static_cast<std::size_t>(MsgData::MsgDataHeaderSize);
- return mongo::MaxMessageSizeBytes - _currentLength - msgHeaderSz;
-}
-
-Status LegacyReplyBuilder::_hasSpaceFor(std::size_t dataSize) const {
- size_t availBytes = availableBytes();
- if (availBytes < dataSize) {
- return Status(ErrorCodes::Overflow,
- str::stream() << "Not enough space to store " << dataSize << " bytes. Only "
- << availBytes << " bytes are available.");
- }
- return Status::OK();
-}
-
} // namespace rpc
} // namespace mongo