diff options
author | Adam Midvidy <amidvidy@gmail.com> | 2015-11-12 00:37:49 -0500 |
---|---|---|
committer | Adam Midvidy <amidvidy@gmail.com> | 2015-11-13 10:50:32 -0500 |
commit | 289d3fdf5c9d5b232b383c6cd871de1b75cf76b4 (patch) | |
tree | 58bcb98e92fb61e805674afbfb6db775959889dd /src/mongo/rpc/legacy_reply_builder.cpp | |
parent | d7511a9244ecab9ea894bb4e8767a289314a1e34 (diff) | |
download | mongo-289d3fdf5c9d5b232b383c6cd871de1b75cf76b4.tar.gz |
SERVER-20884 build command replies in-place to avoid copies
Diffstat (limited to 'src/mongo/rpc/legacy_reply_builder.cpp')
-rw-r--r-- | src/mongo/rpc/legacy_reply_builder.cpp | 179 |
1 files changed, 35 insertions, 144 deletions
diff --git a/src/mongo/rpc/legacy_reply_builder.cpp b/src/mongo/rpc/legacy_reply_builder.cpp index e4830c10f2a..efa50d8f82c 100644 --- a/src/mongo/rpc/legacy_reply_builder.cpp +++ b/src/mongo/rpc/legacy_reply_builder.cpp @@ -35,6 +35,7 @@ #include "mongo/db/dbmessage.h" #include "mongo/db/jsobj.h" #include "mongo/rpc/metadata.h" +#include "mongo/rpc/metadata/sharding_metadata.h" #include "mongo/stdx/memory.h" #include "mongo/util/assert_util.h" #include "mongo/util/mongoutils/str.h" @@ -42,111 +43,54 @@ namespace mongo { namespace rpc { -namespace { -// Margin of error for availableBytes size estimate -std::size_t kReservedSize = 1024; - -bool isEmptyCommandReply(const BSONObj& bson) { - auto cursorElem = bson[LegacyReplyBuilder::kCursorTag]; - if (cursorElem.eoo()) - return true; - - BSONObj cursorObj = cursorElem.Obj(); - auto firstBatchElem = cursorObj[LegacyReplyBuilder::kFirstBatchTag]; - if (firstBatchElem.eoo()) - return true; - - BSONObjIterator it(firstBatchElem.Obj()); - - return !it.more(); -} -} // namespace - -const char LegacyReplyBuilder::kCursorTag[] = "cursor"; -const char LegacyReplyBuilder::kFirstBatchTag[] = "firstBatch"; - LegacyReplyBuilder::LegacyReplyBuilder() : LegacyReplyBuilder(Message()) {} -LegacyReplyBuilder::LegacyReplyBuilder(Message&& message) - : _currentLength{kReservedSize}, _message{std::move(message)} {} +LegacyReplyBuilder::LegacyReplyBuilder(Message&& message) : _message{std::move(message)} { + _builder.skip(sizeof(QueryResult::Value)); +} LegacyReplyBuilder::~LegacyReplyBuilder() {} -LegacyReplyBuilder& LegacyReplyBuilder::setMetadata(const BSONObj& metadata) { - invariant(_state == State::kMetadata); - - auto dataSize = static_cast<std::size_t>(metadata.objsize()); - - _currentLength += dataSize; - _metadata = metadata.getOwned(); - _state = State::kCommandReply; +LegacyReplyBuilder& LegacyReplyBuilder::setRawCommandReply(const BSONObj& commandReply) { + invariant(_state == State::kCommandReply); + commandReply.appendSelfToBufBuilder(_builder); + _state = State::kMetadata; return *this; } -LegacyReplyBuilder& LegacyReplyBuilder::setRawCommandReply(const BSONObj& commandReply) { +BufBuilder& LegacyReplyBuilder::getInPlaceReplyBuilder() { invariant(_state == State::kCommandReply); + _state = State::kMetadata; + return _builder; +} - auto dataSize = static_cast<std::size_t>(commandReply.objsize()); - - _currentLength += dataSize; - _commandReply = commandReply.getOwned(); - _allowAddingOutputDocs = isEmptyCommandReply(_commandReply); - +LegacyReplyBuilder& LegacyReplyBuilder::setMetadata(const BSONObj& metadata) { + invariant(_state == State::kMetadata); + // HACK: the only thing we need to downconvert is ShardingMetadata, which can go at the end of + // the object. So we do that in place to avoid copying the command reply. + auto shardingMetadata = rpc::ShardingMetadata::readFromMetadata(metadata); + invariant(shardingMetadata.isOK() || shardingMetadata.getStatus() == ErrorCodes::NoSuchKey); + + if (shardingMetadata.isOK()) { + // Write the sharding metadata in to the end of the object. The third parameter is needed + // because we already have skipped some bytes for the message header. + BSONObjBuilder resumedBuilder( + BSONObjBuilder::ResumeBuildingTag(), _builder, sizeof(QueryResult::Value)); + shardingMetadata.getValue().writeToMetadata(&resumedBuilder); + } _state = State::kOutputDocs; return *this; } Status LegacyReplyBuilder::addOutputDocs(DocumentRange docs) { invariant(_state == State::kOutputDocs); - invariant(_allowAddingOutputDocs); - - auto dataSize = docs.data().length(); - - auto hasSpace = _hasSpaceFor(dataSize); - if (!hasSpace.isOK()) { - return hasSpace; - } - - // The temporary obj is used to address the case when where is not enough space. - // BSONArray overhead can not be estimated upfront. - std::vector<BSONObj> docsTmp{}; - std::size_t lenTmp = 0; - std::size_t tmpIndex(_currentIndex); - for (auto&& it : docs) { - docsTmp.emplace_back(it.getOwned()); - lenTmp += BSONObjBuilder::numStr(++tmpIndex).length() + 2; // space for storing array index - } - - hasSpace = _hasSpaceFor(dataSize + lenTmp); - if (!hasSpace.isOK()) { - return hasSpace; - } - - // vector::insert instead of swap allows to call addOutputDoc(s) multiple times - _outputDocs.insert(_outputDocs.end(), - std::make_move_iterator(docsTmp.begin()), - std::make_move_iterator(docsTmp.end())); - - _currentIndex = tmpIndex; - _currentLength += lenTmp; - _currentLength += dataSize; + // no op return Status::OK(); } Status LegacyReplyBuilder::addOutputDoc(const BSONObj& bson) { invariant(_state == State::kOutputDocs); - invariant(_allowAddingOutputDocs); - - auto dataSize = static_cast<std::size_t>(bson.objsize()); - auto hasSpace = _hasSpaceFor(dataSize); - if (!hasSpace.isOK()) { - return hasSpace; - } - - _outputDocs.emplace_back(bson.getOwned()); - _currentLength += dataSize; - _currentLength += BSONObjBuilder::numStr(++_currentIndex).length() + 2; // storing array index - + // no op return Status::OK(); } @@ -161,87 +105,34 @@ Protocol LegacyReplyBuilder::getProtocol() const { void LegacyReplyBuilder::reset() { // If we are in State::kMetadata, we are already in the 'start' state, so by // immediately returning, we save a heap allocation. - if (_state == State::kMetadata) { + if (_state == State::kCommandReply) { return; } + _builder.reset(); + _builder.skip(sizeof(QueryResult::Value)); _message.reset(); - _currentLength = kReservedSize; - _currentIndex = 0U; - _state = State::kMetadata; + _state = State::kCommandReply; } Message LegacyReplyBuilder::done() { invariant(_state == State::kOutputDocs); - BSONObj reply = uassertStatusOK(rpc::downconvertReplyMetadata(_commandReply, _metadata)); - - BufBuilder bufBuilder; - bufBuilder.skip(sizeof(QueryResult::Value)); - - if (_allowAddingOutputDocs) { - BSONObjBuilder topBuilder(bufBuilder); - for (const auto& el : reply) { - if (kCursorTag != el.fieldNameStringData()) { - topBuilder.append(el); - continue; - } - invariant(el.isABSONObj()); - BSONObjBuilder curBuilder(topBuilder.subobjStart(kCursorTag)); - for (const auto& insideEl : el.Obj()) { - if (kFirstBatchTag != insideEl.fieldNameStringData()) { - curBuilder.append(insideEl); - continue; - } - invariant(insideEl.isABSONObj()); - BSONArrayBuilder arrBuilder(curBuilder.subarrayStart(kFirstBatchTag)); - for (const auto& doc : _outputDocs) { - arrBuilder.append(doc); - } - arrBuilder.doneFast(); - } - curBuilder.doneFast(); - } - topBuilder.doneFast(); - } else { - reply.appendSelfToBufBuilder(bufBuilder); - } - - auto msgHeaderSz = static_cast<std::size_t>(MsgData::MsgDataHeaderSize); - - invariant(static_cast<std::size_t>(bufBuilder.len()) + msgHeaderSz <= - mongo::MaxMessageSizeBytes); - - QueryResult::View qr = bufBuilder.buf(); + QueryResult::View qr = _builder.buf(); qr.setResultFlagsToOk(); - qr.msgdata().setLen(bufBuilder.len()); + qr.msgdata().setLen(_builder.len()); qr.msgdata().setOperation(opReply); qr.setCursorId(0); qr.setStartingFrom(0); qr.setNReturned(1); _message.setData(qr.view2ptr(), true); - bufBuilder.decouple(); + _builder.decouple(); _state = State::kDone; return std::move(_message); } -std::size_t LegacyReplyBuilder::availableBytes() const { - std::size_t msgHeaderSz = static_cast<std::size_t>(MsgData::MsgDataHeaderSize); - return mongo::MaxMessageSizeBytes - _currentLength - msgHeaderSz; -} - -Status LegacyReplyBuilder::_hasSpaceFor(std::size_t dataSize) const { - size_t availBytes = availableBytes(); - if (availBytes < dataSize) { - return Status(ErrorCodes::Overflow, - str::stream() << "Not enough space to store " << dataSize << " bytes. Only " - << availBytes << " bytes are available."); - } - return Status::OK(); -} - } // namespace rpc } // namespace mongo |