diff options
author | David Storch <david.storch@10gen.com> | 2015-11-13 13:28:36 -0500 |
---|---|---|
committer | David Storch <david.storch@10gen.com> | 2015-11-13 16:41:37 -0500 |
commit | 7a99fd808fc4e8960d2981799415617c495a0fda (patch) | |
tree | 89fbd9ea94559b27637e9d3d1a09827981622c19 /src | |
parent | bddbae79b4733dbd392215c38beccab5daa0109c (diff) | |
download | mongo-7a99fd808fc4e8960d2981799415617c495a0fda.tar.gz |
SERVER-20853 eliminate copies in find and getMore path
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/bson/bsonmisc.cpp | 5 | ||||
-rw-r--r-- | src/mongo/bson/bsonmisc.h | 5 | ||||
-rw-r--r-- | src/mongo/bson/bsonobjbuilder.h | 17 | ||||
-rw-r--r-- | src/mongo/bson/bsonobjbuilder_test.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/commands/find_cmd.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/commands/getmore_cmd.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/dbmessage.cpp | 68 | ||||
-rw-r--r-- | src/mongo/db/dbmessage.h | 49 | ||||
-rw-r--r-- | src/mongo/db/query/cursor_response.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/query/cursor_response.h | 57 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_find_cmd.cpp | 6 | ||||
-rw-r--r-- | src/mongo/s/s_only.cpp | 1 | ||||
-rw-r--r-- | src/mongo/s/strategy.cpp | 41 |
13 files changed, 247 insertions, 65 deletions
diff --git a/src/mongo/bson/bsonmisc.cpp b/src/mongo/bson/bsonmisc.cpp index b106121b590..28e88690b29 100644 --- a/src/mongo/bson/bsonmisc.cpp +++ b/src/mongo/bson/bsonmisc.cpp @@ -72,6 +72,11 @@ BSONObjBuilderValueStream::BSONObjBuilderValueStream(BSONObjBuilder* builder) { _builder = builder; } +void BSONObjBuilderValueStream::reset() { + _fieldName = StringData(); + _subobj.reset(); +} + BSONObjBuilder& BSONObjBuilderValueStream::operator<<(const BSONElement& e) { _builder->appendAs(e, _fieldName); _fieldName = StringData(); diff --git a/src/mongo/bson/bsonmisc.h b/src/mongo/bson/bsonmisc.h index d1c1894190a..f32319965f1 100644 --- a/src/mongo/bson/bsonmisc.h +++ b/src/mongo/bson/bsonmisc.h @@ -253,6 +253,11 @@ public: return *_builder; } + /** + * Restores this object to its empty state. + */ + void reset(); + private: StringData _fieldName; BSONObjBuilder* _builder; diff --git a/src/mongo/bson/bsonobjbuilder.h b/src/mongo/bson/bsonobjbuilder.h index 5b744a62120..ed21c7b63e1 100644 --- a/src/mongo/bson/bsonobjbuilder.h +++ b/src/mongo/bson/bsonobjbuilder.h @@ -601,6 +601,20 @@ public: BSONObjBuilder& append(StringData fieldName, const std::map<K, T>& vals); /** + * Resets this BSONObjBulder to an empty state. All previously added fields are lost. If this + * BSONObjBuilder is using an externally provided BufBuilder, this method does not affect the + * bytes before the start of this object. + * + * Invalid to call if done() has already been called in order to finalize the BSONObj. + */ + void resetToEmpty() { + invariant(!_doneCalled); + _s.reset(); + // Reset the position the next write will go to right after our size reservation. + _b.setlen(_offset + sizeof(int)); + } + + /** * destructive * The returned BSONObj will free the buffer when it is finished. * @return owned BSONObj @@ -887,10 +901,9 @@ private: template <class T> inline BSONObjBuilder& BSONObjBuilder::append(StringData fieldName, const std::vector<T>& vals) { - BSONObjBuilder arrBuilder; + BSONObjBuilder arrBuilder(subarrayStart(fieldName)); for (unsigned int i = 0; i < vals.size(); ++i) arrBuilder.append(numStr(i), vals[i]); - appendArray(fieldName, arrBuilder.done()); return *this; } diff --git a/src/mongo/bson/bsonobjbuilder_test.cpp b/src/mongo/bson/bsonobjbuilder_test.cpp index d92d48d6098..fdee41b8ba1 100644 --- a/src/mongo/bson/bsonobjbuilder_test.cpp +++ b/src/mongo/bson/bsonobjbuilder_test.cpp @@ -303,4 +303,21 @@ TEST(BSONObjBuilderTest, ResumeBuildingWithNesting) { << "dd")) << "a" << BSON("c" << 3))); } +TEST(BSONObjBuilderTest, ResetToEmptyResultsInEmptyObj) { + BSONObjBuilder bob; + bob.append("a", 3); + bob.resetToEmpty(); + ASSERT_EQ(BSONObj(), bob.obj()); +} + +TEST(BSONObjBuilderTest, ResetToEmptyForNestedBuilderOnlyResetsInnerObj) { + BSONObjBuilder bob; + bob.append("a", 3); + BSONObjBuilder innerObj(bob.subobjStart("nestedObj")); + innerObj.append("b", 4); + innerObj.resetToEmpty(); + innerObj.done(); + ASSERT_EQ(BSON("a" << 3 << "nestedObj" << BSONObj()), bob.obj()); +} + } // unnamed namespace diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 6651baef97d..633bd24ef8b 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -294,15 +294,15 @@ public: const LiteParsedQuery& pq = exec->getCanonicalQuery()->getParsed(); // Stream query results, adding them to a BSONArray as we go. - BSONArrayBuilder firstBatch; + CursorResponseBuilder firstBatch(/*isInitialResponse*/ true, &result); BSONObj obj; PlanExecutor::ExecState state = PlanExecutor::ADVANCED; long long numResults = 0; - while (!FindCommon::enoughForFirstBatch(pq, numResults, firstBatch.len()) && + while (!FindCommon::enoughForFirstBatch(pq, numResults, firstBatch.bytesUsed()) && PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) { // If adding this object will cause us to exceed the BSON size limit, then we stash // it for later. - if (firstBatch.len() + obj.objsize() > BSONObjMaxUserSize && numResults > 0) { + if (firstBatch.bytesUsed() + obj.objsize() > BSONObjMaxUserSize && numResults > 0) { exec->enqueue(obj); break; } @@ -314,6 +314,7 @@ public: // Throw an assertion if query execution fails for any reason. if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) { + firstBatch.abandon(); const std::unique_ptr<PlanStageStats> stats(exec->getStats()); error() << "Plan executor error during find command: " << PlanExecutor::statestr(state) << ", stats: " << Explain::statsToBSON(*stats); @@ -374,7 +375,7 @@ public: } // Generate the response object to send to the client. - appendCursorResponseObject(cursorId, nss.ns(), firstBatch.arr(), &result); + firstBatch.done(cursorId, nss.ns()); return true; } diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index deebe481f1b..4ba445d1825 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -294,7 +294,7 @@ public: } CursorId respondWithId = 0; - BSONArrayBuilder nextBatch; + CursorResponseBuilder nextBatch(/*isInitialResponse*/ false, &result); BSONObj obj; PlanExecutor::ExecState state; long long numResults = 0; @@ -360,7 +360,7 @@ public: CurOp::get(txn)->debug().cursorExhausted = true; } - appendGetMoreResponseObject(respondWithId, request.nss.ns(), nextBatch.arr(), &result); + nextBatch.done(respondWithId, request.nss.ns()); if (respondWithId) { cursorFreer.Dismiss(); @@ -390,7 +390,7 @@ public: */ Status generateBatch(ClientCursor* cursor, const GetMoreRequest& request, - BSONArrayBuilder* nextBatch, + CursorResponseBuilder* nextBatch, PlanExecutor::ExecState* state, long long* numResults) { PlanExecutor* exec = cursor->getExecutor(); @@ -404,7 +404,8 @@ public: while (PlanExecutor::ADVANCED == (*state = exec->getNext(&obj, NULL))) { // If adding this object will cause us to exceed the BSON size limit, then we // stash it for later. - if (nextBatch->len() + obj.objsize() > BSONObjMaxUserSize && *numResults > 0) { + if (nextBatch->bytesUsed() + obj.objsize() > BSONObjMaxUserSize && + *numResults > 0) { exec->enqueue(obj); break; } @@ -414,7 +415,7 @@ public: (*numResults)++; if (FindCommon::enoughForGetMore( - request.batchSize.value_or(0), *numResults, nextBatch->len())) { + request.batchSize.value_or(0), *numResults, nextBatch->bytesUsed())) { break; } } @@ -428,6 +429,8 @@ public: } if (PlanExecutor::FAILURE == *state || PlanExecutor::DEAD == *state) { + nextBatch->abandon(); + const std::unique_ptr<PlanStageStats> stats(exec->getStats()); error() << "GetMore command executor error: " << PlanExecutor::statestr(*state) << ", stats: " << Explain::statsToBSON(*stats); diff --git a/src/mongo/db/dbmessage.cpp b/src/mongo/db/dbmessage.cpp index 0be54d0f818..e8529486f8a 100644 --- a/src/mongo/db/dbmessage.cpp +++ b/src/mongo/db/dbmessage.cpp @@ -169,6 +169,39 @@ T DbMessage::readAndAdvance() { return t; } +OpQueryReplyBuilder::OpQueryReplyBuilder() : _buffer(32768) { + _buffer.skip(sizeof(QueryResult::Value)); +} + +void OpQueryReplyBuilder::send(AbstractMessagingPort* destination, + int queryResultFlags, + Message& requestMsg, + int nReturned, + int startingFrom, + long long cursorId) { + Message response; + putInMessage(&response, queryResultFlags, nReturned, startingFrom, cursorId); + destination->reply(requestMsg, response, requestMsg.header().getId()); +} + +void OpQueryReplyBuilder::sendCommandReply(AbstractMessagingPort* destination, + Message& requestMsg) { + send(destination, /*queryFlags*/ 0, requestMsg, /*nReturned*/ 1); +} + +void OpQueryReplyBuilder::putInMessage( + Message* out, int queryResultFlags, int nReturned, int startingFrom, long long cursorId) { + QueryResult::View qr = _buffer.buf(); + qr.setResultFlags(queryResultFlags); + qr.msgdata().setLen(_buffer.len()); + qr.msgdata().setOperation(opReply); + qr.setCursorId(cursorId); + qr.setStartingFrom(startingFrom); + qr.setNReturned(nReturned); + _buffer.decouple(); + out->setData(qr.view2ptr(), true); // transport will free +} + void replyToQuery(int queryResultFlags, AbstractMessagingPort* p, Message& requestMsg, @@ -177,19 +210,9 @@ void replyToQuery(int queryResultFlags, int nReturned, int startingFrom, long long cursorId) { - BufBuilder b(32768); - b.skip(sizeof(QueryResult::Value)); - b.appendBuf(data, size); - QueryResult::View qr = b.buf(); - qr.setResultFlags(queryResultFlags); - qr.msgdata().setLen(b.len()); - qr.msgdata().setOperation(opReply); - qr.setCursorId(cursorId); - qr.setStartingFrom(startingFrom); - qr.setNReturned(nReturned); - b.decouple(); - Message resp(qr.view2ptr(), true); - p->reply(requestMsg, resp, requestMsg.header().getId()); + OpQueryReplyBuilder reply; + reply.bufBuilderForResults().appendBuf(data, size); + reply.send(p, queryResultFlags, requestMsg, nReturned, startingFrom, cursorId); } void replyToQuery(int queryResultFlags, @@ -208,21 +231,8 @@ void replyToQuery(int queryResultFlags, Message& m, DbResponse& dbresponse, BSON } void replyToQuery(int queryResultFlags, Message& response, const BSONObj& resultObj) { - BufBuilder bufBuilder; - bufBuilder.skip(sizeof(QueryResult::Value)); - bufBuilder.appendBuf(reinterpret_cast<void*>(const_cast<char*>(resultObj.objdata())), - resultObj.objsize()); - - QueryResult::View queryResult = bufBuilder.buf(); - bufBuilder.decouple(); - - queryResult.setResultFlags(queryResultFlags); - queryResult.msgdata().setLen(bufBuilder.len()); - queryResult.msgdata().setOperation(opReply); - queryResult.setCursorId(0); - queryResult.setStartingFrom(0); - queryResult.setNReturned(1); - - response.setData(queryResult.view2ptr(), true); // transport will free + OpQueryReplyBuilder reply; + resultObj.appendSelfToBufBuilder(reply.bufBuilderForResults()); + reply.putInMessage(&response, queryResultFlags, /*nReturned*/ 1); } } diff --git a/src/mongo/db/dbmessage.h b/src/mongo/db/dbmessage.h index 2f0c6b6645b..7cc1d62268c 100644 --- a/src/mongo/db/dbmessage.h +++ b/src/mongo/db/dbmessage.h @@ -319,6 +319,55 @@ struct DbResponse { DbResponse() = default; }; +/** + * Prepares query replies to legacy finds (opReply to dbQuery) in place. This is also used for + * command responses that don't use the new dbCommand protocol. + */ +class OpQueryReplyBuilder { + MONGO_DISALLOW_COPYING(OpQueryReplyBuilder); + +public: + OpQueryReplyBuilder(); + + /** + * Returns the BufBuilder that should be used for placing result objects. It will be positioned + * where the first (or next) object should go. + * + * You must finish the BSONObjBuilder that uses this (by destruction or calling doneFast()) + * before calling any more methods on this object. + */ + BufBuilder& bufBuilderForResults() { + return _buffer; + } + + /** + * Finishes the reply and transfers the message buffer into 'out'. + */ + void putInMessage(Message* out, + int queryResultFlags, + int nReturned, + int startingFrom = 0, + long long cursorId = 0); + + /** + * Finishes the reply and sends the message out to 'destination'. + */ + void send(AbstractMessagingPort* destination, + int queryResultFlags, + Message& requestMsg, // should be const but MessagePort::reply takes non-const. + int nReturned, + int startingFrom = 0, + long long cursorId = 0); + + /** + * Similar to send() but used for replying to a command. + */ + void sendCommandReply(AbstractMessagingPort* destination, Message& requestMsg); + +private: + BufBuilder _buffer; +}; + void replyToQuery(int queryResultFlags, AbstractMessagingPort* p, Message& requestMsg, diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp index 2954da683c9..405f7476d4b 100644 --- a/src/mongo/db/query/cursor_response.cpp +++ b/src/mongo/db/query/cursor_response.cpp @@ -48,6 +48,30 @@ const char kBatchFieldInitial[] = "firstBatch"; } // namespace +CursorResponseBuilder::CursorResponseBuilder(bool isInitialResponse, + BSONObjBuilder* commandResponse) + : _responseInitialLen(commandResponse->bb().len()), + _commandResponse(commandResponse), + _cursorObject(commandResponse->subobjStart(kCursorField)), + _batch(_cursorObject.subarrayStart(isInitialResponse ? kBatchFieldInitial : kBatchField)) {} + +void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace) { + invariant(_active); + _batch.doneFast(); + _cursorObject.append(kIdField, cursorId); + _cursorObject.append(kNsField, cursorNamespace); + _cursorObject.doneFast(); + _active = false; +} + +void CursorResponseBuilder::abandon() { + invariant(_active); + _batch.doneFast(); + _cursorObject.doneFast(); + _commandResponse->bb().setlen(_responseInitialLen); // Removes everything we've added. + _active = false; +} + void appendCursorResponseObject(long long cursorId, StringData cursorNamespace, BSONArray firstBatch, diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h index 8e28ed56711..3fe1e56d15f 100644 --- a/src/mongo/db/query/cursor_response.h +++ b/src/mongo/db/query/cursor_response.h @@ -39,13 +39,66 @@ namespace mongo { /** + * Builds the cursor field for a reply to a cursor-generating command in place. + */ +class CursorResponseBuilder { + MONGO_DISALLOW_COPYING(CursorResponseBuilder); + +public: + /** + * Once constructed, you may not use the passed-in BSONObjBuilder until you call either done() + * or abandon(), or this object goes out of scope. This is the same as the rule when using a + * BSONObjBuilder to build a sub-object with subobjStart(). + * + * If the builder goes out of scope without a call to done(), any data appended to the + * builder will be removed. + */ + CursorResponseBuilder(bool isInitialResponse, BSONObjBuilder* commandResponse); + + ~CursorResponseBuilder() { + if (_active) + abandon(); + } + + size_t bytesUsed() const { + invariant(_active); + return _batch.len(); + } + + void append(const BSONObj& obj) { + invariant(_active); + _batch.append(obj); + } + + /** + * Call this after successfully appending all fields that will be part of this response. + * After calling, you may not call any more methods on this object. + */ + void done(CursorId cursorId, StringData cursorNamespace); + + /** + * Call this if the response should not contain cursor information. It will completely remove + * the cursor field from the commandResponse, as if the CursorResponseBuilder was never used. + * After calling, you may not call any more methods on this object. + */ + void abandon(); + +private: + const int _responseInitialLen; // Must be the first member so its initializer runs first. + bool _active = true; + BSONObjBuilder* const _commandResponse; + BSONObjBuilder _cursorObject; + BSONArrayBuilder _batch; +}; + +/** * Builds a cursor response object from the provided cursor identifiers and "firstBatch", * and appends the response object to the provided builder under the field name "cursor". * * The response object has the following format: * { id: <NumberLong>, ns: <String>, firstBatch: <Array> }. * - * This function is deprecated. Prefer CursorResponse::toBSON() instead. + * This function is deprecated. Prefer CursorResponseBuilder or CursorResponse::toBSON() instead. */ void appendCursorResponseObject(long long cursorId, StringData cursorNamespace, @@ -59,7 +112,7 @@ void appendCursorResponseObject(long long cursorId, * The response object has the following format: * { id: <NumberLong>, ns: <String>, nextBatch: <Array> }. * - * This function is deprecated. Prefer CursorResponse::toBSON() instead. + * This function is deprecated. Prefer CursorResponseBuilder or CursorResponse::toBSON() instead. */ void appendGetMoreResponseObject(long long cursorId, StringData cursorNamespace, diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp index f49c4ebd3c6..44ec66acb12 100644 --- a/src/mongo/s/commands/cluster_find_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_cmd.cpp @@ -165,11 +165,11 @@ public: } // Build the response document. - BSONArrayBuilder arr; + CursorResponseBuilder firstBatch(/*firstBatch*/ true, &result); for (const auto& obj : batch) { - arr.append(obj); + firstBatch.append(obj); } - appendCursorResponseObject(cursorId.getValue(), nss.ns(), arr.arr(), &result); + firstBatch.done(cursorId.getValue(), nss.ns()); return true; } diff --git a/src/mongo/s/s_only.cpp b/src/mongo/s/s_only.cpp index a718c31c22d..2322aa67d3d 100644 --- a/src/mongo/s/s_only.cpp +++ b/src/mongo/s/s_only.cpp @@ -127,6 +127,7 @@ void Command::execCommandClientBasic(OperationContext* txn, try { ok = c->run(txn, dbname, cmdObj, queryOptions, errmsg, result); } catch (const DBException& e) { + result.resetToEmpty(); const int code = e.getCode(); // Codes for StaleConfigException diff --git a/src/mongo/s/strategy.cpp b/src/mongo/s/strategy.cpp index 04b71efca20..3cd57de4fa2 100644 --- a/src/mongo/s/strategy.cpp +++ b/src/mongo/s/strategy.cpp @@ -165,23 +165,19 @@ void Strategy::queryOp(OperationContext* txn, Request& request) { auto cursorId = ClusterFind::runQuery(txn, *canonicalQuery.getValue(), readPreference, &batch); uassertStatusOK(cursorId.getStatus()); - BufBuilder buffer(FindCommon::kInitReplyBufferSize); - // Fill out the response buffer. int numResults = 0; - for (const auto& obj : batch) { - buffer.appendBuf((void*)obj.objdata(), obj.objsize()); + OpQueryReplyBuilder reply; + for (auto&& obj : batch) { + obj.appendSelfToBufBuilder(reply.bufBuilderForResults()); numResults++; } - - replyToQuery(0, // query result flags - request.p(), - request.m(), - buffer.buf(), - buffer.len(), - numResults, - 0, // startingFrom - cursorId.getValue()); + reply.send(request.p(), + 0, // query result flags + request.m(), + numResults, + 0, // startingFrom + cursorId.getValue()); } void Strategy::clientCommandOp(OperationContext* txn, Request& request) { @@ -207,7 +203,6 @@ void Strategy::clientCommandOp(OperationContext* txn, Request& request) { bool cmChangeAttempted = false; while (true) { - BSONObjBuilder builder; try { BSONObj cmdObj = q.query; { @@ -236,9 +231,12 @@ void Strategy::clientCommandOp(OperationContext* txn, Request& request) { } } - Command::runAgainstRegistered(txn, q.ns, cmdObj, builder, q.queryOptions); - BSONObj x = builder.done(); - replyToQuery(0, request.p(), request.m(), x); + OpQueryReplyBuilder reply; + { + BSONObjBuilder builder(reply.bufBuilderForResults()); + Command::runAgainstRegistered(txn, q.ns, cmdObj, builder, q.queryOptions); + } + reply.sendCommandReply(request.p(), request.m()); return; } catch (const StaleConfigException& e) { if (loops <= 0) @@ -262,9 +260,12 @@ void Strategy::clientCommandOp(OperationContext* txn, Request& request) { grid.forwardingCatalogManager()->waitForCatalogManagerChange(txn); } else { - Command::appendCommandStatus(builder, e.toStatus()); - BSONObj x = builder.done(); - replyToQuery(0, request.p(), request.m(), x); + OpQueryReplyBuilder reply; + { + BSONObjBuilder builder(reply.bufBuilderForResults()); + Command::appendCommandStatus(builder, e.toStatus()); + } + reply.sendCommandReply(request.p(), request.m()); return; } } |