author     | Mathias Stearn <mathias@10gen.com> | 2017-07-10 14:37:02 -0400
committer  | Mathias Stearn <mathias@10gen.com> | 2017-07-26 15:13:34 -0400
commit     | 516e0a50a0fb0e9df45cdc5f74cf0308c3706366 (patch)
tree       | a0c4b66760e834b468eef9a088a4a00e9d2fd23a /src/mongo
parent     | e6a7b02b3d14d78017b809a125f6bc3633687393 (diff)
download   | mongo-516e0a50a0fb0e9df45cdc5f74cf0308c3706366.tar.gz
SERVER-28509 Move code to assemble legacy requests out of DBClient
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/client/dbclient.cpp               |  77
-rw-r--r-- | src/mongo/client/dbclientcursor.cpp         |  15
-rw-r--r-- | src/mongo/client/dbclientinterface.h        |  99
-rw-r--r-- | src/mongo/db/dbmessage.cpp                  |  71
-rw-r--r-- | src/mongo/db/dbmessage.h                    | 115
-rw-r--r-- | src/mongo/db/ops/SConscript                 |   1
-rw-r--r-- | src/mongo/db/ops/write_ops_parsers_test.cpp |  84
7 files changed, 222 insertions, 240 deletions
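
The shape of the change: the legacy OP_INSERT/OP_UPDATE/OP_DELETE/OP_KILL_CURSORS/OP_GET_MORE wire messages are now assembled by free functions declared in src/mongo/db/dbmessage.h instead of by hand-rolled BufBuilder code inside DBClient. A minimal sketch of the new calling pattern, assuming an already-connected DBClientBase-derived object named conn (the connection setup is not part of this patch):

    #include "mongo/client/dbclientinterface.h"
    #include "mongo/db/dbmessage.h"
    #include "mongo/db/jsobj.h"

    void sendLegacyInsert(mongo::DBClientBase& conn) {
        // The helper builds the complete OP_INSERT message (header, flag word, ns, document);
        // the client object is only responsible for putting it on the wire.
        mongo::Message msg = mongo::makeInsertMessage(
            "test.foo", BSON("x" << 1), mongo::InsertOption_ContinueOnError);
        conn.say(msg);
    }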
diff --git a/src/mongo/client/dbclient.cpp b/src/mongo/client/dbclient.cpp
index f418ffd968b..e9de4469a2d 100644
--- a/src/mongo/client/dbclient.cpp
+++ b/src/mongo/client/dbclient.cpp
@@ -31,6 +31,8 @@

 #include "mongo/platform/basic.h"

+#include "mongo/client/dbclientinterface.h"
+
 #include <algorithm>
 #include <utility>

@@ -40,7 +42,6 @@
 #include "mongo/client/authenticate.h"
 #include "mongo/client/constants.h"
 #include "mongo/client/dbclientcursor.h"
-#include "mongo/client/dbclientinterface.h"
 #include "mongo/client/replica_set_monitor.h"
 #include "mongo/config.h"
 #include "mongo/db/auth/internal_user_auth.h"
@@ -1062,55 +1063,19 @@ unsigned long long DBClientConnection::query(stdx::function<void(DBClientCursorB
 }

 void DBClientBase::insert(const string& ns, BSONObj obj, int flags) {
-    BufBuilder b;
-
-    int reservedFlags = 0;
-    if (flags & InsertOption_ContinueOnError)
-        reservedFlags |= Reserved_InsertOption_ContinueOnError;
-
-    b.appendNum(reservedFlags);
-    b.appendStr(ns);
-    obj.appendSelfToBufBuilder(b);
-
-    Message toSend;
-    toSend.setData(dbInsert, b.buf(), b.len());
-
-    say(toSend);
+    auto msg = makeInsertMessage(ns, obj, flags);
+    say(msg);
 }

 // TODO: Merge with other insert implementation?
 void DBClientBase::insert(const string& ns, const vector<BSONObj>& v, int flags) {
-    BufBuilder b;
-
-    int reservedFlags = 0;
-    if (flags & InsertOption_ContinueOnError)
-        reservedFlags |= Reserved_InsertOption_ContinueOnError;
-
-    b.appendNum(reservedFlags);
-    b.appendStr(ns);
-    for (vector<BSONObj>::const_iterator i = v.begin(); i != v.end(); ++i)
-        i->appendSelfToBufBuilder(b);
-
-    Message toSend;
-    toSend.setData(dbInsert, b.buf(), b.len());
-
-    say(toSend);
+    auto msg = makeInsertMessage(ns, v.data(), v.size(), flags);
+    say(msg);
 }

 void DBClientBase::remove(const string& ns, Query obj, int flags) {
-    BufBuilder b;
-
-    const int reservedFlags = 0;
-    b.appendNum(reservedFlags);
-    b.appendStr(ns);
-    b.appendNum(flags);
-
-    obj.obj.appendSelfToBufBuilder(b);
-
-    Message toSend;
-    toSend.setData(dbDelete, b.buf(), b.len());
-
-    say(toSend);
+    auto msg = makeRemoveMessage(ns, obj.obj, flags);
+    say(msg);
 }

 void DBClientBase::update(const string& ns, Query query, BSONObj obj, bool upsert, bool multi) {
@@ -1123,31 +1088,13 @@ void DBClientBase::update(const string& ns, Query query, BSONObj obj, bool upser
 }

 void DBClientBase::update(const string& ns, Query query, BSONObj obj, int flags) {
-    BufBuilder b;
-
-    const int reservedFlags = 0;
-    b.appendNum(reservedFlags);
-    b.appendStr(ns);
-    b.appendNum(flags);
-
-    query.obj.appendSelfToBufBuilder(b);
-    obj.appendSelfToBufBuilder(b);
-
-    Message toSend;
-    toSend.setData(dbUpdate, b.buf(), b.len());
-
-    say(toSend);
+    auto msg = makeUpdateMessage(ns, query.obj, obj, flags);
+    say(msg);
 }

 void DBClientBase::killCursor(long long cursorId) {
-    StackBufBuilder b;
-    b.appendNum((int)0);  // reserved
-    b.appendNum((int)1);  // number
-    b.appendNum(cursorId);
-
-    Message m;
-    m.setData(dbKillCursors, b.buf(), b.len());
-    say(m);
+    auto msg = makeKillCursorsMessage(cursorId);
+    say(msg);
 }

 list<BSONObj> DBClientWithCommands::getIndexSpecs(const string& ns, int options) {
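
For reference, the OP_INSERT body that both the deleted code and makeInsertMessage() write is unchanged: a 32-bit flag word (bit 0 is continue-on-error), the full "db.collection" namespace as a NUL-terminated string, then one or more BSON documents back to back. A sketch of that layout spelled out with BufBuilder, mirroring the code removed above:

    BufBuilder b;
    b.appendNum(0);                              // flag word; bit 0 == continue-on-error
    b.appendStr("test.foo");                     // namespace, written as a C string
    BSON("x" << 1).appendSelfToBufBuilder(b);    // documents follow back to back
    Message toSend;
    toSend.setData(dbInsert, b.buf(), b.len());  // frame the body as a legacy OP_INSERT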
diff --git a/src/mongo/client/dbclientcursor.cpp b/src/mongo/client/dbclientcursor.cpp
index 2290228297c..3af409de4ad 100644
--- a/src/mongo/client/dbclientcursor.cpp
+++ b/src/mongo/client/dbclientcursor.cpp
@@ -110,12 +110,7 @@ void DBClientCursor::_assembleInit(Message& toSend) {
         return;
     }
     // Assemble a legacy getMore request.
-    BufBuilder b;
-    b.appendNum(opts);
-    b.appendStr(ns);
-    b.appendNum(nToReturn);
-    b.appendNum(cursorId);
-    toSend.setData(dbGetMore, b.buf(), b.len());
+    toSend = makeGetMoreMessage(ns, cursorId, nToReturn, opts);
 }

 bool DBClientCursor::init() {
@@ -174,14 +169,8 @@ void DBClientCursor::requestMore() {
         nToReturn -= batch.objs.size();
         verify(nToReturn > 0);
     }

-    BufBuilder b;
-    b.appendNum(opts);
-    b.appendStr(ns);
-    b.appendNum(nextBatchSize());
-    b.appendNum(cursorId);
-    Message toSend;
-    toSend.setData(dbGetMore, b.buf(), b.len());
+    Message toSend = makeGetMoreMessage(ns, cursorId, nextBatchSize(), opts);

     Message response;
     if (_client) {
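
Both cursor call sites now reduce to a single helper call. A sketch with placeholder values standing in for the live cursor state (the ns, cursor id, and batch size normally come from the cursor object):

    const long long cursorId = 12345;  // placeholder; normally taken from the initial OP_REPLY
    // flags are the same QueryOption_* bits that were used for the original query.
    Message toSend = makeGetMoreMessage("test.foo", cursorId, 100 /* nToReturn */, 0 /* flags */);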
diff --git a/src/mongo/client/dbclientinterface.h b/src/mongo/client/dbclientinterface.h
index 0e3a4d98058..1b16532baac 100644
--- a/src/mongo/client/dbclientinterface.h
+++ b/src/mongo/client/dbclientinterface.h
@@ -36,6 +36,7 @@
 #include "mongo/client/mongo_uri.h"
 #include "mongo/client/query.h"
 #include "mongo/client/read_preference.h"
+#include "mongo/db/dbmessage.h"
 #include "mongo/db/jsobj.h"
 #include "mongo/db/write_concern_options.h"
 #include "mongo/platform/atomic_word.h"
@@ -55,104 +56,6 @@
 namespace executor {
 struct RemoteCommandResponse;
 }

-/** the query field 'options' can have these bits set: */
-enum QueryOptions {
-    /** Tailable means cursor is not closed when the last data is retrieved. rather, the cursor
-     * marks the final object's position. you can resume using the cursor later, from where it was
-       located, if more data were received. Set on dbQuery and dbGetMore.
-
-       like any "latent cursor", the cursor may become invalid at some point -- for example if that
-       final object it references were deleted. Thus, you should be prepared to requery if you get
-       back ResultFlag_CursorNotFound.
-    */
-    QueryOption_CursorTailable = 1 << 1,
-
-    /** allow query of replica slave. normally these return an error except for namespace "local".
-     */
-    QueryOption_SlaveOk = 1 << 2,
-
-    // findingStart mode is used to find the first operation of interest when
-    // we are scanning through a repl log. For efficiency in the common case,
-    // where the first operation of interest is closer to the tail than the head,
-    // we start from the tail of the log and work backwards until we find the
-    // first operation of interest. Then we scan forward from that first operation,
-    // actually returning results to the client. During the findingStart phase,
-    // we release the db mutex occasionally to avoid blocking the db process for
-    // an extended period of time.
-    QueryOption_OplogReplay = 1 << 3,
-
-    /** The server normally times out idle cursors after an inactivity period to prevent excess
-     * memory uses
-       Set this option to prevent that.
-     */
-    QueryOption_NoCursorTimeout = 1 << 4,
-
-    /** Use with QueryOption_CursorTailable. If we are at the end of the data, block for a while
-     * rather than returning no data. After a timeout period, we do return as normal.
-     */
-    QueryOption_AwaitData = 1 << 5,
-
-    /** Stream the data down full blast in multiple "more" packages, on the assumption that the
-     * client will fully read all data queried. Faster when you are pulling a lot of data and know
-     * you want to pull it all down. Note: it is not allowed to not read all the data unless you
-     * close the connection.
-
-       Use the query( stdx::function<void(const BSONObj&)> f, ... ) version of the connection's
-       query()
-       method, and it will take care of all the details for you.
-    */
-    QueryOption_Exhaust = 1 << 6,
-
-    /** When sharded, this means its ok to return partial results
-        Usually we will fail a query if all required shards aren't up
-        If this is set, it'll be a partial result set
-     */
-    QueryOption_PartialResults = 1 << 7,
-
-    QueryOption_AllSupported = QueryOption_CursorTailable | QueryOption_SlaveOk |
-        QueryOption_OplogReplay | QueryOption_NoCursorTimeout | QueryOption_AwaitData |
-        QueryOption_Exhaust | QueryOption_PartialResults,
-
-    QueryOption_AllSupportedForSharding = QueryOption_CursorTailable | QueryOption_SlaveOk |
-        QueryOption_OplogReplay | QueryOption_NoCursorTimeout | QueryOption_AwaitData |
-        QueryOption_PartialResults,
-};
-
-enum UpdateOptions {
-    /** Upsert - that is, insert the item if no matching item is found. */
-    UpdateOption_Upsert = 1 << 0,
-
-    /** Update multiple documents (if multiple documents match query expression).
-       (Default is update a single document and stop.) */
-    UpdateOption_Multi = 1 << 1,
-
-    /** flag from mongo saying this update went everywhere */
-    UpdateOption_Broadcast = 1 << 2
-};
-
-enum RemoveOptions {
-    /** only delete one option */
-    RemoveOption_JustOne = 1 << 0,
-
-    /** flag from mongo saying this update went everywhere */
-    RemoveOption_Broadcast = 1 << 1
-};
-
-enum InsertOptions {
-    /** With muli-insert keep processing inserts if one fails */
-    InsertOption_ContinueOnError = 1 << 0
-};
-
-//
-// For legacy reasons, the reserved field pre-namespace of certain types of messages is used
-// to store options as opposed to the flags after the namespace. This should be transparent to
-// the api user, but we need these constants to disassemble/reassemble the messages correctly.
-//
-
-enum ReservedOptions {
-    Reserved_InsertOption_ContinueOnError = 1 << 0,
-};
-
 class DBClientCursor;
 class DBClientCursorBatchIterator;
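
Because dbclientinterface.h now pulls in dbmessage.h, existing users of these flag enums keep compiling unchanged, while code that only builds messages can include the lighter db/dbmessage.h directly. A small sketch of composing the moved flags (everything here lives in namespace mongo; the values in the comment come from the enum definitions):

    #include "mongo/db/dbmessage.h"

    const int updateFlags = UpdateOption_Upsert | UpdateOption_Multi;  // (1 << 0) | (1 << 1)
    Message upsertAll = makeUpdateMessage("test.foo",
                                          BSON("x" << BSON("$gte" << 0)),  // query
                                          BSON("$set" << BSON("y" << 2)),  // update
                                          updateFlags);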
diff --git a/src/mongo/db/dbmessage.cpp b/src/mongo/db/dbmessage.cpp
index 563e3001e07..4b9a0ccffc7 100644
--- a/src/mongo/db/dbmessage.cpp
+++ b/src/mongo/db/dbmessage.cpp
@@ -139,6 +139,77 @@ T DbMessage::readAndAdvance() {
     return t;
 }

+namespace {
+template <typename Func>
+Message makeMessage(NetworkOp op, Func&& bodyBuilder) {
+    BufBuilder b;
+    b.skip(sizeof(MSGHEADER::Layout));
+
+    bodyBuilder(b);
+
+    const int size = b.len();
+    auto out = Message(b.release());
+    out.header().setOperation(op);
+    out.header().setLen(size);
+    return out;
+}
+}
+
+Message makeInsertMessage(StringData ns, const BSONObj* objs, size_t count, int flags) {
+    return makeMessage(dbInsert, [&](BufBuilder& b) {
+        int reservedFlags = 0;
+        if (flags & InsertOption_ContinueOnError)
+            reservedFlags |= InsertOption_ContinueOnError;
+
+        b.appendNum(reservedFlags);
+        b.appendStr(ns);
+
+        for (size_t i = 0; i < count; i++) {
+            objs[i].appendSelfToBufBuilder(b);
+        }
+    });
+}
+
+Message makeUpdateMessage(StringData ns, BSONObj query, BSONObj update, int flags) {
+    return makeMessage(dbUpdate, [&](BufBuilder& b) {
+        const int reservedFlags = 0;
+        b.appendNum(reservedFlags);
+        b.appendStr(ns);
+        b.appendNum(flags);
+
+        query.appendSelfToBufBuilder(b);
+        update.appendSelfToBufBuilder(b);
+    });
+}
+
+Message makeRemoveMessage(StringData ns, BSONObj query, int flags) {
+    return makeMessage(dbDelete, [&](BufBuilder& b) {
+        const int reservedFlags = 0;
+        b.appendNum(reservedFlags);
+        b.appendStr(ns);
+        b.appendNum(flags);
+
+        query.appendSelfToBufBuilder(b);
+    });
+}
+
+Message makeKillCursorsMessage(long long cursorId) {
+    return makeMessage(dbKillCursors, [&](BufBuilder& b) {
+        b.appendNum((int)0);  // reserved
+        b.appendNum((int)1);  // number
+        b.appendNum(cursorId);
+    });
+}
+
+Message makeGetMoreMessage(StringData ns, long long cursorId, int nToReturn, int flags) {
+    return makeMessage(dbGetMore, [&](BufBuilder& b) {
+        b.appendNum(flags);
+        b.appendStr(ns);
+        b.appendNum(nToReturn);
+        b.appendNum(cursorId);
+    });
+}
+
 OpQueryReplyBuilder::OpQueryReplyBuilder() : _buffer(32768) {
     _buffer.skip(sizeof(QueryResult::Value));
 }
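
The anonymous makeMessage() helper is the only shared framing step: reserve room for the standard message header, let the caller append the body, then patch the op code and total length into the header. Expanded for the kill-cursors case without the lambda, it is roughly (cursorId is a placeholder value):

    const long long cursorId = 12345;  // placeholder

    BufBuilder b;
    b.skip(sizeof(MSGHEADER::Layout));  // leave room for messageLength/requestID/responseTo/opCode

    b.appendNum((int)0);                // OP_KILL_CURSORS body: reserved word,
    b.appendNum((int)1);                // number of cursor ids,
    b.appendNum(cursorId);              // then the ids themselves

    const int size = b.len();
    Message out(b.release());           // the Message takes ownership of the buffer
    out.header().setOperation(dbKillCursors);
    out.header().setLen(size);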
diff --git a/src/mongo/db/dbmessage.h b/src/mongo/db/dbmessage.h
index 2b7f4517c72..61dc3429303 100644
--- a/src/mongo/db/dbmessage.h
+++ b/src/mongo/db/dbmessage.h
@@ -286,6 +286,68 @@ private:
     unsigned int _nsLen;
 };

+/** the query field 'options' can have these bits set: */
+enum QueryOptions {
+    /** Tailable means cursor is not closed when the last data is retrieved. rather, the cursor
+     * marks the final object's position. you can resume using the cursor later, from where it was
+       located, if more data were received. Set on dbQuery and dbGetMore.
+
+       like any "latent cursor", the cursor may become invalid at some point -- for example if that
+       final object it references were deleted. Thus, you should be prepared to requery if you get
+       back ResultFlag_CursorNotFound.
+    */
+    QueryOption_CursorTailable = 1 << 1,
+
+    /** allow query of replica slave. normally these return an error except for namespace "local".
+     */
+    QueryOption_SlaveOk = 1 << 2,
+
+    // findingStart mode is used to find the first operation of interest when
+    // we are scanning through a repl log. For efficiency in the common case,
+    // where the first operation of interest is closer to the tail than the head,
+    // we start from the tail of the log and work backwards until we find the
+    // first operation of interest. Then we scan forward from that first operation,
+    // actually returning results to the client. During the findingStart phase,
+    // we release the db mutex occasionally to avoid blocking the db process for
+    // an extended period of time.
+    QueryOption_OplogReplay = 1 << 3,
+
+    /** The server normally times out idle cursors after an inactivity period to prevent excess
+     * memory uses
+       Set this option to prevent that.
+     */
+    QueryOption_NoCursorTimeout = 1 << 4,
+
+    /** Use with QueryOption_CursorTailable. If we are at the end of the data, block for a while
+     * rather than returning no data. After a timeout period, we do return as normal.
+     */
+    QueryOption_AwaitData = 1 << 5,
+
+    /** Stream the data down full blast in multiple "more" packages, on the assumption that the
+     * client will fully read all data queried. Faster when you are pulling a lot of data and know
+     * you want to pull it all down. Note: it is not allowed to not read all the data unless you
+     * close the connection.
+
+       Use the query( stdx::function<void(const BSONObj&)> f, ... ) version of the connection's
+       query()
+       method, and it will take care of all the details for you.
+    */
+    QueryOption_Exhaust = 1 << 6,
+
+    /** When sharded, this means its ok to return partial results
+        Usually we will fail a query if all required shards aren't up
+        If this is set, it'll be a partial result set
+     */
+    QueryOption_PartialResults = 1 << 7,
+
+    QueryOption_AllSupported = QueryOption_CursorTailable | QueryOption_SlaveOk |
+        QueryOption_OplogReplay | QueryOption_NoCursorTimeout | QueryOption_AwaitData |
+        QueryOption_Exhaust | QueryOption_PartialResults,
+
+    QueryOption_AllSupportedForSharding = QueryOption_CursorTailable | QueryOption_SlaveOk |
+        QueryOption_OplogReplay | QueryOption_NoCursorTimeout | QueryOption_AwaitData |
+        QueryOption_PartialResults,
+};

 /* a request to run a query, received from the database */
 class QueryMessage {
@@ -321,6 +383,59 @@ public:
     }
 };

+enum InsertOptions {
+    /** With muli-insert keep processing inserts if one fails */
+    InsertOption_ContinueOnError = 1 << 0
+};
+
+/**
+ * Builds a legacy OP_INSERT message.
+ */
+Message makeInsertMessage(StringData ns, const BSONObj* objs, size_t count, int flags = 0);
+inline Message makeInsertMessage(StringData ns, const BSONObj& obj, int flags = 0) {
+    return makeInsertMessage(ns, &obj, 1, flags);
+}
+
+enum UpdateOptions {
+    /** Upsert - that is, insert the item if no matching item is found. */
+    UpdateOption_Upsert = 1 << 0,
+
+    /** Update multiple documents (if multiple documents match query expression).
+       (Default is update a single document and stop.) */
+    UpdateOption_Multi = 1 << 1,
+
+    /** flag from mongo saying this update went everywhere */
+    UpdateOption_Broadcast = 1 << 2
+};
+
+/**
+ * Builds a legacy OP_UPDATE message.
+ */
+Message makeUpdateMessage(StringData ns, BSONObj query, BSONObj update, int flags = 0);
+
+enum RemoveOptions {
+    /** only delete one option */
+    RemoveOption_JustOne = 1 << 0,
+
+    /** flag from mongo saying this update went everywhere */
+    RemoveOption_Broadcast = 1 << 1
+};
+
+/**
+ * Builds a legacy OP_REMOVE message.
+ */
+Message makeRemoveMessage(StringData ns, BSONObj query, int flags = 0);
+
+/**
+ * Builds a legacy OP_KILLCURSORS message.
+ */
+Message makeKillCursorsMessage(long long cursorId);
+
+/**
+ * Builds a legacy OP_GETMORE message.
+ */
+Message makeGetMoreMessage(StringData ns, long long cursorId, int nToReturn, int flags = 0);
+
 /**
  * A response to a DbMessage.
  *
diff --git a/src/mongo/db/ops/SConscript b/src/mongo/db/ops/SConscript
index 554c9b6252b..3566f93bf11 100644
--- a/src/mongo/db/ops/SConscript
+++ b/src/mongo/db/ops/SConscript
@@ -181,7 +181,6 @@ env.CppUnitTest(
     LIBDEPS=[
         'write_ops_parsers',
         'write_ops_parsers_test_helpers',
-        '$BUILD_DIR/mongo/client/clientdriver',
     ],
 )
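
The header exposes a pointer/count overload plus a single-document convenience wrapper for inserts; a batch that lives in a std::vector goes through the first form, exactly as the rewritten DBClientBase::insert() and the tests below do. Sketch:

    std::vector<BSONObj> docs = {BSON("x" << 0), BSON("x" << 1)};
    Message batch = makeInsertMessage("test.foo", docs.data(), docs.size(),
                                      InsertOption_ContinueOnError);
    Message single = makeInsertMessage("test.foo", docs[0]);  // flags default to 0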
diff --git a/src/mongo/db/ops/write_ops_parsers_test.cpp b/src/mongo/db/ops/write_ops_parsers_test.cpp
index 0f6b98a5263..0f461e9a28e 100644
--- a/src/mongo/db/ops/write_ops_parsers_test.cpp
+++ b/src/mongo/db/ops/write_ops_parsers_test.cpp
@@ -28,9 +28,10 @@

 #include "mongo/platform/basic.h"

-#include "mongo/client/dbclientinterface.h"
 #include "mongo/db/catalog/document_validation.h"
+#include "mongo/db/dbmessage.h"
 #include "mongo/db/ops/write_ops.h"
+#include "mongo/db/ops/write_ops_parsers.h"
 #include "mongo/db/ops/write_ops_parsers_test_helpers.h"
 #include "mongo/unittest/unittest.h"

@@ -316,59 +317,13 @@ TEST(CommandWriteOpsParsers, RemoveErrorsWithBadLimit) {
     }
 }

-namespace {
-/**
- * A mock DBClient that just captures the Message that is sent for legacy writes.
- */
-class MyMockDBClient final : public DBClientBase {
-public:
-    Message message;  // The last message sent.
-
-    void say(Message& toSend, bool isRetry = false, std::string* actualServer = nullptr) {
-        message = std::move(toSend);
-    }
-
-    // The rest of these are just filling out the pure-virtual parts of the interface.
-    bool lazySupported() const {
-        return false;
-    }
-    std::string getServerAddress() const {
-        return "";
-    }
-    std::string toString() const {
-        return "";
-    }
-    bool call(Message& toSend, Message& response, bool assertOk, std::string* actualServer) {
-        invariant(!"call() not implemented");
-    }
-    virtual int getMinWireVersion() {
-        return 0;
-    }
-    virtual int getMaxWireVersion() {
-        return 0;
-    }
-    virtual bool isFailed() const {
-        return false;
-    }
-    virtual bool isStillConnected() {
-        return true;
-    }
-    virtual double getSoTimeout() const {
-        return 0;
-    }
-    virtual ConnectionString::ConnectionType type() const {
-        return ConnectionString::MASTER;
-    }
-};
-}  // namespace
-
 TEST(LegacyWriteOpsParsers, SingleInsert) {
     const std::string ns = "test.foo";
     const BSONObj obj = BSON("x" << 1);
     for (bool continueOnError : {false, true}) {
-        MyMockDBClient client;
-        client.insert(ns, obj, continueOnError ? InsertOption_ContinueOnError : 0);
-        const auto op = InsertOp::parseLegacy(client.message);
+        auto message =
+            makeInsertMessage(ns, obj, continueOnError ? InsertOption_ContinueOnError : 0);
+        const auto op = InsertOp::parseLegacy(message);
         ASSERT_EQ(op.getNamespace().ns(), ns);
         ASSERT(!op.getWriteCommandBase().getBypassDocumentValidation());
         ASSERT_EQ(!op.getWriteCommandBase().getOrdered(), continueOnError);
@@ -380,11 +335,11 @@ TEST(LegacyWriteOpsParsers, SingleInsert) {
 TEST(LegacyWriteOpsParsers, EmptyMultiInsertFails) {
     const std::string ns = "test.foo";
     for (bool continueOnError : {false, true}) {
-        MyMockDBClient client;
-        client.insert(
-            ns, std::vector<BSONObj>{}, continueOnError ? InsertOption_ContinueOnError : 0);
+        auto objs = std::vector<BSONObj>{};
+        auto message = makeInsertMessage(
+            ns, objs.data(), objs.size(), (continueOnError ? InsertOption_ContinueOnError : 0));
         ASSERT_THROWS_CODE(
-            InsertOp::parseLegacy(client.message), UserException, ErrorCodes::InvalidLength);
+            InsertOp::parseLegacy(message), UserException, ErrorCodes::InvalidLength);
     }
 }

@@ -393,9 +348,10 @@ TEST(LegacyWriteOpsParsers, RealMultiInsert) {
     const BSONObj obj0 = BSON("x" << 0);
     const BSONObj obj1 = BSON("x" << 1);
     for (bool continueOnError : {false, true}) {
-        MyMockDBClient client;
-        client.insert(ns, {obj0, obj1}, continueOnError ? InsertOption_ContinueOnError : 0);
-        const auto op = InsertOp::parseLegacy(client.message);
+        auto objs = std::vector<BSONObj>{obj0, obj1};
+        auto message = makeInsertMessage(
+            ns, objs.data(), objs.size(), continueOnError ? InsertOption_ContinueOnError : 0);
+        const auto op = InsertOp::parseLegacy(message);
         ASSERT_EQ(op.getNamespace().ns(), ns);
         ASSERT(!op.getWriteCommandBase().getBypassDocumentValidation());
         ASSERT_EQ(!op.getWriteCommandBase().getOrdered(), continueOnError);
@@ -411,9 +367,12 @@ TEST(LegacyWriteOpsParsers, Update) {
     const BSONObj update = BSON("$inc" << BSON("x" << 1));
     for (bool upsert : {false, true}) {
         for (bool multi : {false, true}) {
-            MyMockDBClient client;
-            client.update(ns, query, update, upsert, multi);
-            const auto op = UpdateOp::parseLegacy(client.message);
+            auto message = makeUpdateMessage(ns,
+                                             query,
+                                             update,
+                                             (upsert ? UpdateOption_Upsert : 0) |
+                                                 (multi ? UpdateOption_Multi : 0));
+            const auto op = UpdateOp::parseLegacy(message);
             ASSERT_EQ(op.getNamespace().ns(), ns);
             ASSERT(!op.getWriteCommandBase().getBypassDocumentValidation());
             ASSERT_EQ(op.getWriteCommandBase().getOrdered(), true);
@@ -430,9 +389,8 @@ TEST(LegacyWriteOpsParsers, Remove) {
     const std::string ns = "test.foo";
     const BSONObj query = BSON("x" << 1);
     for (bool multi : {false, true}) {
-        MyMockDBClient client;
-        client.remove(ns, query, multi ? 0 : RemoveOption_JustOne);
-        const auto op = DeleteOp::parseLegacy(client.message);
+        auto message = makeRemoveMessage(ns, query, (multi ? 0 : RemoveOption_JustOne));
+        const auto op = DeleteOp::parseLegacy(message);
         ASSERT_EQ(op.getNamespace().ns(), ns);
         ASSERT(!op.getWriteCommandBase().getBypassDocumentValidation());
         ASSERT_EQ(op.getWriteCommandBase().getOrdered(), true);
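
Because the helpers return ordinary Message objects, quick sanity checks no longer need a mock client at all. A hypothetical spot check (not part of this patch) that the op code round-trips through the header:

    Message msg = makeKillCursorsMessage(12345);
    invariant(msg.operation() == dbKillCursors);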