summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorDavid Storch <david.storch@10gen.com>2015-11-13 13:28:36 -0500
committerDavid Storch <david.storch@10gen.com>2015-11-13 16:41:37 -0500
commit7a99fd808fc4e8960d2981799415617c495a0fda (patch)
tree89fbd9ea94559b27637e9d3d1a09827981622c19 /src/mongo
parentbddbae79b4733dbd392215c38beccab5daa0109c (diff)
downloadmongo-7a99fd808fc4e8960d2981799415617c495a0fda.tar.gz
SERVER-20853 eliminate copies in find and getMore path
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/bson/bsonmisc.cpp5
-rw-r--r--src/mongo/bson/bsonmisc.h5
-rw-r--r--src/mongo/bson/bsonobjbuilder.h17
-rw-r--r--src/mongo/bson/bsonobjbuilder_test.cpp17
-rw-r--r--src/mongo/db/commands/find_cmd.cpp9
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp13
-rw-r--r--src/mongo/db/dbmessage.cpp68
-rw-r--r--src/mongo/db/dbmessage.h49
-rw-r--r--src/mongo/db/query/cursor_response.cpp24
-rw-r--r--src/mongo/db/query/cursor_response.h57
-rw-r--r--src/mongo/s/commands/cluster_find_cmd.cpp6
-rw-r--r--src/mongo/s/s_only.cpp1
-rw-r--r--src/mongo/s/strategy.cpp41
13 files changed, 247 insertions, 65 deletions
diff --git a/src/mongo/bson/bsonmisc.cpp b/src/mongo/bson/bsonmisc.cpp
index b106121b590..28e88690b29 100644
--- a/src/mongo/bson/bsonmisc.cpp
+++ b/src/mongo/bson/bsonmisc.cpp
@@ -72,6 +72,11 @@ BSONObjBuilderValueStream::BSONObjBuilderValueStream(BSONObjBuilder* builder) {
_builder = builder;
}
+void BSONObjBuilderValueStream::reset() {
+ _fieldName = StringData();
+ _subobj.reset();
+}
+
BSONObjBuilder& BSONObjBuilderValueStream::operator<<(const BSONElement& e) {
_builder->appendAs(e, _fieldName);
_fieldName = StringData();
diff --git a/src/mongo/bson/bsonmisc.h b/src/mongo/bson/bsonmisc.h
index d1c1894190a..f32319965f1 100644
--- a/src/mongo/bson/bsonmisc.h
+++ b/src/mongo/bson/bsonmisc.h
@@ -253,6 +253,11 @@ public:
return *_builder;
}
+ /**
+ * Restores this object to its empty state.
+ */
+ void reset();
+
private:
StringData _fieldName;
BSONObjBuilder* _builder;
diff --git a/src/mongo/bson/bsonobjbuilder.h b/src/mongo/bson/bsonobjbuilder.h
index 5b744a62120..ed21c7b63e1 100644
--- a/src/mongo/bson/bsonobjbuilder.h
+++ b/src/mongo/bson/bsonobjbuilder.h
@@ -601,6 +601,20 @@ public:
BSONObjBuilder& append(StringData fieldName, const std::map<K, T>& vals);
/**
+ * Resets this BSONObjBulder to an empty state. All previously added fields are lost. If this
+ * BSONObjBuilder is using an externally provided BufBuilder, this method does not affect the
+ * bytes before the start of this object.
+ *
+ * Invalid to call if done() has already been called in order to finalize the BSONObj.
+ */
+ void resetToEmpty() {
+ invariant(!_doneCalled);
+ _s.reset();
+ // Reset the position the next write will go to right after our size reservation.
+ _b.setlen(_offset + sizeof(int));
+ }
+
+ /**
* destructive
* The returned BSONObj will free the buffer when it is finished.
* @return owned BSONObj
@@ -887,10 +901,9 @@ private:
template <class T>
inline BSONObjBuilder& BSONObjBuilder::append(StringData fieldName, const std::vector<T>& vals) {
- BSONObjBuilder arrBuilder;
+ BSONObjBuilder arrBuilder(subarrayStart(fieldName));
for (unsigned int i = 0; i < vals.size(); ++i)
arrBuilder.append(numStr(i), vals[i]);
- appendArray(fieldName, arrBuilder.done());
return *this;
}
diff --git a/src/mongo/bson/bsonobjbuilder_test.cpp b/src/mongo/bson/bsonobjbuilder_test.cpp
index d92d48d6098..fdee41b8ba1 100644
--- a/src/mongo/bson/bsonobjbuilder_test.cpp
+++ b/src/mongo/bson/bsonobjbuilder_test.cpp
@@ -303,4 +303,21 @@ TEST(BSONObjBuilderTest, ResumeBuildingWithNesting) {
<< "dd")) << "a" << BSON("c" << 3)));
}
+TEST(BSONObjBuilderTest, ResetToEmptyResultsInEmptyObj) {
+ BSONObjBuilder bob;
+ bob.append("a", 3);
+ bob.resetToEmpty();
+ ASSERT_EQ(BSONObj(), bob.obj());
+}
+
+TEST(BSONObjBuilderTest, ResetToEmptyForNestedBuilderOnlyResetsInnerObj) {
+ BSONObjBuilder bob;
+ bob.append("a", 3);
+ BSONObjBuilder innerObj(bob.subobjStart("nestedObj"));
+ innerObj.append("b", 4);
+ innerObj.resetToEmpty();
+ innerObj.done();
+ ASSERT_EQ(BSON("a" << 3 << "nestedObj" << BSONObj()), bob.obj());
+}
+
} // unnamed namespace
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 6651baef97d..633bd24ef8b 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -294,15 +294,15 @@ public:
const LiteParsedQuery& pq = exec->getCanonicalQuery()->getParsed();
// Stream query results, adding them to a BSONArray as we go.
- BSONArrayBuilder firstBatch;
+ CursorResponseBuilder firstBatch(/*isInitialResponse*/ true, &result);
BSONObj obj;
PlanExecutor::ExecState state = PlanExecutor::ADVANCED;
long long numResults = 0;
- while (!FindCommon::enoughForFirstBatch(pq, numResults, firstBatch.len()) &&
+ while (!FindCommon::enoughForFirstBatch(pq, numResults, firstBatch.bytesUsed()) &&
PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) {
// If adding this object will cause us to exceed the BSON size limit, then we stash
// it for later.
- if (firstBatch.len() + obj.objsize() > BSONObjMaxUserSize && numResults > 0) {
+ if (firstBatch.bytesUsed() + obj.objsize() > BSONObjMaxUserSize && numResults > 0) {
exec->enqueue(obj);
break;
}
@@ -314,6 +314,7 @@ public:
// Throw an assertion if query execution fails for any reason.
if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) {
+ firstBatch.abandon();
const std::unique_ptr<PlanStageStats> stats(exec->getStats());
error() << "Plan executor error during find command: " << PlanExecutor::statestr(state)
<< ", stats: " << Explain::statsToBSON(*stats);
@@ -374,7 +375,7 @@ public:
}
// Generate the response object to send to the client.
- appendCursorResponseObject(cursorId, nss.ns(), firstBatch.arr(), &result);
+ firstBatch.done(cursorId, nss.ns());
return true;
}
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index deebe481f1b..4ba445d1825 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -294,7 +294,7 @@ public:
}
CursorId respondWithId = 0;
- BSONArrayBuilder nextBatch;
+ CursorResponseBuilder nextBatch(/*isInitialResponse*/ false, &result);
BSONObj obj;
PlanExecutor::ExecState state;
long long numResults = 0;
@@ -360,7 +360,7 @@ public:
CurOp::get(txn)->debug().cursorExhausted = true;
}
- appendGetMoreResponseObject(respondWithId, request.nss.ns(), nextBatch.arr(), &result);
+ nextBatch.done(respondWithId, request.nss.ns());
if (respondWithId) {
cursorFreer.Dismiss();
@@ -390,7 +390,7 @@ public:
*/
Status generateBatch(ClientCursor* cursor,
const GetMoreRequest& request,
- BSONArrayBuilder* nextBatch,
+ CursorResponseBuilder* nextBatch,
PlanExecutor::ExecState* state,
long long* numResults) {
PlanExecutor* exec = cursor->getExecutor();
@@ -404,7 +404,8 @@ public:
while (PlanExecutor::ADVANCED == (*state = exec->getNext(&obj, NULL))) {
// If adding this object will cause us to exceed the BSON size limit, then we
// stash it for later.
- if (nextBatch->len() + obj.objsize() > BSONObjMaxUserSize && *numResults > 0) {
+ if (nextBatch->bytesUsed() + obj.objsize() > BSONObjMaxUserSize &&
+ *numResults > 0) {
exec->enqueue(obj);
break;
}
@@ -414,7 +415,7 @@ public:
(*numResults)++;
if (FindCommon::enoughForGetMore(
- request.batchSize.value_or(0), *numResults, nextBatch->len())) {
+ request.batchSize.value_or(0), *numResults, nextBatch->bytesUsed())) {
break;
}
}
@@ -428,6 +429,8 @@ public:
}
if (PlanExecutor::FAILURE == *state || PlanExecutor::DEAD == *state) {
+ nextBatch->abandon();
+
const std::unique_ptr<PlanStageStats> stats(exec->getStats());
error() << "GetMore command executor error: " << PlanExecutor::statestr(*state)
<< ", stats: " << Explain::statsToBSON(*stats);
diff --git a/src/mongo/db/dbmessage.cpp b/src/mongo/db/dbmessage.cpp
index 0be54d0f818..e8529486f8a 100644
--- a/src/mongo/db/dbmessage.cpp
+++ b/src/mongo/db/dbmessage.cpp
@@ -169,6 +169,39 @@ T DbMessage::readAndAdvance() {
return t;
}
+OpQueryReplyBuilder::OpQueryReplyBuilder() : _buffer(32768) {
+ _buffer.skip(sizeof(QueryResult::Value));
+}
+
+void OpQueryReplyBuilder::send(AbstractMessagingPort* destination,
+ int queryResultFlags,
+ Message& requestMsg,
+ int nReturned,
+ int startingFrom,
+ long long cursorId) {
+ Message response;
+ putInMessage(&response, queryResultFlags, nReturned, startingFrom, cursorId);
+ destination->reply(requestMsg, response, requestMsg.header().getId());
+}
+
+void OpQueryReplyBuilder::sendCommandReply(AbstractMessagingPort* destination,
+ Message& requestMsg) {
+ send(destination, /*queryFlags*/ 0, requestMsg, /*nReturned*/ 1);
+}
+
+void OpQueryReplyBuilder::putInMessage(
+ Message* out, int queryResultFlags, int nReturned, int startingFrom, long long cursorId) {
+ QueryResult::View qr = _buffer.buf();
+ qr.setResultFlags(queryResultFlags);
+ qr.msgdata().setLen(_buffer.len());
+ qr.msgdata().setOperation(opReply);
+ qr.setCursorId(cursorId);
+ qr.setStartingFrom(startingFrom);
+ qr.setNReturned(nReturned);
+ _buffer.decouple();
+ out->setData(qr.view2ptr(), true); // transport will free
+}
+
void replyToQuery(int queryResultFlags,
AbstractMessagingPort* p,
Message& requestMsg,
@@ -177,19 +210,9 @@ void replyToQuery(int queryResultFlags,
int nReturned,
int startingFrom,
long long cursorId) {
- BufBuilder b(32768);
- b.skip(sizeof(QueryResult::Value));
- b.appendBuf(data, size);
- QueryResult::View qr = b.buf();
- qr.setResultFlags(queryResultFlags);
- qr.msgdata().setLen(b.len());
- qr.msgdata().setOperation(opReply);
- qr.setCursorId(cursorId);
- qr.setStartingFrom(startingFrom);
- qr.setNReturned(nReturned);
- b.decouple();
- Message resp(qr.view2ptr(), true);
- p->reply(requestMsg, resp, requestMsg.header().getId());
+ OpQueryReplyBuilder reply;
+ reply.bufBuilderForResults().appendBuf(data, size);
+ reply.send(p, queryResultFlags, requestMsg, nReturned, startingFrom, cursorId);
}
void replyToQuery(int queryResultFlags,
@@ -208,21 +231,8 @@ void replyToQuery(int queryResultFlags, Message& m, DbResponse& dbresponse, BSON
}
void replyToQuery(int queryResultFlags, Message& response, const BSONObj& resultObj) {
- BufBuilder bufBuilder;
- bufBuilder.skip(sizeof(QueryResult::Value));
- bufBuilder.appendBuf(reinterpret_cast<void*>(const_cast<char*>(resultObj.objdata())),
- resultObj.objsize());
-
- QueryResult::View queryResult = bufBuilder.buf();
- bufBuilder.decouple();
-
- queryResult.setResultFlags(queryResultFlags);
- queryResult.msgdata().setLen(bufBuilder.len());
- queryResult.msgdata().setOperation(opReply);
- queryResult.setCursorId(0);
- queryResult.setStartingFrom(0);
- queryResult.setNReturned(1);
-
- response.setData(queryResult.view2ptr(), true); // transport will free
+ OpQueryReplyBuilder reply;
+ resultObj.appendSelfToBufBuilder(reply.bufBuilderForResults());
+ reply.putInMessage(&response, queryResultFlags, /*nReturned*/ 1);
}
}
diff --git a/src/mongo/db/dbmessage.h b/src/mongo/db/dbmessage.h
index 2f0c6b6645b..7cc1d62268c 100644
--- a/src/mongo/db/dbmessage.h
+++ b/src/mongo/db/dbmessage.h
@@ -319,6 +319,55 @@ struct DbResponse {
DbResponse() = default;
};
+/**
+ * Prepares query replies to legacy finds (opReply to dbQuery) in place. This is also used for
+ * command responses that don't use the new dbCommand protocol.
+ */
+class OpQueryReplyBuilder {
+ MONGO_DISALLOW_COPYING(OpQueryReplyBuilder);
+
+public:
+ OpQueryReplyBuilder();
+
+ /**
+ * Returns the BufBuilder that should be used for placing result objects. It will be positioned
+ * where the first (or next) object should go.
+ *
+ * You must finish the BSONObjBuilder that uses this (by destruction or calling doneFast())
+ * before calling any more methods on this object.
+ */
+ BufBuilder& bufBuilderForResults() {
+ return _buffer;
+ }
+
+ /**
+ * Finishes the reply and transfers the message buffer into 'out'.
+ */
+ void putInMessage(Message* out,
+ int queryResultFlags,
+ int nReturned,
+ int startingFrom = 0,
+ long long cursorId = 0);
+
+ /**
+ * Finishes the reply and sends the message out to 'destination'.
+ */
+ void send(AbstractMessagingPort* destination,
+ int queryResultFlags,
+ Message& requestMsg, // should be const but MessagePort::reply takes non-const.
+ int nReturned,
+ int startingFrom = 0,
+ long long cursorId = 0);
+
+ /**
+ * Similar to send() but used for replying to a command.
+ */
+ void sendCommandReply(AbstractMessagingPort* destination, Message& requestMsg);
+
+private:
+ BufBuilder _buffer;
+};
+
void replyToQuery(int queryResultFlags,
AbstractMessagingPort* p,
Message& requestMsg,
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index 2954da683c9..405f7476d4b 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -48,6 +48,30 @@ const char kBatchFieldInitial[] = "firstBatch";
} // namespace
+CursorResponseBuilder::CursorResponseBuilder(bool isInitialResponse,
+ BSONObjBuilder* commandResponse)
+ : _responseInitialLen(commandResponse->bb().len()),
+ _commandResponse(commandResponse),
+ _cursorObject(commandResponse->subobjStart(kCursorField)),
+ _batch(_cursorObject.subarrayStart(isInitialResponse ? kBatchFieldInitial : kBatchField)) {}
+
+void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace) {
+ invariant(_active);
+ _batch.doneFast();
+ _cursorObject.append(kIdField, cursorId);
+ _cursorObject.append(kNsField, cursorNamespace);
+ _cursorObject.doneFast();
+ _active = false;
+}
+
+void CursorResponseBuilder::abandon() {
+ invariant(_active);
+ _batch.doneFast();
+ _cursorObject.doneFast();
+ _commandResponse->bb().setlen(_responseInitialLen); // Removes everything we've added.
+ _active = false;
+}
+
void appendCursorResponseObject(long long cursorId,
StringData cursorNamespace,
BSONArray firstBatch,
diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h
index 8e28ed56711..3fe1e56d15f 100644
--- a/src/mongo/db/query/cursor_response.h
+++ b/src/mongo/db/query/cursor_response.h
@@ -39,13 +39,66 @@
namespace mongo {
/**
+ * Builds the cursor field for a reply to a cursor-generating command in place.
+ */
+class CursorResponseBuilder {
+ MONGO_DISALLOW_COPYING(CursorResponseBuilder);
+
+public:
+ /**
+ * Once constructed, you may not use the passed-in BSONObjBuilder until you call either done()
+ * or abandon(), or this object goes out of scope. This is the same as the rule when using a
+ * BSONObjBuilder to build a sub-object with subobjStart().
+ *
+ * If the builder goes out of scope without a call to done(), any data appended to the
+ * builder will be removed.
+ */
+ CursorResponseBuilder(bool isInitialResponse, BSONObjBuilder* commandResponse);
+
+ ~CursorResponseBuilder() {
+ if (_active)
+ abandon();
+ }
+
+ size_t bytesUsed() const {
+ invariant(_active);
+ return _batch.len();
+ }
+
+ void append(const BSONObj& obj) {
+ invariant(_active);
+ _batch.append(obj);
+ }
+
+ /**
+ * Call this after successfully appending all fields that will be part of this response.
+ * After calling, you may not call any more methods on this object.
+ */
+ void done(CursorId cursorId, StringData cursorNamespace);
+
+ /**
+ * Call this if the response should not contain cursor information. It will completely remove
+ * the cursor field from the commandResponse, as if the CursorResponseBuilder was never used.
+ * After calling, you may not call any more methods on this object.
+ */
+ void abandon();
+
+private:
+ const int _responseInitialLen; // Must be the first member so its initializer runs first.
+ bool _active = true;
+ BSONObjBuilder* const _commandResponse;
+ BSONObjBuilder _cursorObject;
+ BSONArrayBuilder _batch;
+};
+
+/**
* Builds a cursor response object from the provided cursor identifiers and "firstBatch",
* and appends the response object to the provided builder under the field name "cursor".
*
* The response object has the following format:
* { id: <NumberLong>, ns: <String>, firstBatch: <Array> }.
*
- * This function is deprecated. Prefer CursorResponse::toBSON() instead.
+ * This function is deprecated. Prefer CursorResponseBuilder or CursorResponse::toBSON() instead.
*/
void appendCursorResponseObject(long long cursorId,
StringData cursorNamespace,
@@ -59,7 +112,7 @@ void appendCursorResponseObject(long long cursorId,
* The response object has the following format:
* { id: <NumberLong>, ns: <String>, nextBatch: <Array> }.
*
- * This function is deprecated. Prefer CursorResponse::toBSON() instead.
+ * This function is deprecated. Prefer CursorResponseBuilder or CursorResponse::toBSON() instead.
*/
void appendGetMoreResponseObject(long long cursorId,
StringData cursorNamespace,
diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp
index f49c4ebd3c6..44ec66acb12 100644
--- a/src/mongo/s/commands/cluster_find_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_cmd.cpp
@@ -165,11 +165,11 @@ public:
}
// Build the response document.
- BSONArrayBuilder arr;
+ CursorResponseBuilder firstBatch(/*firstBatch*/ true, &result);
for (const auto& obj : batch) {
- arr.append(obj);
+ firstBatch.append(obj);
}
- appendCursorResponseObject(cursorId.getValue(), nss.ns(), arr.arr(), &result);
+ firstBatch.done(cursorId.getValue(), nss.ns());
return true;
}
diff --git a/src/mongo/s/s_only.cpp b/src/mongo/s/s_only.cpp
index a718c31c22d..2322aa67d3d 100644
--- a/src/mongo/s/s_only.cpp
+++ b/src/mongo/s/s_only.cpp
@@ -127,6 +127,7 @@ void Command::execCommandClientBasic(OperationContext* txn,
try {
ok = c->run(txn, dbname, cmdObj, queryOptions, errmsg, result);
} catch (const DBException& e) {
+ result.resetToEmpty();
const int code = e.getCode();
// Codes for StaleConfigException
diff --git a/src/mongo/s/strategy.cpp b/src/mongo/s/strategy.cpp
index 04b71efca20..3cd57de4fa2 100644
--- a/src/mongo/s/strategy.cpp
+++ b/src/mongo/s/strategy.cpp
@@ -165,23 +165,19 @@ void Strategy::queryOp(OperationContext* txn, Request& request) {
auto cursorId = ClusterFind::runQuery(txn, *canonicalQuery.getValue(), readPreference, &batch);
uassertStatusOK(cursorId.getStatus());
- BufBuilder buffer(FindCommon::kInitReplyBufferSize);
-
// Fill out the response buffer.
int numResults = 0;
- for (const auto& obj : batch) {
- buffer.appendBuf((void*)obj.objdata(), obj.objsize());
+ OpQueryReplyBuilder reply;
+ for (auto&& obj : batch) {
+ obj.appendSelfToBufBuilder(reply.bufBuilderForResults());
numResults++;
}
-
- replyToQuery(0, // query result flags
- request.p(),
- request.m(),
- buffer.buf(),
- buffer.len(),
- numResults,
- 0, // startingFrom
- cursorId.getValue());
+ reply.send(request.p(),
+ 0, // query result flags
+ request.m(),
+ numResults,
+ 0, // startingFrom
+ cursorId.getValue());
}
void Strategy::clientCommandOp(OperationContext* txn, Request& request) {
@@ -207,7 +203,6 @@ void Strategy::clientCommandOp(OperationContext* txn, Request& request) {
bool cmChangeAttempted = false;
while (true) {
- BSONObjBuilder builder;
try {
BSONObj cmdObj = q.query;
{
@@ -236,9 +231,12 @@ void Strategy::clientCommandOp(OperationContext* txn, Request& request) {
}
}
- Command::runAgainstRegistered(txn, q.ns, cmdObj, builder, q.queryOptions);
- BSONObj x = builder.done();
- replyToQuery(0, request.p(), request.m(), x);
+ OpQueryReplyBuilder reply;
+ {
+ BSONObjBuilder builder(reply.bufBuilderForResults());
+ Command::runAgainstRegistered(txn, q.ns, cmdObj, builder, q.queryOptions);
+ }
+ reply.sendCommandReply(request.p(), request.m());
return;
} catch (const StaleConfigException& e) {
if (loops <= 0)
@@ -262,9 +260,12 @@ void Strategy::clientCommandOp(OperationContext* txn, Request& request) {
grid.forwardingCatalogManager()->waitForCatalogManagerChange(txn);
} else {
- Command::appendCommandStatus(builder, e.toStatus());
- BSONObj x = builder.done();
- replyToQuery(0, request.p(), request.m(), x);
+ OpQueryReplyBuilder reply;
+ {
+ BSONObjBuilder builder(reply.bufBuilderForResults());
+ Command::appendCommandStatus(builder, e.toStatus());
+ }
+ reply.sendCommandReply(request.p(), request.m());
return;
}
}