summaryrefslogtreecommitdiff
path: root/src/mongo/db/query
diff options
context:
space:
mode:
authorGregory Noma <gregory.noma@gmail.com>2018-07-12 14:48:46 -0400
committerGregory Noma <gregory.noma@gmail.com>2018-07-24 13:56:03 -0400
commit0136f88d1c7d3e49f0e089f826e0b19af45f3b89 (patch)
tree77306a28d4b4c7946c1158dd127a916ab146cd0f /src/mongo/db/query
parent9d31d0caa167e9661aaf0f10f260313133bd2a02 (diff)
downloadmongo-0136f88d1c7d3e49f0e089f826e0b19af45f3b89.tar.gz
SERVER-36020 Redesign CursorResponseBuilder to allow usage of DocumentSequence
Co-authored-by: Anthony Roy <anthony.roy@10gen.com>
Diffstat (limited to 'src/mongo/db/query')
-rw-r--r--src/mongo/db/query/SConscript2
-rw-r--r--src/mongo/db/query/cursor_response.cpp46
-rw-r--r--src/mongo/db/query/cursor_response.h39
-rw-r--r--src/mongo/db/query/cursor_response_test.cpp28
4 files changed, 91 insertions, 24 deletions
diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript
index 7247f723a02..1605b395c7d 100644
--- a/src/mongo/db/query/SConscript
+++ b/src/mongo/db/query/SConscript
@@ -147,6 +147,7 @@ env.Library(
'$BUILD_DIR/mongo/db/namespace_string',
'$BUILD_DIR/mongo/db/repl/optime',
'$BUILD_DIR/mongo/rpc/command_status',
+ '$BUILD_DIR/mongo/rpc/rpc',
'query_request',
]
)
@@ -164,6 +165,7 @@ env.CppUnitTest(
],
LIBDEPS=[
"$BUILD_DIR/mongo/db/pipeline/aggregation_request",
+ '$BUILD_DIR/mongo/rpc/rpc',
'command_request_response',
]
)
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index e449aad0814..14b8af5e8f6 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -44,34 +44,52 @@ const char kIdField[] = "id";
const char kNsField[] = "ns";
const char kBatchField[] = "nextBatch";
const char kBatchFieldInitial[] = "firstBatch";
+const char kBatchDocSequenceField[] = "cursor.nextBatch";
+const char kBatchDocSequenceFieldInitial[] = "cursor.firstBatch";
const char kInternalLatestOplogTimestampField[] = "$_internalLatestOplogTimestamp";
} // namespace
-CursorResponseBuilder::CursorResponseBuilder(bool isInitialResponse,
- BSONObjBuilder* commandResponse)
- : _responseInitialLen(commandResponse->bb().len()),
- _commandResponse(commandResponse),
- _cursorObject(commandResponse->subobjStart(kCursorField)),
- _batch(_cursorObject.subarrayStart(isInitialResponse ? kBatchFieldInitial : kBatchField)) {}
+CursorResponseBuilder::CursorResponseBuilder(rpc::ReplyBuilderInterface* replyBuilder,
+ Options options = Options())
+ : _options(options), _replyBuilder(replyBuilder) {
+ if (_options.useDocumentSequences) {
+ _docSeqBuilder.emplace(_replyBuilder->getDocSequenceBuilder(
+ _options.isInitialResponse ? kBatchDocSequenceFieldInitial : kBatchDocSequenceField));
+ } else {
+ _bodyBuilder.emplace(_replyBuilder->getBodyBuilder());
+ _cursorObject.emplace(_bodyBuilder->subobjStart(kCursorField));
+ _batch.emplace(_cursorObject->subarrayStart(_options.isInitialResponse ? kBatchFieldInitial
+ : kBatchField));
+ }
+}
void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace) {
invariant(_active);
- _batch.doneFast();
- _cursorObject.append(kIdField, cursorId);
- _cursorObject.append(kNsField, cursorNamespace);
- _cursorObject.doneFast();
+ if (_options.useDocumentSequences) {
+ _docSeqBuilder.reset();
+ _bodyBuilder.emplace(_replyBuilder->getBodyBuilder());
+ _cursorObject.emplace(_bodyBuilder->subobjStart(kCursorField));
+ } else {
+ _batch.reset();
+ }
+ _cursorObject->append(kIdField, cursorId);
+ _cursorObject->append(kNsField, cursorNamespace);
+ _cursorObject.reset();
+
if (!_latestOplogTimestamp.isNull()) {
- _commandResponse->append(kInternalLatestOplogTimestampField, _latestOplogTimestamp);
+ _bodyBuilder->append(kInternalLatestOplogTimestampField, _latestOplogTimestamp);
}
+ _bodyBuilder.reset();
_active = false;
}
void CursorResponseBuilder::abandon() {
invariant(_active);
- _batch.doneFast();
- _cursorObject.doneFast();
- _commandResponse->bb().setlen(_responseInitialLen); // Removes everything we've added.
+ _batch.reset();
+ _cursorObject.reset();
+ _bodyBuilder.reset();
+ _replyBuilder->reset();
_numDocs = 0;
_active = false;
}
diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h
index 091b64e0c73..529654118df 100644
--- a/src/mongo/db/query/cursor_response.h
+++ b/src/mongo/db/query/cursor_response.h
@@ -35,6 +35,8 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/rpc/op_msg.h"
+#include "mongo/rpc/reply_builder_interface.h"
namespace mongo {
@@ -47,14 +49,23 @@ class CursorResponseBuilder {
public:
/**
- * Once constructed, you may not use the passed-in BSONObjBuilder until you call either done()
+ * Structure used to confiugre the CursorResponseBuilder.
+ */
+ struct Options {
+ bool isInitialResponse = false;
+ bool useDocumentSequences = false;
+ };
+
+ /**
+ * Once constructed, you may not use the passed-in ReplyBuilderInterface until you call either
+ * done()
* or abandon(), or this object goes out of scope. This is the same as the rule when using a
* BSONObjBuilder to build a sub-object with subobjStart().
*
- * If the builder goes out of scope without a call to done(), any data appended to the
- * builder will be removed.
+ * If the builder goes out of scope without a call to done(), the ReplyBuilderInterface will be
+ * reset.
*/
- CursorResponseBuilder(bool isInitialResponse, BSONObjBuilder* commandResponse);
+ CursorResponseBuilder(rpc::ReplyBuilderInterface* replyBuilder, Options options);
~CursorResponseBuilder() {
if (_active)
@@ -63,12 +74,16 @@ public:
size_t bytesUsed() const {
invariant(_active);
- return _batch.len();
+ return _options.useDocumentSequences ? _docSeqBuilder->len() : _batch->len();
}
void append(const BSONObj& obj) {
invariant(_active);
- _batch.append(obj);
+ if (_options.useDocumentSequences) {
+ _docSeqBuilder->append(obj);
+ } else {
+ _batch->append(obj);
+ }
_numDocs++;
}
@@ -94,11 +109,15 @@ public:
void abandon();
private:
- const int _responseInitialLen; // Must be the first member so its initializer runs first.
+ const Options _options;
+ rpc::ReplyBuilderInterface* const _replyBuilder;
+ // Order here is important to ensure destruction in the correct order.
+ boost::optional<BSONObjBuilder> _bodyBuilder;
+ boost::optional<BSONObjBuilder> _cursorObject;
+ boost::optional<BSONArrayBuilder> _batch;
+ boost::optional<OpMsgBuilder::DocSequenceBuilder> _docSeqBuilder;
+
bool _active = true;
- BSONObjBuilder* const _commandResponse;
- BSONObjBuilder _cursorObject;
- BSONArrayBuilder _batch;
long long _numDocs = 0;
Timestamp _latestOplogTimestamp;
};
diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp
index eba5d6748fe..cfe3c8ac3cf 100644
--- a/src/mongo/db/query/cursor_response_test.cpp
+++ b/src/mongo/db/query/cursor_response_test.cpp
@@ -30,6 +30,8 @@
#include "mongo/db/query/cursor_response.h"
+#include "mongo/rpc/op_msg_rpc_impls.h"
+
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -330,6 +332,32 @@ TEST(CursorResponseTest, serializeLatestOplogEntry) {
ASSERT_EQ(*reparsedResponse.getLastOplogTimestamp(), Timestamp(1, 2));
}
+TEST(CursorResponseTest, cursorReturnDocumentSequences) {
+ CursorResponseBuilder::Options options;
+ options.isInitialResponse = true;
+ options.useDocumentSequences = true;
+ rpc::OpMsgReplyBuilder builder;
+ BSONObj expectedDoc = BSON("_id" << 1 << "test"
+ << "123");
+ BSONObj expectedBody = BSON("cursor" << BSON("id" << CursorId(123) << "ns"
+ << "db.coll"));
+
+ CursorResponseBuilder crb(&builder, options);
+ crb.append(expectedDoc);
+ ASSERT_EQ(crb.numDocs(), 1U);
+ crb.done(CursorId(123), "db.coll");
+
+ auto msg = builder.done();
+ auto opMsg = OpMsg::parse(msg);
+ const auto& docSeqs = opMsg.sequences;
+ ASSERT_EQ(docSeqs.size(), 1U);
+ const auto& documentSequence = docSeqs[0];
+ ASSERT_EQ(documentSequence.name, "cursor.firstBatch");
+ ASSERT_EQ(documentSequence.objs.size(), 1U);
+ ASSERT_BSONOBJ_EQ(documentSequence.objs[0], expectedDoc);
+ ASSERT_BSONOBJ_EQ(opMsg.body, expectedBody);
+}
+
} // namespace
} // namespace mongo