diff options
author | Irina Yatsenko <irina.yatsenko@mongodb.com> | 2021-08-14 23:11:21 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-08-14 23:54:17 +0000 |
commit | 1f64d42977db0572b08d7ab19133bc3f21323ce0 (patch) | |
tree | 6478091409d1eabc3ed836646fdad68bdc075ec6 /src/mongo/client | |
parent | 3e64baa99c9331619de112b3957657313e1d4100 (diff) | |
download | mongo-1f64d42977db0572b08d7ab19133bc3f21323ce0.tar.gz |
SERVER-58670 Modernize DBClientBase query interface to avoid OP_QUERY-derived characteristics
Diffstat (limited to 'src/mongo/client')
-rw-r--r-- | src/mongo/client/async_client.cpp | 21 | ||||
-rw-r--r-- | src/mongo/client/async_client.h | 1 | ||||
-rw-r--r-- | src/mongo/client/dbclient_base.cpp | 144 | ||||
-rw-r--r-- | src/mongo/client/dbclient_base.h | 639 | ||||
-rw-r--r-- | src/mongo/client/dbclient_connection.cpp | 17 | ||||
-rw-r--r-- | src/mongo/client/dbclient_connection.h | 9 | ||||
-rw-r--r-- | src/mongo/client/dbclient_connection_integration_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/client/dbclient_cursor.cpp | 52 | ||||
-rw-r--r-- | src/mongo/client/dbclient_cursor.h | 16 | ||||
-rw-r--r-- | src/mongo/client/dbclient_cursor_test.cpp | 16 | ||||
-rw-r--r-- | src/mongo/client/dbclient_rs.cpp | 131 | ||||
-rw-r--r-- | src/mongo/client/dbclient_rs.h | 36 |
12 files changed, 361 insertions, 723 deletions
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp index 5e16b4feb48..6b3085efd4d 100644 --- a/src/mongo/client/async_client.cpp +++ b/src/mongo/client/async_client.cpp @@ -41,13 +41,13 @@ #include "mongo/config.h" #include "mongo/db/auth/sasl_command_constants.h" #include "mongo/db/commands/test_commands_enabled.h" +#include "mongo/db/dbmessage.h" #include "mongo/db/server_options.h" #include "mongo/db/wire_version.h" #include "mongo/executor/egress_tag_closer_manager.h" #include "mongo/logv2/log.h" #include "mongo/rpc/factory.h" #include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/rpc/legacy_request_builder.h" #include "mongo/rpc/metadata/client_metadata.h" #include "mongo/rpc/reply_interface.h" #include "mongo/util/fail_point.h" @@ -142,8 +142,9 @@ void AsyncDBClient::_parseIsMasterResponse(BSONObj request, auto clientProtocols = rpc::computeProtocolSet(wireSpec->outgoing); invariant(clientProtocols != rpc::supports::kNone); - // Set the operation protocol - _negotiatedProtocol = uassertStatusOK(rpc::negotiate(protocolSet.protocolSet, clientProtocols)); + boost::optional<rpc::Protocol> protocol = + uassertStatusOK(rpc::negotiate(protocolSet.protocolSet, clientProtocols)); + invariant(protocol && *protocol == rpc::Protocol::kOpMsg); _compressorManager.clientFinish(responseBody); } @@ -238,8 +239,12 @@ Future<void> AsyncDBClient::initWireVersion(const std::string& appName, auto requestObj = _buildIsMasterRequest(appName, hook); // We use a legacy request to create our ismaster request because we may // have to communicate with servers that do not support other protocols. - auto requestMsg = - rpc::legacyRequestFromOpMsgRequest(OpMsgRequest::fromDBAndBody("admin", requestObj)); + auto requestMsg = makeDeprecatedQueryMessage("admin.$cmd", + requestObj, + 1 /*nToReturn*/, + 0 /*nToSkip*/, + nullptr /*fieldsToReturn*/, + 0 /*queryOptions*/); auto msgId = nextMessageId(); return _call(requestMsg, msgId) .then([msgId, this]() { return _waitForResponse(msgId); }) @@ -291,8 +296,7 @@ Future<Message> AsyncDBClient::_waitForResponse(boost::optional<int32_t> msgId, Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request, const BatonHandle& baton, bool fireAndForget) { - invariant(_negotiatedProtocol); - auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request)); + auto requestMsg = request.serialize(); if (fireAndForget) { OpMsg::setFlag(&requestMsg, OpMsg::kMoreToCome); } @@ -351,8 +355,7 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::awaitExhaustCommand( Future<executor::RemoteCommandResponse> AsyncDBClient::runExhaustCommand(OpMsgRequest request, const BatonHandle& baton) { - invariant(_negotiatedProtocol); - auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request)); + auto requestMsg = request.serialize(); OpMsg::setFlag(&requestMsg, OpMsg::kExhaustSupported); auto msgId = nextMessageId(); diff --git a/src/mongo/client/async_client.h b/src/mongo/client/async_client.h index d7a2402f11a..ce88edb7228 100644 --- a/src/mongo/client/async_client.h +++ b/src/mongo/client/async_client.h @@ -115,7 +115,6 @@ private: transport::SessionHandle _session; ServiceContext* const _svcCtx; MessageCompressorManager _compressorManager; - boost::optional<rpc::Protocol> _negotiatedProtocol; }; } // namespace mongo diff --git a/src/mongo/client/dbclient_base.cpp b/src/mongo/client/dbclient_base.cpp index 27944bbd390..c0018211757 100644 --- a/src/mongo/client/dbclient_base.cpp +++ b/src/mongo/client/dbclient_base.cpp @@ -71,7 +71,6 @@ #include "mongo/util/debug_util.h" #include "mongo/util/net/ssl_manager.h" #include "mongo/util/net/ssl_options.h" -#include "mongo/util/password_digest.h" namespace mongo { @@ -252,8 +251,10 @@ std::pair<rpc::UniqueReply, DBClientBase*> DBClientBase::runCommandWithTarget( auto opCtx = haveClient() ? cc().getOperationContext() : nullptr; appendMetadata(opCtx, _metadataWriter, _apiParameters, request); - auto requestMsg = - rpc::messageFromOpMsgRequest(getClientRPCProtocols(), getServerRPCProtocols(), request); + rpc::Protocol protocol = + uassertStatusOK(rpc::negotiate(getClientRPCProtocols(), getServerRPCProtocols())); + invariant(protocol == rpc::Protocol::kOpMsg); + auto requestMsg = request.serialize(); Message replyMsg; @@ -315,51 +316,6 @@ bool DBClientBase::runCommand(const string& dbname, BSONObj cmd, BSONObj& info, return std::get<0>(res); } - -/* note - we build a bson obj here -- for something that is super common like getlasterror you - should have that object prebuilt as that would be faster. -*/ -bool DBClientBase::simpleCommand(const string& dbname, BSONObj* info, const string& command) { - BSONObj o; - if (info == nullptr) - info = &o; - BSONObjBuilder b; - b.append(command, 1); - return runCommand(dbname, b.done(), *info); -} - -bool DBClientBase::runPseudoCommand(StringData db, - StringData realCommandName, - StringData pseudoCommandCol, - const BSONObj& cmdArgs, - BSONObj& info, - int options) { - BSONObjBuilder bob; - bob.append(realCommandName, 1); - bob.appendElements(cmdArgs); - auto cmdObj = bob.done(); - - bool success = false; - - if (!(success = runCommand(db.toString(), cmdObj, info, options))) { - auto status = getStatusFromCommandResult(info); - verify(!status.isOK()); - - if (status == ErrorCodes::CommandResultSchemaViolation) { - msgasserted(28624, - str::stream() << "Received bad " << realCommandName - << " response from server: " << info); - } else if (status == ErrorCodes::CommandNotFound) { - NamespaceString pseudoCommandNss(db, pseudoCommandCol); - // if this throws we just let it escape as that's how runCommand works. - info = findOne(pseudoCommandNss.ns(), cmdArgs, nullptr, options); - return true; - } - } - - return success; -} - long long DBClientBase::count(const NamespaceStringOrUUID nsOrUuid, const BSONObj& query, int options, @@ -401,10 +357,6 @@ BSONObj DBClientBase::_countCmd(const NamespaceStringOrUUID nsOrUuid, return b.obj(); } -string DBClientBase::createPasswordDigest(const string& username, const string& clearTextPassword) { - return mongo::createPasswordDigest(username, clearTextPassword); -} - namespace { class ScopedMetadataWriterRemover { ScopedMetadataWriterRemover(const ScopedMetadataWriterRemover&) = delete; @@ -650,23 +602,15 @@ bool DBClientBase::exists(const string& ns) { return !results.empty(); } -/** query N objects from the database into an array. makes sense mostly when you want a small - * number of results. if a huge number, use query() and iterate the cursor. - */ -void DBClientBase::findN(vector<BSONObj>& out, - const string& ns, - Query query, - int limit, - int nToSkip, - const BSONObj* fieldsToReturn, - int queryOptions, - boost::optional<BSONObj> readConcernObj) { - out.reserve(limit); - +BSONObj DBClientBase::findOne(const string& ns, + const Query& query, + const BSONObj* fieldsToReturn, + int queryOptions, + boost::optional<BSONObj> readConcernObj) { unique_ptr<DBClientCursor> c = this->query(NamespaceString(ns), query, - limit, - nToSkip, + 1 /*limit*/, + 0 /*nToSkip*/, fieldsToReturn, queryOptions, 0 /* batchSize */, @@ -678,23 +622,7 @@ void DBClientBase::findN(vector<BSONObj>& out, << " ns: " << ns << " query: " << query.toString(), c.get()); - tassert(5262100, - "Deprecated ShardConfigStale flag encountered in query result", - !c->hasResultFlag(ResultFlag_ShardConfigStaleDeprecated)); - - while (c->more()) { - out.push_back(c->nextSafe()); - } -} - -BSONObj DBClientBase::findOne(const string& ns, - const Query& query, - const BSONObj* fieldsToReturn, - int queryOptions, - boost::optional<BSONObj> readConcernObj) { - vector<BSONObj> v; - findN(v, ns, query, 1, 0, fieldsToReturn, queryOptions, readConcernObj); - return v.empty() ? BSONObj() : v[0]; + return c->more() ? c->nextSafe() : BSONObj(); } std::pair<BSONObj, NamespaceString> DBClientBase::findOneByUUID( @@ -761,11 +689,9 @@ unique_ptr<DBClientCursor> DBClientBase::query(const NamespaceStringOrUUID& nsOr return nullptr; } -unique_ptr<DBClientCursor> DBClientBase::getMore(const string& ns, - long long cursorId, - int options) { +unique_ptr<DBClientCursor> DBClientBase::getMore(const string& ns, long long cursorId) { unique_ptr<DBClientCursor> c( - new DBClientCursor(this, NamespaceString(ns), cursorId, 0 /* limit */, options)); + new DBClientCursor(this, NamespaceString(ns), cursorId, 0 /* limit */, 0 /* options */)); if (c->init()) return c; return nullptr; @@ -791,7 +717,7 @@ unsigned long long DBClientBase::query(std::function<void(const BSONObj&)> f, fun._f = f; std::function<void(DBClientCursorBatchIterator&)> ptr(fun); return this->query( - ptr, nsOrUuid, query, fieldsToReturn, queryOptions, batchSize, readConcernObj); + ptr, nsOrUuid, std::move(query), fieldsToReturn, queryOptions, batchSize, readConcernObj); } unsigned long long DBClientBase::query(std::function<void(DBClientCursorBatchIterator&)> f, @@ -822,9 +748,8 @@ unsigned long long DBClientBase::query(std::function<void(DBClientCursorBatchIte namespace { OpMsgRequest createInsertRequest(const string& ns, const vector<BSONObj>& v, - int flags, + bool ordered, boost::optional<BSONObj> writeConcernObj) { - bool ordered = !(flags & InsertOption_ContinueOnError); auto nss = NamespaceString(ns); BSONObjBuilder cmdBuilder; cmdBuilder.append("insert", nss.coll()); @@ -861,9 +786,9 @@ OpMsgRequest createUpdateRequest(const string& ns, OpMsgRequest createRemoveRequest(const string& ns, Query obj, - int flags, + bool removeMany, boost::optional<BSONObj> writeConcernObj) { - int limit = (flags & RemoveOption_JustOne) ? 1 : 0; + const int limit = removeMany ? 0 : 1; auto nss = NamespaceString(ns); BSONObjBuilder cmdBuilder; @@ -880,42 +805,42 @@ OpMsgRequest createRemoveRequest(const string& ns, BSONObj DBClientBase::insertAcknowledged(const string& ns, const vector<BSONObj>& v, - int flags, + bool ordered, boost::optional<BSONObj> writeConcernObj) { - OpMsgRequest request = createInsertRequest(ns, v, flags, writeConcernObj); + OpMsgRequest request = createInsertRequest(ns, v, ordered, writeConcernObj); rpc::UniqueReply reply = runCommand(std::move(request)); return reply->getCommandReply(); } void DBClientBase::insert(const string& ns, BSONObj obj, - int flags, + bool ordered, boost::optional<BSONObj> writeConcernObj) { - insert(ns, std::vector<BSONObj>{obj}, flags, writeConcernObj); + insert(ns, std::vector<BSONObj>{obj}, ordered, writeConcernObj); } void DBClientBase::insert(const string& ns, const vector<BSONObj>& v, - int flags, + bool ordered, boost::optional<BSONObj> writeConcernObj) { - auto request = createInsertRequest(ns, v, flags, writeConcernObj); + auto request = createInsertRequest(ns, v, ordered, writeConcernObj); runFireAndForgetCommand(std::move(request)); } BSONObj DBClientBase::removeAcknowledged(const string& ns, Query obj, - int flags, + bool removeMany, boost::optional<BSONObj> writeConcernObj) { - OpMsgRequest request = createRemoveRequest(ns, obj, flags, writeConcernObj); + OpMsgRequest request = createRemoveRequest(ns, obj, removeMany, writeConcernObj); rpc::UniqueReply reply = runCommand(std::move(request)); return reply->getCommandReply(); } void DBClientBase::remove(const string& ns, Query obj, - int flags, + bool removeMany, boost::optional<BSONObj> writeConcernObj) { - auto request = createRemoveRequest(ns, obj, flags, writeConcernObj); + auto request = createRemoveRequest(ns, obj, removeMany, writeConcernObj); runFireAndForgetCommand(std::move(request)); } @@ -940,19 +865,6 @@ void DBClientBase::update(const string& ns, runFireAndForgetCommand(std::move(request)); } -void DBClientBase::update(const string& ns, - Query query, - BSONObj obj, - int flags, - boost::optional<BSONObj> writeConcernObj) { - update(ns, - std::move(query), - std::move(obj), - flags & UpdateOption_Upsert, - flags & UpdateOption_Multi, - writeConcernObj); -} - void DBClientBase::killCursor(const NamespaceString& ns, long long cursorId) { runFireAndForgetCommand(OpMsgRequest::fromDBAndBody( ns.db(), KillCursorsCommandRequest(ns, {cursorId}).toBSON(BSONObj{}))); diff --git a/src/mongo/client/dbclient_base.h b/src/mongo/client/dbclient_base.h index f29037b04c0..56227d92b40 100644 --- a/src/mongo/client/dbclient_base.h +++ b/src/mongo/client/dbclient_base.h @@ -63,53 +63,26 @@ namespace executor { struct RemoteCommandResponse; } -// Useful utilities for namespaces -/** @return the database name portion of an ns std::string */ +/** + * Returns the database name portion of an ns std::string. + */ std::string nsGetDB(const std::string& ns); -/** @return the collection name portion of an ns std::string */ -std::string nsGetCollection(const std::string& ns); - /** - * This class pre-declares all the "query()" methods for DBClient so the subclasses can mark - * them as "final" or "override" as appropriate. + * Returns the collection name portion of an ns std::string. */ -class DBClientQueryInterface { - virtual std::unique_ptr<DBClientCursor> query( - const NamespaceStringOrUUID& nsOrUuid, - Query query, - int limit = 0, - int nToSkip = 0, - const BSONObj* fieldsToReturn = nullptr, - int queryOptions = 0, - int batchSize = 0, - boost::optional<BSONObj> readConcernObj = boost::none) = 0; - - virtual unsigned long long query(std::function<void(const BSONObj&)> f, - const NamespaceStringOrUUID& nsOrUuid, - Query query, - const BSONObj* fieldsToReturn = nullptr, - int queryOptions = 0, - int batchSize = 0, - boost::optional<BSONObj> readConcernObj = boost::none) = 0; - - virtual unsigned long long query(std::function<void(DBClientCursorBatchIterator&)> f, - const NamespaceStringOrUUID& nsOrUuid, - Query query, - const BSONObj* fieldsToReturn = nullptr, - int queryOptions = 0, - int batchSize = 0, - boost::optional<BSONObj> readConcernObj = boost::none) = 0; -}; +std::string nsGetCollection(const std::string& ns); /** - abstract class that implements the core db operations + * Abstract class that implements the core db operations. */ -class DBClientBase : public DBClientQueryInterface { +class DBClientBase { DBClientBase(const DBClientBase&) = delete; DBClientBase& operator=(const DBClientBase&) = delete; public: + static const uint64_t INVALID_SOCK_CREATION_TIME; + DBClientBase(const ClientAPIVersionParameters* apiParameters = nullptr) : _logLevel(logv2::LogSeverity::Log()), _connectionId(ConnectionIdSequence.fetchAndAdd(1)), @@ -122,86 +95,69 @@ public: virtual ~DBClientBase() {} - /** - @return a single object that matches the query. if none do, then the object is empty - @throws AssertionException - */ - virtual BSONObj findOne(const std::string& ns, - const Query& query, - const BSONObj* fieldsToReturn = nullptr, - int queryOptions = 0, - boost::optional<BSONObj> readConcernObj = boost::none); + virtual std::string toString() const = 0; - /** query N objects from the database into an array. makes sense mostly when you want a small - * number of results. if a huge number, use query() and iterate the cursor. + virtual std::string getServerAddress() const = 0; + + rpc::ProtocolSet getClientRPCProtocols() const; + rpc::ProtocolSet getServerRPCProtocols() const; + + /** + * Reconnect if needed and allowed. */ - void findN(std::vector<BSONObj>& out, - const std::string& ns, - Query query, - int limit, - int nToSkip = 0, - const BSONObj* fieldsToReturn = nullptr, - int queryOptions = 0, - boost::optional<BSONObj> readConcernObj = boost::none); + virtual void checkConnection() {} /** - * @return a pair with a single object that matches the filter within the collection specified - * by the UUID and the namespace of that collection on the queried node. - * - * If the command fails, an assertion error is thrown. Otherwise, if no document matches - * the query, an empty BSONObj is returned. - * @throws AssertionException + * If not checked recently, checks whether the underlying socket/sockets are still valid. */ - virtual std::pair<BSONObj, NamespaceString> findOneByUUID( - const std::string& db, - UUID uuid, - const BSONObj& filter, - boost::optional<BSONObj> readConcernObj = boost::none); + virtual bool isStillConnected() = 0; - virtual std::string getServerAddress() const = 0; + long long getConnectionId() const { + return _connectionId; + } - /** helper function. run a simple command where the command expression is simply - { command : 1 } - @param info -- where to put result object. may be null if caller doesn't need that info - @param command -- command name - @return true if the command returned "ok". + /** + * Returns true if this connection is currently in a failed state. */ - bool simpleCommand(const std::string& dbname, BSONObj* info, const std::string& command); + virtual bool isFailed() const = 0; - rpc::ProtocolSet getClientRPCProtocols() const; - rpc::ProtocolSet getServerRPCProtocols() const; + virtual ConnectionString::ConnectionType type() const = 0; + + virtual double getSoTimeout() const = 0; + + virtual uint64_t getSockCreationMicroSec() const { + return INVALID_SOCK_CREATION_TIME; + } + + virtual void reset() {} /** - * actualServer is set to the actual server where they call went if there was a choice (for - * example SecondaryOk). + * Returns true in isPrimary param if this db is the current primary of a replica pair. + * + * Pass in info for more details e.g.: + * { "isprimary" : 1.0 , "msg" : "not paired" , "ok" : 1.0 } + * + * Returns true if command invoked successfully. */ - virtual bool call(Message& toSend, - Message& response, - bool assertOk = true, - std::string* actualServer = nullptr) = 0; + virtual bool isPrimary(bool& isPrimary, BSONObj* info = nullptr); - virtual void say(Message& toSend, - bool isRetry = false, - std::string* actualServer = nullptr) = 0; + virtual bool isReplicaSetMember() const = 0; - /* used by QueryOption_Exhaust. To use that your subclass must implement this. */ - virtual Status recv(Message& m, int lastRequestId) { - verify(false); - return {ErrorCodes::NotImplemented, "recv() not implemented"}; - } + virtual bool isMongos() const = 0; - // In general, for lazy queries, we'll need to say, recv, then checkResponse - virtual void checkResponse(const std::vector<BSONObj>& batch, - bool networkError, - bool* retry = nullptr, - std::string* targetHost = nullptr) { - if (retry) - *retry = false; - if (targetHost) - *targetHost = ""; + virtual int getMinWireVersion() = 0; + virtual int getMaxWireVersion() = 0; + + const std::vector<std::string>& getIsPrimarySaslMechanisms() const { + return _saslMechsForAuth; } - virtual bool lazySupported() const = 0; + /** + * Returns the latest operationTime tracked on this client. + */ + Timestamp getOperationTime(); + + void setOperationTime(Timestamp operationTime); /** * Sets a RequestMetadataWriter on this connection. @@ -211,9 +167,8 @@ public: virtual void setRequestMetadataWriter(rpc::RequestMetadataWriter writer); /** - * Gets the RequestMetadataWriter that is set on this connection. This may - * be an uninitialized std::function, so it should be checked for validity - * with operator bool() first. + * Gets the RequestMetadataWriter that is set on this connection. This may be an uninitialized + * std::function, so it should be checked for validity with operator bool() first. */ const rpc::RequestMetadataWriter& getRequestMetadataWriter(); @@ -225,20 +180,26 @@ public: virtual void setReplyMetadataReader(rpc::ReplyMetadataReader reader); /** - * Gets the ReplyMetadataReader that is set on this connection. This may - * be an uninitialized std::function, so it should be checked for validity - * with operator bool() first. + * Gets the ReplyMetadataReader that is set on this connection. This may be an uninitialized + * std::function, so it should be checked for validity with operator bool() first. */ const rpc::ReplyMetadataReader& getReplyMetadataReader(); /** + * Parses command replies and runs them through the metadata reader. + * This is virtual and non-const to allow subclasses to act on failures. + */ + virtual rpc::UniqueReply parseCommandReplyMessage(const std::string& host, + const Message& replyMsg); + + /** * Runs the specified command request. */ virtual std::pair<rpc::UniqueReply, DBClientBase*> runCommandWithTarget(OpMsgRequest request); /** * This shared_ptr overload is used to possibly return a shared_ptr to the replica set member - * that the command was dispatched to. It's needed if the caller needs a lifetime for that + * that the command was dispatched to. It's needed if the caller needs a lifetime for that * connection that extends beyond the lifetime, or subsequent calls, against the top level * client. * @@ -267,25 +228,25 @@ public: */ virtual DBClientBase* runFireAndForgetCommand(OpMsgRequest request); - /** Run a database command. Database commands are represented as BSON objects. Common database - commands have prebuilt helper functions -- see below. If a helper is not available you can - directly call runCommand. - - @param dbname database name. Use "admin" for global administrative commands. - @param cmd the command object to execute. For example, { hello : 1 } - @param info the result object the database returns. Typically has { ok : ..., errmsg : ... } - fields set. - @param options see enum QueryOptions - normally not needed to run a command - @param auth if set, the BSONObj representation will be appended to the command object sent - - @return true if the command returned "ok". - */ + /** + * Runs a database command. Database commands are represented as BSON objects. Common database + * commands have prebuilt helper functions -- see below. If a helper is not available you can + * directly call runCommand. + * + * 'dbname': Database name. Use "admin" for global administrative commands. + * 'cmd': The command object to execute. For example, { hello : 1 }. + * 'info': The result object the database returns. Typically has { ok : ..., errmsg : ... } + * fields set. + * 'options': See enum QueryOptions - normally not needed to run a command. + * + * Returns true if the command returned "ok". + */ bool runCommand(const std::string& dbname, BSONObj cmd, BSONObj& info, int options = 0); /* - * This wraps up the runCommand function avove, but returns the DBClient that actually ran - * the command. When called against a replica set, this will return the specific - * replica set member the command ran against. + * Wraps up the runCommand function avove, but returns the DBClient that actually ran the + * command. When called against a replica set, this will return the specific replica set member + * the command ran against. * * This is used in the shell so that cursors can send getMore through the correct connection. */ @@ -306,46 +267,47 @@ public: /** * Authenticates to another cluster member using appropriate authentication data. - * @return true if the authentication was successful + * Returns true if the authentication was successful. */ virtual Status authenticateInternalUser( auth::StepDownBehavior stepDownBehavior = auth::StepDownBehavior::kKillConnection); /** - * Authenticate a user. + * Authenticates a user. * - * The "params" BSONObj should be initialized with some of the fields below. Which fields + * The 'params' BSONObj should be initialized with some of the fields below. Which fields * are required depends on the mechanism, which is mandatory. * - * "mechanism": The std::string name of the sasl mechanism to use. Mandatory. - * "user": The std::string name of the user to authenticate. Mandatory. - * "db": The database target of the auth command, which identifies the location - * of the credential information for the user. May be "$external" if - * credential information is stored outside of the mongo cluster. Mandatory. - * "pwd": The password data. - * "digestPassword": Boolean, set to true if the "pwd" is undigested (default). - * "serviceName": The GSSAPI service name to use. Defaults to "mongodb". - * "serviceHostname": The GSSAPI hostname to use. Defaults to the name of the remote - * host. - * - * Other fields in "params" are silently ignored. - * - * Returns normally on success, and throws on error. Throws a DBException with getCode() == - * ErrorCodes::AuthenticationFailed if authentication is rejected. All other exceptions are + * 'mechanism': The std::string name of the sasl mechanism to use. Mandatory. + * 'user': The std::string name of the user to authenticate. Mandatory. + * 'db': The database target of the auth command, which identifies the location + * of the credential information for the user. May be "$external" if + * credential information is stored outside of the mongo cluster. Mandatory. + * 'pwd': The password data. + * 'digestPassword': Boolean, set to true if the "pwd" is undigested (default). + * 'serviceName': The GSSAPI service name to use. Defaults to "mongodb". + * 'serviceHostname': The GSSAPI hostname to use. Defaults to the name of the remote host. + * + * Other fields in 'params' are silently ignored. + * + * Returns normally on success, and throws on error. Throws a DBException with getCode() == + * ErrorCodes::AuthenticationFailed if authentication is rejected. All other exceptions are * tantamount to authentication failure, but may also indicate more serious problems. */ void auth(const BSONObj& params); - /** Authorize access to a particular database. - Authentication is separate for each database on the server -- you may authenticate for any - number of databases on a single connection. - The "admin" database is special and once authenticated provides access to all databases on - the server. - @param digestPassword if password is plain text, set this to true. otherwise assumed - to be pre-digested - @param[out] authLevel level of authentication for the given user - @return true if successful - */ + /** + * Authorizes access to a particular database. + * + * Authentication is separate for each database on the server -- you may authenticate for any + * number of databases on a single connection. The "admin" database is special and once + * authenticated provides access to all databases on the server. + * + * 'digestPassword': If password is plain text, set this to true. otherwise assumed to be + * pre-digested. + * + * Returns true if successful. + */ bool auth(const std::string& dbname, const std::string& username, const std::string& pwd, @@ -355,50 +317,32 @@ public: /** * Logs out the connection for the given database. * - * @param dbname the database to logout from. - * @param info the result object for the logout command (provided for backwards - * compatibility with mongo shell) + * 'dbname': The database to logout from. + * 'info': The result object for the logout command (provided for backwards compatibility with + * mongo shell). */ virtual void logout(const std::string& dbname, BSONObj& info); - /** count number of objects in collection ns that match the query criteria specified - throws UserAssertion if database returns an error - */ - virtual long long count(NamespaceStringOrUUID nsOrUuid, - const BSONObj& query = BSONObj(), - int options = 0, - int limit = 0, - int skip = 0, - boost::optional<BSONObj> readConcernObj = boost::none); - - static std::string createPasswordDigest(const std::string& username, - const std::string& clearTextPassword); - - /** returns true in isPrimary param if this db is the current primary of a replica pair. - - pass in info for more details e.g.: - { "isprimary" : 1.0 , "msg" : "not paired" , "ok" : 1.0 } - - returns true if command invoked successfully. - */ - virtual bool isPrimary(bool& isPrimary, BSONObj* info = nullptr); + virtual bool authenticatedDuringConnect() const { + return false; + } /** - Create a new collection in the database. Normally, collection creation is automatic. You - would use this function if you wish to specify special options on creation. - - If the collection already exists, no action occurs. - - @param ns fully qualified collection name - @param size desired initial extent size for the collection. - Must be <= 1000000000 for normal collections. - For fixed size (capped) collections, this size is the total/max size of the - collection. - @param capped if true, this is a fixed size collection (where old data rolls out). - @param max maximum number of objects if capped (optional). - - returns true if successful. - */ + * Creates a new collection in the database. Normally, collection creation is automatic. You + * would use this function if you wish to specify special options on creation. + * + * If the collection already exists, no action occurs. + * + * 'ns': Fully qualified collection name. + * 'size': Desired initial extent size for the collection. + * Must be <= 1000000000 for normal collections. + * For fixed size (capped) collections, this size is the total/max size of the + * collection. + * 'capped': If true, this is a fixed size collection (where old data rolls out). + * 'max': Maximum number of objects if capped (optional). + * + * Returns true if successful. + */ bool createCollection(const std::string& ns, long long size = 0, bool capped = false, @@ -406,9 +350,11 @@ public: BSONObj* info = nullptr, boost::optional<BSONObj> writeConcernObj = boost::none); - /** Delete the specified collection. - * @param info An optional output parameter that receives the result object the database - * returns from the drop command. May be null if the caller doesn't need that info. + /** + * Deletes the specified collection. + * + * 'info': An optional output parameter that receives the result object the database returns + * from the drop command. May be null if the caller doesn't need that info. */ virtual bool dropCollection(const std::string& ns, const WriteConcernOptions& writeConcern = WriteConcernOptions(), @@ -427,8 +373,9 @@ public: return res; } - /** validate a collection, checking for errors and reporting back statistics. - this operation is slow and blocking. + /** + * Validates a collection, checking for errors and reporting back statistics. + * This operation is slow and blocking. */ bool validate(const std::string& ns) { BSONObj cmd = BSON("validate" << nsGetCollection(ns)); @@ -444,10 +391,24 @@ public: std::list<BSONObj> getCollectionInfos(const std::string& db, const BSONObj& filter = BSONObj()); /** + * Drops an entire database. + */ + virtual bool dropDatabase(const std::string& dbname, + const WriteConcernOptions& writeConcern = WriteConcernOptions(), + BSONObj* info = nullptr) { + BSONObj o; + if (info == nullptr) + info = &o; + return runCommand( + dbname, BSON("dropDatabase" << 1 << "writeConcern" << writeConcern.toBSON()), *info); + } + + /** * Lists databases available on the server. - * @param filter A filter for the results - * @param nameOnly Only return the names of the databases - * @param authorizedDatabases Only return the databases the user is authorized on + * + * 'filter': A filter for the results + * 'nameOnly': Only return the names of the databases + * 'authorizedDatabases': Only return the databases the user is authorized on */ std::vector<BSONObj> getDatabaseInfos(const BSONObj& filter = BSONObj(), bool nameOnly = false, @@ -455,14 +416,14 @@ public: bool exists(const std::string& ns); - /** Create an index on the collection 'ns' as described by the given keys. If you wish - * to specify options, see the more flexible overload of 'createIndex' which takes an - * IndexSpec object. Failure to construct the index is reported by throwing a - * AssertionException. + /** + * Creates an index on the collection 'ns' as described by the given keys. If you wish to + * specify options, see the more flexible overload of 'createIndex' which takes an IndexSpec + * object. Failure to construct the index is reported by throwing a AssertionException. * - * @param ns Namespace on which to create the index - * @param keys Document describing keys and index types. You must provide at least one - * field and its direction. + * 'ns': Namespace on which to create the index + * 'keys': Document describing keys and index types. You must provide at least one field and + * its direction. */ void createIndex(StringData ns, const BSONObj& keys, @@ -470,13 +431,13 @@ public: return createIndex(ns, IndexSpec().addKeys(keys), writeConcernObj); } - /** Create an index on the collection 'ns' as described by the given - * descriptor. Failure to construct the index is reported by throwing a - * AssertionException. + /** + * Creates an index on the collection 'ns' as described by the given descriptor. Failure to + * construct the index is reported by throwing a AssertionException. * - * @param ns Namespace on which to create the index - * @param descriptor Configuration object describing the index to create. The - * descriptor must describe at least one key and index type. + * 'ns': Namespace on which to create the index + * 'descriptor': Configuration object describing the index to create. The descriptor must + * describe at least one key and index type. */ virtual void createIndex(StringData ns, const IndexSpec& descriptor, @@ -525,7 +486,7 @@ public: boost::optional<BSONObj> writeConcernObj = boost::none); /** - drops all indexes for the collection + * Drops all indexes for the collection. */ virtual void dropIndexes(const std::string& ns, boost::optional<BSONObj> writeConcernObj = boost::none); @@ -534,72 +495,66 @@ public: static std::string genIndexName(const BSONObj& keys); - /** Erase / drop an entire database */ - virtual bool dropDatabase(const std::string& dbname, - const WriteConcernOptions& writeConcern = WriteConcernOptions(), - BSONObj* info = nullptr) { - BSONObj o; - if (info == nullptr) - info = &o; - return runCommand( - dbname, BSON("dropDatabase" << 1 << "writeConcern" << writeConcern.toBSON()), *info); - } - - virtual std::string toString() const = 0; - /** - * Run a pseudo-command such as sys.inprog/currentOp, sys.killop/killOp - * or sys.unlock/fsyncUnlock - * - * The real command will be tried first, and if the remote server does not - * implement the command, it will fall back to the pseudoCommand. - * - * The cmdArgs parameter should NOT include {<commandName>: 1}. - * - * TODO: remove after MongoDB 3.2 is released and replace all callers with - * a call to plain runCommand + * 'actualServer' is set to the actual server where they call went if there was a choice (for + * example SecondaryOk). */ - virtual bool runPseudoCommand(StringData db, - StringData realCommandName, - StringData pseudoCommandCol, - const BSONObj& cmdArgs, - BSONObj& info, - int options = 0); + virtual bool call(Message& toSend, + Message& response, + bool assertOk = true, + std::string* actualServer = nullptr) = 0; + + virtual void say(Message& toSend, + bool isRetry = false, + std::string* actualServer = nullptr) = 0; /** - * Reconnect if needed and allowed. + * Used by QueryOption_Exhaust. To use that your subclass must implement this. */ - virtual void checkConnection() {} - - static const uint64_t INVALID_SOCK_CREATION_TIME; - - long long getConnectionId() const { - return _connectionId; + virtual Status recv(Message& m, int lastRequestId) { + verify(false); + return {ErrorCodes::NotImplemented, "recv() not implemented"}; } - virtual int getMinWireVersion() = 0; - virtual int getMaxWireVersion() = 0; + /** + * Returns a single object that matches the query. if none do, then the object is empty. + * Throws AssertionException. + */ + virtual BSONObj findOne(const std::string& ns, + const Query& query, + const BSONObj* fieldsToReturn = nullptr, + int queryOptions = 0, + boost::optional<BSONObj> readConcernObj = boost::none); - const std::vector<std::string>& getIsPrimarySaslMechanisms() const { - return _saslMechsForAuth; - } + /** + * Returns a pair with a single object that matches the filter within the collection specified + * by the UUID and the namespace of that collection on the queried node. + * + * If the command fails, an assertion error is thrown. Otherwise, if no document matches + * the query, an empty BSONObj is returned. + * Throws AssertionException. + */ + virtual std::pair<BSONObj, NamespaceString> findOneByUUID( + const std::string& db, + UUID uuid, + const BSONObj& filter, + boost::optional<BSONObj> readConcernObj = boost::none); - /** send a query to the database. - @param ns namespace to query, format is <dbname>.<collectname>[.<collectname>]* - @param query query to perform on the collection. this is a BSONObj (binary JSON) - You may format as - { query: { ... }, orderby: { ... } } - to specify a sort order. - @param limit - the maximum number of documents that the cursor should return. 0 = unlimited. - @param nToSkip start with the nth item - @param fieldsToReturn optional template of which fields to select. if unspecified, - returns all fields - @param queryOptions see options enum at top of this file - - @return cursor. 0 if error (connection failure) - @throws AssertionException - */ - std::unique_ptr<DBClientCursor> query( + /** + * Sends a query to the database. + * + * 'ns': Namespace to query, format is <dbname>.<collectname>[.<collectname>]* + * 'query': Query to perform on the collection. + * 'limit': The maximum number of documents that the cursor should return. 0 = unlimited. + * 'nToSkip': Start with the nth item. + * 'fieldsToReturn': Optional template of which fields to select. If unspecified, returns all + * fields. + * 'queryOptions': See options enum at top of this file. + * + * Returns nullptr if error (connection failure). + * Throws AssertionException. + */ + virtual std::unique_ptr<DBClientCursor> query( const NamespaceStringOrUUID& nsOrUuid, Query query, int limit = 0, @@ -607,22 +562,22 @@ public: const BSONObj* fieldsToReturn = nullptr, int queryOptions = 0, int batchSize = 0, - boost::optional<BSONObj> readConcernObj = boost::none) override; - - - /** Uses QueryOption_Exhaust, when available and specified in 'queryOptions'. - - Exhaust mode sends back all data queries as fast as possible, with no back-and-forth for - OP_GET_MORE. If you are certain you will exhaust the query, it could be useful. If - exhaust mode is not specified in 'queryOptions' or not available, this call transparently - falls back to using ordinary getMores. - - Use the DBClientCursorBatchIterator version, below, if you want to do items in large - blocks, perhaps to avoid granular locking and such. + boost::optional<BSONObj> readConcernObj = boost::none); - Note: - The version that takes a BSONObj cannot return the namespace queried when the query is - is done by UUID. If this is required, use the DBClientBatchIterator version. + /** + * Uses QueryOption_Exhaust, when available and specified in 'queryOptions'. + * + * Exhaust mode sends back all data queries as fast as possible, with no back-and-forth for + * getMore. If you are certain you will exhaust the query, it could be useful. If exhaust mode + * is not specified in 'queryOptions' or not available, this call transparently falls back to + * using ordinary getMores. + * + * Use the DBClientCursorBatchIterator version, below, if you want to do items in large + * blocks, perhaps to avoid granular locking and such. + * + * Note: + * The version that takes a BSONObj cannot return the namespace queried when the query is done + * by UUID. If this is required, use the DBClientBatchIterator version. */ unsigned long long query(std::function<void(const BSONObj&)> f, const NamespaceStringOrUUID& nsOrUuid, @@ -630,32 +585,41 @@ public: const BSONObj* fieldsToReturn = nullptr, int queryOptions = QueryOption_Exhaust, int batchSize = 0, - boost::optional<BSONObj> readConcernObj = boost::none) final; + boost::optional<BSONObj> readConcernObj = boost::none); - unsigned long long query(std::function<void(DBClientCursorBatchIterator&)> f, - const NamespaceStringOrUUID& nsOrUuid, - Query query, - const BSONObj* fieldsToReturn = nullptr, - int queryOptions = QueryOption_Exhaust, - int batchSize = 0, - boost::optional<BSONObj> readConcernObj = boost::none) override; + virtual unsigned long long query(std::function<void(DBClientCursorBatchIterator&)> f, + const NamespaceStringOrUUID& nsOrUuid, + Query query, + const BSONObj* fieldsToReturn = nullptr, + int queryOptions = QueryOption_Exhaust, + int batchSize = 0, + boost::optional<BSONObj> readConcernObj = boost::none); + /** + * Don't use this - called automatically by DBClientCursor for you. + * 'cursorId': Id of cursor to retrieve. + * Returns an handle to a previously allocated cursor. + * Throws AssertionException. + */ + virtual std::unique_ptr<DBClientCursor> getMore(const std::string& ns, long long cursorId); - /** don't use this - called automatically by DBClientCursor for you - @param cursorId id of cursor to retrieve - @return an handle to a previously allocated cursor - @throws AssertionException + /** + * Counts number of objects in collection ns that match the query criteria specified. + * Throws UserAssertion if database returns an error. */ - virtual std::unique_ptr<DBClientCursor> getMore(const std::string& ns, - long long cursorId, - int options = 0); + virtual long long count(NamespaceStringOrUUID nsOrUuid, + const BSONObj& query = BSONObj(), + int options = 0, + int limit = 0, + int skip = 0, + boost::optional<BSONObj> readConcernObj = boost::none); /** * Executes an acknowledged command to insert a vector of documents. */ virtual BSONObj insertAcknowledged(const std::string& ns, const std::vector<BSONObj>& v, - int flags = 0, + bool ordered = true, boost::optional<BSONObj> writeConcernObj = boost::none); /** @@ -663,7 +627,7 @@ public: */ virtual void insert(const std::string& ns, BSONObj obj, - int flags = 0, + bool ordered = true, boost::optional<BSONObj> writeConcernObj = boost::none); /** @@ -671,7 +635,7 @@ public: */ virtual void insert(const std::string& ns, const std::vector<BSONObj>& v, - int flags = 0, + bool ordered = true, boost::optional<BSONObj> writeConcernObj = boost::none); /** @@ -694,18 +658,12 @@ public: bool multi = false, boost::optional<BSONObj> writeConcernObj = boost::none); - virtual void update(const std::string& ns, - Query query, - BSONObj obj, - int flags, - boost::optional<BSONObj> writeConcernObj = boost::none); - /** * Executes an acknowledged command to remove the objects that match the query. */ virtual BSONObj removeAcknowledged(const std::string& ns, Query query, - int flags = 0, + bool removeMany = true, boost::optional<BSONObj> writeConcernObj = boost::none); /** @@ -713,56 +671,17 @@ public: */ virtual void remove(const std::string& ns, Query query, - int flags = 0, + bool removeMany = true, boost::optional<BSONObj> writeConcernObj = boost::none); - virtual bool isFailed() const = 0; - - /** - * if not checked recently, checks whether the underlying socket/sockets are still valid - */ - virtual bool isStillConnected() = 0; - virtual void killCursor(const NamespaceString& ns, long long cursorID); - virtual ConnectionString::ConnectionType type() const = 0; - - virtual double getSoTimeout() const = 0; - - virtual uint64_t getSockCreationMicroSec() const { - return INVALID_SOCK_CREATION_TIME; - } - - virtual void reset() {} - - virtual bool isReplicaSetMember() const = 0; - - virtual bool isMongos() const = 0; - - virtual bool authenticatedDuringConnect() const { - return false; - } - - /** - * Parses command replies and runs them through the metadata reader. - * This is virtual and non-const to allow subclasses to act on failures. - */ - virtual rpc::UniqueReply parseCommandReplyMessage(const std::string& host, - const Message& replyMsg); - - /** - * Returns the latest operationTime tracked on this client. - */ - Timestamp getOperationTime(); - - void setOperationTime(Timestamp operationTime); - // This is only for DBClientCursor. static void (*withConnection_do_not_use)(std::string host, std::function<void(DBClientBase*)>); #ifdef MONGO_CONFIG_SSL /** - * Get the SSL configuration of this client. + * Gets the SSL configuration of this client. */ virtual const SSLConfiguration* getSSLConfiguration() = 0; @@ -786,10 +705,14 @@ public: } protected: - /** if the result of a command is ok*/ + /** + * Returns true if the result of a command is ok. + */ bool isOk(const BSONObj&); - /** if the element contains a not primary error */ + /** + * Returns true if the element contains a not primary error. + */ bool isNotPrimaryErrorString(const BSONElement& e); BSONObj _countCmd(NamespaceStringOrUUID nsOrUuid, @@ -800,7 +723,7 @@ protected: boost::optional<BSONObj> readConcernObj); /** - * Look up the options available on this client. Caches the answer from + * Looks up the options available on this client. Caches the answer from * _lookupAvailableOptions(), below. */ QueryOptions availableOptions(); @@ -809,10 +732,14 @@ protected: virtual void _auth(const BSONObj& params); - // should be set by subclasses during connection. + /** + * Should be set by subclasses during connection. + */ void _setServerRPCProtocols(rpc::ProtocolSet serverProtocols); - /** controls how chatty the client is about network errors & such. See log.h */ + /** + * Controls how chatty the client is about network errors & such. See log.h. + */ const logv2::LogSeverity _logLevel; static AtomicWord<long long> ConnectionIdSequence; @@ -830,9 +757,7 @@ private: auth::RunCommandHook _makeAuthRunCommandHook(); - /** - * The rpc protocol the remote server(s) support. - */ + // The rpc protocol the remote server(s) support. rpc::ProtocolSet _serverRPCProtocols{rpc::supports::kOpMsgOnly}; rpc::RequestMetadataWriter _metadataWriter; diff --git a/src/mongo/client/dbclient_connection.cpp b/src/mongo/client/dbclient_connection.cpp index 6e643b75ad0..b50487dcbfa 100644 --- a/src/mongo/client/dbclient_connection.cpp +++ b/src/mongo/client/dbclient_connection.cpp @@ -47,7 +47,6 @@ #include "mongo/bson/util/bson_extract.h" #include "mongo/bson/util/builder.h" #include "mongo/client/authenticate.h" -#include "mongo/client/constants.h" #include "mongo/client/dbclient_cursor.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/client/sasl_client_authenticate.h" @@ -787,22 +786,6 @@ bool DBClientConnection::call(Message& toSend, return true; } -void DBClientConnection::checkResponse(const std::vector<BSONObj>& batch, - bool networkError, - bool* retry, - string* host) { - /* check for errors. the only one we really care about at - * this stage is "not master" - */ - - *retry = false; - *host = _serverAddress.toString(); - - if (!_parentReplSetName.empty() && !batch.empty()) { - handleNotPrimaryResponse(batch[0], "$err"); - } -} - void DBClientConnection::setParentReplSetName(const string& replSetName) { _parentReplSetName = replSetName; } diff --git a/src/mongo/client/dbclient_connection.h b/src/mongo/client/dbclient_connection.h index cbc978887fe..b6d6aef8169 100644 --- a/src/mongo/client/dbclient_connection.h +++ b/src/mongo/client/dbclient_connection.h @@ -241,10 +241,7 @@ public: void say(Message& toSend, bool isRetry = false, std::string* actualServer = nullptr) override; Status recv(Message& m, int lastRequestId) override; - void checkResponse(const std::vector<BSONObj>& batch, - bool networkError, - bool* retry = nullptr, - std::string* host = nullptr) override; + bool call(Message& toSend, Message& response, bool assertOk, @@ -261,10 +258,6 @@ public: _hook = hook; } - bool lazySupported() const override { - return true; - } - static int getNumConnections() { return _numConnections.load(); } diff --git a/src/mongo/client/dbclient_connection_integration_test.cpp b/src/mongo/client/dbclient_connection_integration_test.cpp index a8a84643a0d..34b13adda99 100644 --- a/src/mongo/client/dbclient_connection_integration_test.cpp +++ b/src/mongo/client/dbclient_connection_integration_test.cpp @@ -58,7 +58,7 @@ public: auto conn = makeConn(kAppName + "-cleanup"); BSONObj currOp; - if (!conn->simpleCommand("admin", &currOp, "currentOp")) + if (!conn->runCommand("admin", BSON("currentOp" << 1), currOp)) uassertStatusOK(getStatusFromCommandResult(currOp)); for (auto&& op : currOp["inprog"].Obj()) { diff --git a/src/mongo/client/dbclient_cursor.cpp b/src/mongo/client/dbclient_cursor.cpp index d83d444af8e..897d84ebe1c 100644 --- a/src/mongo/client/dbclient_cursor.cpp +++ b/src/mongo/client/dbclient_cursor.cpp @@ -81,8 +81,10 @@ Message assembleCommandRequest(DBClientBase* cli, request.body = bodyBob.obj(); } - return rpc::messageFromOpMsgRequest( - cli->getClientRPCProtocols(), cli->getServerRPCProtocols(), std::move(request)); + rpc::Protocol protocol = + uassertStatusOK(rpc::negotiate(cli->getClientRPCProtocols(), cli->getServerRPCProtocols())); + invariant(protocol == rpc::Protocol::kOpMsg); + return request.serialize(); } } // namespace @@ -188,42 +190,6 @@ bool DBClientCursor::init() { return true; } -void DBClientCursor::initLazy(bool isRetry) { - massert(15875, - "DBClientCursor::initLazy called on a client that doesn't support lazy", - _client->lazySupported()); - Message toSend = _assembleInit(); - _client->say(toSend, isRetry, &_originalHost); - _lastRequestId = toSend.header().getId(); - _connectionHasPendingReplies = true; -} - -bool DBClientCursor::initLazyFinish(bool& retry) { - invariant(_connectionHasPendingReplies); - Message reply; - Status recvStatus = _client->recv(reply, _lastRequestId); - _connectionHasPendingReplies = false; - - // If we get a bad response, return false - if (!recvStatus.isOK() || reply.empty()) { - if (!recvStatus.isOK()) - LOGV2(20129, - "DBClientCursor::init lazy say() failed: {error}", - "DBClientCursor::init lazy say() failed", - "error"_attr = redact(recvStatus)); - if (reply.empty()) - LOGV2(20130, "DBClientCursor::init message from say() was empty"); - - _client->checkResponse({}, true, &retry, &_lazyHost); - - return false; - } - - dataReceived(reply, retry, _lazyHost); - - return !retry; -} - void DBClientCursor::requestMore() { // For exhaust queries, once the stream has been initiated we get data blasted to us // from the remote server, without a need to send any more 'getMore' requests. @@ -395,21 +361,16 @@ void DBClientCursor::attach(AScopedConnection* conn) { verify(conn->get()); if (conn->get()->type() == ConnectionString::ConnectionType::kReplicaSet) { - if (_lazyHost.size() > 0) - _scopedHost = _lazyHost; - else if (_client) + if (_client) _scopedHost = _client->getServerAddress(); else - massert(14821, - "No client or lazy client specified, cannot store multi-host connection.", - false); + massert(14821, "No client specified, cannot store multi-host connection.", false); } else { _scopedHost = conn->getHost(); } conn->done(); _client = nullptr; - _lazyHost = ""; } DBClientCursor::DBClientCursor(DBClientBase* client, @@ -477,7 +438,6 @@ DBClientCursor::DBClientCursor(DBClientBase* client, fieldsToReturn(fieldsToReturn), opts(queryOptions), batchSize(batchSize == 1 ? 2 : batchSize), - resultFlags(0), cursorId(cursorId), _ownCursor(true), wasError(false), diff --git a/src/mongo/client/dbclient_cursor.h b/src/mongo/client/dbclient_cursor.h index 831798cc7ce..0496905152c 100644 --- a/src/mongo/client/dbclient_cursor.h +++ b/src/mongo/client/dbclient_cursor.h @@ -130,14 +130,6 @@ public: return tailable() && (opts & QueryOption_AwaitData); } - /** see ResultFlagType (constants.h) for flag values - mostly these flags are for internal purposes - - ResultFlag_ErrSet is the possible exception to that - */ - bool hasResultFlag(int flag) { - return (resultFlags & flag) != 0; - } - /// Change batchSize after construction. Can change after requesting first batch. void setBatchSize(int newBatchSize) { batchSize = newBatchSize; @@ -198,9 +190,6 @@ public: */ bool init(); - void initLazy(bool isRetry = false); - bool initLazyFinish(bool& retry); - /** * For exhaust. Used in DBClientConnection. */ @@ -224,8 +213,7 @@ public: * If true, you should not try to use the connection for any other purpose or return it to a * pool. * - * This can happen if either initLazy() was called without initLazyFinish() or an exhaust query - * was started but not completed. + * This can happen if an exhaust query was started but not completed. */ bool connectionHasPendingReplies() const { return _connectionHasPendingReplies; @@ -304,11 +292,9 @@ private: int opts; int batchSize; std::stack<BSONObj> _putBack; - int resultFlags; long long cursorId; bool _ownCursor; // see decouple() std::string _scopedHost; - std::string _lazyHost; bool wasError; bool _connectionHasPendingReplies = false; int _lastRequestId = 0; diff --git a/src/mongo/client/dbclient_cursor_test.cpp b/src/mongo/client/dbclient_cursor_test.cpp index d267a11257a..39a62eda882 100644 --- a/src/mongo/client/dbclient_cursor_test.cpp +++ b/src/mongo/client/dbclient_cursor_test.cpp @@ -153,7 +153,7 @@ TEST_F(DBClientCursorTest, DBClientCursorCallsMetaDataReaderOncePerBatch) { // Set up the DBClientCursor and a mock client connection. DBClientConnectionForTest conn; const NamespaceString nss("test", "coll"); - DBClientCursor cursor(&conn, NamespaceStringOrUUID(nss), Query().obj, 0, 0, nullptr, 0, 0); + DBClientCursor cursor(&conn, NamespaceStringOrUUID(nss), fromjson("{}"), 0, 0, nullptr, 0, 0); cursor.setBatchSize(2); // Set up mock 'find' response. @@ -200,7 +200,7 @@ TEST_F(DBClientCursorTest, DBClientCursorHandlesOpMsgExhaustCorrectly) { DBClientConnectionForTest conn; const NamespaceString nss("test", "coll"); DBClientCursor cursor( - &conn, NamespaceStringOrUUID(nss), Query().obj, 0, 0, nullptr, QueryOption_Exhaust, 0); + &conn, NamespaceStringOrUUID(nss), fromjson("{}"), 0, 0, nullptr, QueryOption_Exhaust, 0); cursor.setBatchSize(0); // Set up mock 'find' response. @@ -264,7 +264,7 @@ TEST_F(DBClientCursorTest, DBClientCursorResendsGetMoreIfMoreToComeFlagIsOmitted DBClientConnectionForTest conn; const NamespaceString nss("test", "coll"); DBClientCursor cursor( - &conn, NamespaceStringOrUUID(nss), Query().obj, 0, 0, nullptr, QueryOption_Exhaust, 0); + &conn, NamespaceStringOrUUID(nss), fromjson("{}"), 0, 0, nullptr, QueryOption_Exhaust, 0); cursor.setBatchSize(0); // Set up mock 'find' response. @@ -349,7 +349,7 @@ TEST_F(DBClientCursorTest, DBClientCursorMoreThrowsExceptionOnNonOKResponse) { DBClientConnectionForTest conn; const NamespaceString nss("test", "coll"); DBClientCursor cursor( - &conn, NamespaceStringOrUUID(nss), Query().obj, 0, 0, nullptr, QueryOption_Exhaust, 0); + &conn, NamespaceStringOrUUID(nss), fromjson("{}"), 0, 0, nullptr, QueryOption_Exhaust, 0); cursor.setBatchSize(0); // Set up mock 'find' response. @@ -381,7 +381,7 @@ TEST_F(DBClientCursorTest, DBClientCursorMoreThrowsExceptionWhenMoreToComeFlagSe DBClientConnectionForTest conn; const NamespaceString nss("test", "coll"); DBClientCursor cursor( - &conn, NamespaceStringOrUUID(nss), Query().obj, 0, 0, nullptr, QueryOption_Exhaust, 0); + &conn, NamespaceStringOrUUID(nss), fromjson("{}"), 0, 0, nullptr, QueryOption_Exhaust, 0); cursor.setBatchSize(0); // Set up mock 'find' response. @@ -490,7 +490,7 @@ TEST_F(DBClientCursorTest, DBClientCursorTailable) { const NamespaceString nss("test", "coll"); DBClientCursor cursor(&conn, NamespaceStringOrUUID(nss), - Query().obj, + fromjson("{}"), 0, 0, nullptr, @@ -589,7 +589,7 @@ TEST_F(DBClientCursorTest, DBClientCursorTailableAwaitData) { const NamespaceString nss("test", "coll"); DBClientCursor cursor(&conn, NamespaceStringOrUUID(nss), - Query().obj, + fromjson("{}"), 0, 0, nullptr, @@ -655,7 +655,7 @@ TEST_F(DBClientCursorTest, DBClientCursorTailableAwaitDataExhaust) { const NamespaceString nss("test", "coll"); DBClientCursor cursor(&conn, NamespaceStringOrUUID(nss), - Query().obj, + fromjson("{}"), 0, 0, nullptr, diff --git a/src/mongo/client/dbclient_rs.cpp b/src/mongo/client/dbclient_rs.cpp index 8659ac795f8..82457ebd7ac 100644 --- a/src/mongo/client/dbclient_rs.cpp +++ b/src/mongo/client/dbclient_rs.cpp @@ -532,36 +532,28 @@ void DBClientReplicaSet::logout(const string& dbname, BSONObj& info) { void DBClientReplicaSet::insert(const string& ns, BSONObj obj, - int flags, + bool ordered, boost::optional<BSONObj> writeConcernObj) { - checkPrimary()->insert(ns, obj, flags, writeConcernObj); + checkPrimary()->insert(ns, obj, ordered, writeConcernObj); } void DBClientReplicaSet::insert(const string& ns, const vector<BSONObj>& v, - int flags, + bool ordered, boost::optional<BSONObj> writeConcernObj) { - checkPrimary()->insert(ns, v, flags, writeConcernObj); + checkPrimary()->insert(ns, v, ordered, writeConcernObj); } void DBClientReplicaSet::remove(const string& ns, Query obj, - int flags, + bool removeMany, boost::optional<BSONObj> writeConcernObj) { - checkPrimary()->remove(ns, obj, flags, writeConcernObj); -} - -void DBClientReplicaSet::update(const string& ns, - Query query, - BSONObj obj, - int flags, - boost::optional<BSONObj> writeConcernObj) { - return checkPrimary()->update(ns, query, obj, flags, writeConcernObj); + checkPrimary()->remove(ns, obj, removeMany, writeConcernObj); } unique_ptr<DBClientCursor> DBClientReplicaSet::query(const NamespaceStringOrUUID& nsOrUuid, Query query, - int nToReturn, + int limit, int nToSkip, const BSONObj* fieldsToReturn, int queryOptions, @@ -597,7 +589,7 @@ unique_ptr<DBClientCursor> DBClientReplicaSet::query(const NamespaceStringOrUUID unique_ptr<DBClientCursor> cursor = conn->query(nsOrUuid, query, - nToReturn, + limit, nToSkip, fieldsToReturn, queryOptions, @@ -628,14 +620,8 @@ unique_ptr<DBClientCursor> DBClientReplicaSet::query(const NamespaceStringOrUUID "dbclient_rs query to primary node", "replicaSet"_attr = _getMonitor()->getName()); - return checkPrimary()->query(nsOrUuid, - query, - nToReturn, - nToSkip, - fieldsToReturn, - queryOptions, - batchSize, - readConcernObj); + return checkPrimary()->query( + nsOrUuid, query, limit, nToSkip, fieldsToReturn, queryOptions, batchSize, readConcernObj); } BSONObj DBClientReplicaSet::findOne(const string& ns, @@ -838,7 +824,7 @@ DBClientConnection* DBClientReplicaSet::selectNodeUsingTags( void DBClientReplicaSet::say(Message& toSend, bool isRetry, string* actualServer) { if (!isRetry) - _lazyState = LazyState(); + _lastClient = nullptr; const int lastOp = toSend.operation(); @@ -866,7 +852,6 @@ void DBClientReplicaSet::say(Message& toSend, bool isRetry, string* actualServer string lastNodeErrMsg; for (size_t retry = 0; retry < MAX_RETRY; retry++) { - _lazyState._retries = retry; try { DBClientConnection* conn = selectNodeUsingTags(readPref); @@ -880,9 +865,7 @@ void DBClientReplicaSet::say(Message& toSend, bool isRetry, string* actualServer conn->say(toSend); - _lazyState._lastOp = lastOp; - _lazyState._secondaryQueryOk = true; - _lazyState._lastClient = conn; + _lastClient = conn; } catch (const DBException& ex) { const Status status = ex.toStatus(str::stream() << "can't callLazy replica set node " @@ -916,107 +899,27 @@ void DBClientReplicaSet::say(Message& toSend, bool isRetry, string* actualServer if (actualServer) *actualServer = primary->getServerAddress(); - _lazyState._lastOp = lastOp; - _lazyState._secondaryQueryOk = false; - // Don't retry requests to primary since there is only one host to try - _lazyState._retries = MAX_RETRY; - _lazyState._lastClient = primary; + _lastClient = primary; primary->say(toSend); return; } Status DBClientReplicaSet::recv(Message& m, int lastRequestId) { - verify(_lazyState._lastClient); + verify(_lastClient); try { - return _lazyState._lastClient->recv(m, lastRequestId); + return _lastClient->recv(m, lastRequestId); } catch (DBException& e) { LOGV2(20143, "Could not receive data from {connString}: {error}", "Could not receive data", - "connString"_attr = _lazyState._lastClient->toString(), + "connString"_attr = _lastClient->toString(), "error"_attr = redact(e)); return e.toStatus(); } } -void DBClientReplicaSet::checkResponse(const std::vector<BSONObj>& batch, - bool networkError, - bool* retry, - string* targetHost) { - // For now, do exactly as we did before, so as not to break things. In general though, we - // should fix this so checkResponse has a more consistent contract. - if (!retry) { - if (_lazyState._lastClient) - return _lazyState._lastClient->checkResponse(batch, networkError); - else - return checkPrimary()->checkResponse(batch, networkError); - } - - *retry = false; - if (targetHost && _lazyState._lastClient) - *targetHost = _lazyState._lastClient->getServerAddress(); - else if (targetHost) - *targetHost = ""; - - if (!_lazyState._lastClient) - return; - - // nReturned == 1 means that we got one result back, which might be an error - // networkError is a sentinel value for "no data returned" aka (usually) network problem - // If neither, this must be a query result so our response is ok wrt the replica set - if (batch.size() != 1 && !networkError) - return; - - BSONObj dataObj; - if (batch.size() == 1) - dataObj = batch[0]; - - // Check if we should retry here - if (_lazyState._lastOp == dbQuery && _lazyState._secondaryQueryOk) { - // query could potentially go to a secondary, so see if this is an error (or empty) and - // retry if we're not past our retry limit. - - if (networkError || - (hasErrField(dataObj) && !dataObj["code"].eoo() && - dataObj["code"].Int() == ErrorCodes::NotPrimaryOrSecondary)) { - if (_lazyState._lastClient == _lastSecondaryOkConn.get()) { - isntSecondary(); - } else if (_lazyState._lastClient == _primary.get()) { - isNotPrimary(); - } else { - LOGV2_WARNING(20151, - "Data {dataObj} is invalid because last rs client {connString} is " - "not primary or secondary", - "Data is invalid because last rs client is not primary or secondary", - "dataObj"_attr = redact(dataObj), - "connString"_attr = _lazyState._lastClient->toString()); - } - - if (_lazyState._retries < static_cast<int>(MAX_RETRY)) { - _lazyState._retries++; - *retry = true; - } else { - LOGV2(20144, - "Too many retries ({numRetries}), could not get data from replica set", - "Too many retries, could not get data from replica set", - "numRetries"_attr = _lazyState._retries); - } - } - } else if (_lazyState._lastOp == dbQuery) { - // if query could not potentially go to a secondary, just mark the primary as bad - - if (networkError || - (hasErrField(dataObj) && !dataObj["code"].eoo() && - dataObj["code"].Int() == ErrorCodes::NotPrimaryNoSecondaryOk)) { - if (_lazyState._lastClient == _primary.get()) { - isNotPrimary(); - } - } - } -} - DBClientBase* DBClientReplicaSet::runFireAndForgetCommand(OpMsgRequest request) { // Assume all fire-and-forget commands should go to the primary node. It is currently used // for writes which need to go to the primary and for killCursors which should be sent to a @@ -1180,7 +1083,7 @@ void DBClientReplicaSet::_invalidateLastSecondaryOkCache(const Status& status) { void DBClientReplicaSet::reset() { resetSecondaryOkConn(); - _lazyState._lastClient = nullptr; + _lastClient = nullptr; _lastReadPref.reset(); } diff --git a/src/mongo/client/dbclient_rs.h b/src/mongo/client/dbclient_rs.h index 38eae4bdb34..ad2dbcbe4a9 100644 --- a/src/mongo/client/dbclient_rs.h +++ b/src/mongo/client/dbclient_rs.h @@ -58,8 +58,6 @@ typedef std::shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorPtr; class DBClientReplicaSet : public DBClientBase { public: using DBClientBase::query; - using DBClientBase::remove; - using DBClientBase::update; /** Call connect() after constructing. autoReconnect is always on for DBClientReplicaSet * connections. */ @@ -94,7 +92,7 @@ public: std::unique_ptr<DBClientCursor> query( const NamespaceStringOrUUID& nsOrUuid, Query query, - int nToReturn = 0, + int limit = 0, int nToSkip = 0, const BSONObj* fieldsToReturn = nullptr, int queryOptions = 0, @@ -110,25 +108,19 @@ public: void insert(const std::string& ns, BSONObj obj, - int flags = 0, + bool ordered = true, boost::optional<BSONObj> writeConcernObj = boost::none) override; /** insert multiple objects. Note that single object insert is asynchronous, so this version is only nominally faster and not worth a special effort to try to use. */ void insert(const std::string& ns, const std::vector<BSONObj>& v, - int flags = 0, + bool ordered = true, boost::optional<BSONObj> writeConcernObj = boost::none) override; void remove(const std::string& ns, Query obj, - int flags, - boost::optional<BSONObj> writeConcernObj = boost::none) override; - - void update(const std::string& ns, - Query query, - BSONObj obj, - int flags, + bool removeMany = true, boost::optional<BSONObj> writeConcernObj = boost::none) override; void killCursor(const NamespaceString& ns, long long cursorID) override; @@ -156,10 +148,6 @@ public: void say(Message& toSend, bool isRetry = false, std::string* actualServer = nullptr) override; Status recv(Message& toRecv, int lastRequestId) override; - void checkResponse(const std::vector<BSONObj>& batch, - bool networkError, - bool* retry = nullptr, - std::string* targetHost = nullptr) override; /* this is the callback from our underlying connections to notify us that we got a "not primary" * error. @@ -205,9 +193,6 @@ public: ConnectionString::ConnectionType type() const override { return ConnectionString::ConnectionType::kReplicaSet; } - bool lazySupported() const override { - return true; - } using DBClientBase::runCommandWithTarget; std::pair<rpc::UniqueReply, DBClientBase*> runCommandWithTarget(OpMsgRequest request) final; @@ -365,17 +350,6 @@ private: MongoURI _uri; protected: - /** - * for storing (non-threadsafe) information between lazy calls - */ - class LazyState { - public: - LazyState() : _lastClient(nullptr), _lastOp(-1), _secondaryQueryOk(false), _retries(0) {} - DBClientConnection* _lastClient; - int _lastOp; - bool _secondaryQueryOk; - int _retries; - - } _lazyState; + DBClientConnection* _lastClient = nullptr; }; } // namespace mongo |