diff options
author | Gregory Noma <gregory.noma@gmail.com> | 2018-07-12 14:48:46 -0400 |
---|---|---|
committer | Gregory Noma <gregory.noma@gmail.com> | 2018-07-24 13:56:03 -0400 |
commit | 0136f88d1c7d3e49f0e089f826e0b19af45f3b89 (patch) | |
tree | 77306a28d4b4c7946c1158dd127a916ab146cd0f /src/mongo/db/query | |
parent | 9d31d0caa167e9661aaf0f10f260313133bd2a02 (diff) | |
download | mongo-0136f88d1c7d3e49f0e089f826e0b19af45f3b89.tar.gz |
SERVER-36020 Redesign CursorResponseBuilder to allow usage of DocumentSequence
Co-authored-by: Anthony Roy <anthony.roy@10gen.com>
Diffstat (limited to 'src/mongo/db/query')
-rw-r--r-- | src/mongo/db/query/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/query/cursor_response.cpp | 46 | ||||
-rw-r--r-- | src/mongo/db/query/cursor_response.h | 39 | ||||
-rw-r--r-- | src/mongo/db/query/cursor_response_test.cpp | 28 |
4 files changed, 91 insertions, 24 deletions
diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript index 7247f723a02..1605b395c7d 100644 --- a/src/mongo/db/query/SConscript +++ b/src/mongo/db/query/SConscript @@ -147,6 +147,7 @@ env.Library( '$BUILD_DIR/mongo/db/namespace_string', '$BUILD_DIR/mongo/db/repl/optime', '$BUILD_DIR/mongo/rpc/command_status', + '$BUILD_DIR/mongo/rpc/rpc', 'query_request', ] ) @@ -164,6 +165,7 @@ env.CppUnitTest( ], LIBDEPS=[ "$BUILD_DIR/mongo/db/pipeline/aggregation_request", + '$BUILD_DIR/mongo/rpc/rpc', 'command_request_response', ] ) diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp index e449aad0814..14b8af5e8f6 100644 --- a/src/mongo/db/query/cursor_response.cpp +++ b/src/mongo/db/query/cursor_response.cpp @@ -44,34 +44,52 @@ const char kIdField[] = "id"; const char kNsField[] = "ns"; const char kBatchField[] = "nextBatch"; const char kBatchFieldInitial[] = "firstBatch"; +const char kBatchDocSequenceField[] = "cursor.nextBatch"; +const char kBatchDocSequenceFieldInitial[] = "cursor.firstBatch"; const char kInternalLatestOplogTimestampField[] = "$_internalLatestOplogTimestamp"; } // namespace -CursorResponseBuilder::CursorResponseBuilder(bool isInitialResponse, - BSONObjBuilder* commandResponse) - : _responseInitialLen(commandResponse->bb().len()), - _commandResponse(commandResponse), - _cursorObject(commandResponse->subobjStart(kCursorField)), - _batch(_cursorObject.subarrayStart(isInitialResponse ? kBatchFieldInitial : kBatchField)) {} +CursorResponseBuilder::CursorResponseBuilder(rpc::ReplyBuilderInterface* replyBuilder, + Options options = Options()) + : _options(options), _replyBuilder(replyBuilder) { + if (_options.useDocumentSequences) { + _docSeqBuilder.emplace(_replyBuilder->getDocSequenceBuilder( + _options.isInitialResponse ? kBatchDocSequenceFieldInitial : kBatchDocSequenceField)); + } else { + _bodyBuilder.emplace(_replyBuilder->getBodyBuilder()); + _cursorObject.emplace(_bodyBuilder->subobjStart(kCursorField)); + _batch.emplace(_cursorObject->subarrayStart(_options.isInitialResponse ? kBatchFieldInitial + : kBatchField)); + } +} void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace) { invariant(_active); - _batch.doneFast(); - _cursorObject.append(kIdField, cursorId); - _cursorObject.append(kNsField, cursorNamespace); - _cursorObject.doneFast(); + if (_options.useDocumentSequences) { + _docSeqBuilder.reset(); + _bodyBuilder.emplace(_replyBuilder->getBodyBuilder()); + _cursorObject.emplace(_bodyBuilder->subobjStart(kCursorField)); + } else { + _batch.reset(); + } + _cursorObject->append(kIdField, cursorId); + _cursorObject->append(kNsField, cursorNamespace); + _cursorObject.reset(); + if (!_latestOplogTimestamp.isNull()) { - _commandResponse->append(kInternalLatestOplogTimestampField, _latestOplogTimestamp); + _bodyBuilder->append(kInternalLatestOplogTimestampField, _latestOplogTimestamp); } + _bodyBuilder.reset(); _active = false; } void CursorResponseBuilder::abandon() { invariant(_active); - _batch.doneFast(); - _cursorObject.doneFast(); - _commandResponse->bb().setlen(_responseInitialLen); // Removes everything we've added. + _batch.reset(); + _cursorObject.reset(); + _bodyBuilder.reset(); + _replyBuilder->reset(); _numDocs = 0; _active = false; } diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h index 091b64e0c73..529654118df 100644 --- a/src/mongo/db/query/cursor_response.h +++ b/src/mongo/db/query/cursor_response.h @@ -35,6 +35,8 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/clientcursor.h" #include "mongo/db/namespace_string.h" +#include "mongo/rpc/op_msg.h" +#include "mongo/rpc/reply_builder_interface.h" namespace mongo { @@ -47,14 +49,23 @@ class CursorResponseBuilder { public: /** - * Once constructed, you may not use the passed-in BSONObjBuilder until you call either done() + * Structure used to confiugre the CursorResponseBuilder. + */ + struct Options { + bool isInitialResponse = false; + bool useDocumentSequences = false; + }; + + /** + * Once constructed, you may not use the passed-in ReplyBuilderInterface until you call either + * done() * or abandon(), or this object goes out of scope. This is the same as the rule when using a * BSONObjBuilder to build a sub-object with subobjStart(). * - * If the builder goes out of scope without a call to done(), any data appended to the - * builder will be removed. + * If the builder goes out of scope without a call to done(), the ReplyBuilderInterface will be + * reset. */ - CursorResponseBuilder(bool isInitialResponse, BSONObjBuilder* commandResponse); + CursorResponseBuilder(rpc::ReplyBuilderInterface* replyBuilder, Options options); ~CursorResponseBuilder() { if (_active) @@ -63,12 +74,16 @@ public: size_t bytesUsed() const { invariant(_active); - return _batch.len(); + return _options.useDocumentSequences ? _docSeqBuilder->len() : _batch->len(); } void append(const BSONObj& obj) { invariant(_active); - _batch.append(obj); + if (_options.useDocumentSequences) { + _docSeqBuilder->append(obj); + } else { + _batch->append(obj); + } _numDocs++; } @@ -94,11 +109,15 @@ public: void abandon(); private: - const int _responseInitialLen; // Must be the first member so its initializer runs first. + const Options _options; + rpc::ReplyBuilderInterface* const _replyBuilder; + // Order here is important to ensure destruction in the correct order. + boost::optional<BSONObjBuilder> _bodyBuilder; + boost::optional<BSONObjBuilder> _cursorObject; + boost::optional<BSONArrayBuilder> _batch; + boost::optional<OpMsgBuilder::DocSequenceBuilder> _docSeqBuilder; + bool _active = true; - BSONObjBuilder* const _commandResponse; - BSONObjBuilder _cursorObject; - BSONArrayBuilder _batch; long long _numDocs = 0; Timestamp _latestOplogTimestamp; }; diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp index eba5d6748fe..cfe3c8ac3cf 100644 --- a/src/mongo/db/query/cursor_response_test.cpp +++ b/src/mongo/db/query/cursor_response_test.cpp @@ -30,6 +30,8 @@ #include "mongo/db/query/cursor_response.h" +#include "mongo/rpc/op_msg_rpc_impls.h" + #include "mongo/unittest/unittest.h" namespace mongo { @@ -330,6 +332,32 @@ TEST(CursorResponseTest, serializeLatestOplogEntry) { ASSERT_EQ(*reparsedResponse.getLastOplogTimestamp(), Timestamp(1, 2)); } +TEST(CursorResponseTest, cursorReturnDocumentSequences) { + CursorResponseBuilder::Options options; + options.isInitialResponse = true; + options.useDocumentSequences = true; + rpc::OpMsgReplyBuilder builder; + BSONObj expectedDoc = BSON("_id" << 1 << "test" + << "123"); + BSONObj expectedBody = BSON("cursor" << BSON("id" << CursorId(123) << "ns" + << "db.coll")); + + CursorResponseBuilder crb(&builder, options); + crb.append(expectedDoc); + ASSERT_EQ(crb.numDocs(), 1U); + crb.done(CursorId(123), "db.coll"); + + auto msg = builder.done(); + auto opMsg = OpMsg::parse(msg); + const auto& docSeqs = opMsg.sequences; + ASSERT_EQ(docSeqs.size(), 1U); + const auto& documentSequence = docSeqs[0]; + ASSERT_EQ(documentSequence.name, "cursor.firstBatch"); + ASSERT_EQ(documentSequence.objs.size(), 1U); + ASSERT_BSONOBJ_EQ(documentSequence.objs[0], expectedDoc); + ASSERT_BSONOBJ_EQ(opMsg.body, expectedBody); +} + } // namespace } // namespace mongo |