// dbclient.cpp - connect to a Mongo database as a database, from C++

/*    Copyright 2009 10gen Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects
 *    for all of the code used other than as permitted herein. If you modify
 *    file(s) with this exception, you may extend this exception to your
 *    version of the file(s), but you are not obligated to do so. If you do not
 *    wish to do so, delete this exception statement from your version. If you
 *    delete this exception statement from all source files in the program,
 *    then also delete it in the license file.
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork #include "mongo/platform/basic.h" #include #include "mongo/base/status.h" #include "mongo/base/status_with.h" #include "mongo/bson/util/builder.h" #include "mongo/client/authenticate.h" #include "mongo/client/constants.h" #include "mongo/client/dbclientcursor.h" #include "mongo/client/dbclientinterface.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/config.h" #include "mongo/db/auth/internal_user_auth.h" #include "mongo/db/commands.h" #include "mongo/db/json.h" #include "mongo/db/namespace_string.h" #include "mongo/db/server_options.h" #include "mongo/db/wire_version.h" #include "mongo/executor/remote_command_request.h" #include "mongo/executor/remote_command_response.h" #include "mongo/rpc/factory.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata.h" #include "mongo/rpc/reply_interface.h" #include "mongo/rpc/request_builder_interface.h" #include "mongo/s/stale_exception.h" // for RecvStaleConfigException #include "mongo/stdx/functional.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" #include "mongo/util/assert_util.h" #include "mongo/util/concurrency/mutex.h" #include "mongo/util/debug_util.h" #include "mongo/util/log.h" #include "mongo/util/net/sock.h" #include "mongo/util/net/ssl_manager.h" #include "mongo/util/net/ssl_options.h" #include "mongo/util/password_digest.h" namespace mongo { using std::unique_ptr; using std::endl; using std::list; using std::map; using std::string; using std::stringstream; using std::vector; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; namespace { #ifdef MONGO_CONFIG_SSL static SimpleMutex s_mtx; static SSLManagerInterface* s_sslMgr(NULL); SSLManagerInterface* sslManager() { stdx::lock_guard lk(s_mtx); if (s_sslMgr) { return s_sslMgr; } s_sslMgr = getSSLManager(); return s_sslMgr; } #endif } // namespace AtomicInt64 DBClientBase::ConnectionIdSequence; const 
BSONField Query::ReadPrefField("$readPreference"); const BSONField Query::ReadPrefModeField("mode"); const BSONField Query::ReadPrefTagsField("tags"); Query::Query(const string& json) : obj(fromjson(json)) {} Query::Query(const char* json) : obj(fromjson(json)) {} Query& Query::hint(const string& jsonKeyPatt) { return hint(fromjson(jsonKeyPatt)); } Query& Query::where(const string& jscode, BSONObj scope) { /* use where() before sort() and hint() and explain(), else this will assert. */ verify(!isComplex()); BSONObjBuilder b; b.appendElements(obj); b.appendWhere(jscode, scope); obj = b.obj(); return *this; } void Query::makeComplex() { if (isComplex()) return; BSONObjBuilder b; b.append("query", obj); obj = b.obj(); } Query& Query::sort(const BSONObj& s) { appendComplex("orderby", s); return *this; } Query& Query::hint(BSONObj keyPattern) { appendComplex("$hint", keyPattern); return *this; } Query& Query::explain() { appendComplex("$explain", true); return *this; } Query& Query::snapshot() { appendComplex("$snapshot", true); return *this; } Query& Query::minKey(const BSONObj& val) { appendComplex("$min", val); return *this; } Query& Query::maxKey(const BSONObj& val) { appendComplex("$max", val); return *this; } bool Query::isComplex(const BSONObj& obj, bool* hasDollar) { if (obj.hasElement("query")) { if (hasDollar) *hasDollar = false; return true; } if (obj.hasElement("$query")) { if (hasDollar) *hasDollar = true; return true; } return false; } Query& Query::readPref(ReadPreference pref, const BSONArray& tags) { appendComplex(ReadPrefField.name().c_str(), ReadPreferenceSetting(pref, TagSet(tags)).toBSON()); return *this; } bool Query::isComplex(bool* hasDollar) const { return isComplex(obj, hasDollar); } bool Query::hasReadPreference(const BSONObj& queryObj) { const bool hasReadPrefOption = queryObj["$queryOptions"].isABSONObj() && queryObj["$queryOptions"].Obj().hasField(ReadPrefField.name()); bool canHaveReadPrefField = Query::isComplex(queryObj) || // The find 
command has a '$readPreference' option. queryObj.firstElementFieldName() == StringData("find"); return (canHaveReadPrefField && queryObj.hasField(ReadPrefField.name())) || hasReadPrefOption; } BSONObj Query::getFilter() const { bool hasDollar; if (!isComplex(&hasDollar)) return obj; return obj.getObjectField(hasDollar ? "$query" : "query"); } BSONObj Query::getSort() const { if (!isComplex()) return BSONObj(); BSONObj ret = obj.getObjectField("orderby"); if (ret.isEmpty()) ret = obj.getObjectField("$orderby"); return ret; } BSONObj Query::getHint() const { if (!isComplex()) return BSONObj(); return obj.getObjectField("$hint"); } bool Query::isExplain() const { return isComplex() && obj.getBoolField("$explain"); } string Query::toString() const { return obj.toString(); } /* --- dbclientcommands --- */ bool DBClientWithCommands::isOk(const BSONObj& o) { return o["ok"].trueValue(); } bool DBClientWithCommands::isNotMasterErrorString(const BSONElement& e) { return e.type() == String && str::contains(e.valuestr(), "not master"); } enum QueryOptions DBClientWithCommands::availableOptions() { if (!_haveCachedAvailableOptions) { _cachedAvailableOptions = _lookupAvailableOptions(); _haveCachedAvailableOptions = true; } return _cachedAvailableOptions; } enum QueryOptions DBClientWithCommands::_lookupAvailableOptions() { BSONObj ret; if (runCommand("admin", BSON("availablequeryoptions" << 1), ret)) { return QueryOptions(ret.getIntField("options")); } return QueryOptions(0); } rpc::ProtocolSet DBClientWithCommands::getClientRPCProtocols() const { return _clientRPCProtocols; } rpc::ProtocolSet DBClientWithCommands::getServerRPCProtocols() const { return _serverRPCProtocols; } void DBClientWithCommands::setClientRPCProtocols(rpc::ProtocolSet protocols) { _clientRPCProtocols = std::move(protocols); } void DBClientWithCommands::_setServerRPCProtocols(rpc::ProtocolSet protocols) { _serverRPCProtocols = std::move(protocols); } void 
DBClientWithCommands::setRequestMetadataWriter(rpc::RequestMetadataWriter writer) { _metadataWriter = std::move(writer); } const rpc::RequestMetadataWriter& DBClientWithCommands::getRequestMetadataWriter() { return _metadataWriter; } void DBClientWithCommands::setReplyMetadataReader(rpc::ReplyMetadataReader reader) { _metadataReader = std::move(reader); } const rpc::ReplyMetadataReader& DBClientWithCommands::getReplyMetadataReader() { return _metadataReader; } rpc::UniqueReply DBClientWithCommands::runCommandWithMetadata(StringData database, StringData command, const BSONObj& metadata, const BSONObj& commandArgs) { uassert(ErrorCodes::InvalidNamespace, str::stream() << "Database name '" << database << "' is not valid.", NamespaceString::validDBName(database)); // call() oddly takes this by pointer, so we need to put it on the stack. auto host = getServerAddress(); BSONObjBuilder metadataBob; metadataBob.appendElements(metadata); if (_metadataWriter) { uassertStatusOK(_metadataWriter(&metadataBob, host)); } auto requestBuilder = rpc::makeRequestBuilder(getClientRPCProtocols(), getServerRPCProtocols()); requestBuilder->setDatabase(database); requestBuilder->setCommandName(command); requestBuilder->setCommandArgs(commandArgs); requestBuilder->setMetadata(metadataBob.done()); auto requestMsg = requestBuilder->done(); Message replyMsg; // We always want to throw if there was a network error, we do it here // instead of passing 'true' for the 'assertOk' parameter so we can construct a // more helpful error message. Note that call() can itself throw a socket exception. 
uassert(ErrorCodes::HostUnreachable, str::stream() << "network error while attempting to run " << "command '" << command << "' " << "on host '" << host << "' ", call(requestMsg, replyMsg, false, &host)); auto commandReply = rpc::makeReply(&replyMsg); uassert(ErrorCodes::RPCProtocolNegotiationFailed, str::stream() << "Mismatched RPC protocols - request was '" << networkOpToString(requestMsg.operation()) << "' '" << " but reply was '" << networkOpToString(replyMsg.operation()) << "' ", requestBuilder->getProtocol() == commandReply->getProtocol()); if (ErrorCodes::SendStaleConfig == getStatusFromCommandResult(commandReply->getCommandReply())) { throw RecvStaleConfigException("stale config in runCommand", commandReply->getCommandReply()); } if (_metadataReader) { uassertStatusOK(_metadataReader(commandReply->getMetadata(), host)); } return rpc::UniqueReply(std::move(replyMsg), std::move(commandReply)); } bool DBClientWithCommands::runCommand(const string& dbname, const BSONObj& cmd, BSONObj& info, int options) { BSONObj upconvertedCmd; BSONObj upconvertedMetadata; // TODO: This will be downconverted immediately if the underlying // requestBuilder is a legacyRequest builder. Not sure what the best // way to get around that is without breaking the abstraction. std::tie(upconvertedCmd, upconvertedMetadata) = uassertStatusOK(rpc::upconvertRequestMetadata(cmd, options)); auto commandName = upconvertedCmd.firstElementFieldName(); auto result = runCommandWithMetadata(dbname, commandName, upconvertedMetadata, upconvertedCmd); info = result->getCommandReply().getOwned(); return isOk(info); } /* note - we build a bson obj here -- for something that is super common like getlasterror you should have that object prebuilt as that would be faster. 
*/ bool DBClientWithCommands::simpleCommand(const string& dbname, BSONObj* info, const string& command) { BSONObj o; if (info == 0) info = &o; BSONObjBuilder b; b.append(command, 1); return runCommand(dbname, b.done(), *info); } bool DBClientWithCommands::runPseudoCommand(StringData db, StringData realCommandName, StringData pseudoCommandCol, const BSONObj& cmdArgs, BSONObj& info, int options) { BSONObjBuilder bob; bob.append(realCommandName, 1); bob.appendElements(cmdArgs); auto cmdObj = bob.done(); bool success = false; if (!(success = runCommand(db.toString(), cmdObj, info, options))) { auto status = getStatusFromCommandResult(info); verify(!status.isOK()); if (status == ErrorCodes::CommandResultSchemaViolation) { msgasserted(28624, str::stream() << "Received bad " << realCommandName << " response from server: " << info); } else if (status == ErrorCodes::CommandNotFound) { NamespaceString pseudoCommandNss(db, pseudoCommandCol); // if this throws we just let it escape as that's how runCommand works. 
info = findOne(pseudoCommandNss.ns(), cmdArgs, nullptr, options); return true; } } return success; } unsigned long long DBClientWithCommands::count( const string& myns, const BSONObj& query, int options, int limit, int skip) { BSONObj cmd = _countCmd(myns, query, options, limit, skip); BSONObj res; if (!runCommand(nsToDatabase(myns), cmd, res, options)) uasserted(11010, string("count fails:") + res.toString()); return res["n"].numberLong(); } BSONObj DBClientWithCommands::_countCmd( const string& myns, const BSONObj& query, int options, int limit, int skip) { NamespaceString ns(myns); BSONObjBuilder b; b.append("count", ns.coll()); b.append("query", query); if (limit) b.append("limit", limit); if (skip) b.append("skip", skip); return b.obj(); } BSONObj DBClientWithCommands::getLastErrorDetailed(bool fsync, bool j, int w, int wtimeout) { return getLastErrorDetailed("admin", fsync, j, w, wtimeout); } BSONObj DBClientWithCommands::getLastErrorDetailed( const std::string& db, bool fsync, bool j, int w, int wtimeout) { BSONObj info; BSONObjBuilder b; b.append("getlasterror", 1); if (fsync) b.append("fsync", 1); if (j) b.append("j", 1); // only affects request when greater than one node if (w >= 1) b.append("w", w); else if (w == -1) b.append("w", "majority"); if (wtimeout > 0) b.append("wtimeout", wtimeout); runCommand(db, b.obj(), info); return info; } string DBClientWithCommands::getLastError(bool fsync, bool j, int w, int wtimeout) { return getLastError("admin", fsync, j, w, wtimeout); } string DBClientWithCommands::getLastError( const std::string& db, bool fsync, bool j, int w, int wtimeout) { BSONObj info = getLastErrorDetailed(db, fsync, j, w, wtimeout); return getLastErrorString(info); } string DBClientWithCommands::getLastErrorString(const BSONObj& info) { if (info["ok"].trueValue()) { BSONElement e = info["err"]; if (e.eoo()) return ""; if (e.type() == Object) return e.toString(); return e.str(); } else { // command failure BSONElement e = info["errmsg"]; if 
(e.eoo()) return ""; if (e.type() == Object) return "getLastError command failed: " + e.toString(); return "getLastError command failed: " + e.str(); } } const BSONObj getpreverrorcmdobj = fromjson("{getpreverror:1}"); BSONObj DBClientWithCommands::getPrevError() { BSONObj info; runCommand("admin", getpreverrorcmdobj, info); return info; } string DBClientWithCommands::createPasswordDigest(const string& username, const string& clearTextPassword) { return mongo::createPasswordDigest(username, clearTextPassword); } namespace { class ScopedMetadataWriterRemover { MONGO_DISALLOW_COPYING(ScopedMetadataWriterRemover); public: ScopedMetadataWriterRemover(DBClientWithCommands* cli) : _cli(cli), _oldWriter(cli->getRequestMetadataWriter()) { _cli->setRequestMetadataWriter(rpc::RequestMetadataWriter{}); } ~ScopedMetadataWriterRemover() { _cli->setRequestMetadataWriter(_oldWriter); } private: DBClientWithCommands* const _cli; rpc::RequestMetadataWriter _oldWriter; }; } // namespace void DBClientWithCommands::_auth(const BSONObj& params) { ScopedMetadataWriterRemover remover{this}; // We will only have a client name if SSL is enabled std::string clientName = ""; #ifdef MONGO_CONFIG_SSL if (sslManager() != nullptr) { clientName = sslManager()->getSSLConfiguration().clientSubjectName; } #endif auth::authenticateClient( params, HostAndPort(getServerAddress()).host(), clientName, [this](RemoteCommandRequest request, auth::AuthCompletionHandler handler) { BSONObj info; auto start = Date_t::now(); auto commandName = request.cmdObj.firstElementFieldName(); try { auto reply = runCommandWithMetadata( request.dbname, commandName, request.metadata, request.cmdObj); BSONObj data = reply->getCommandReply().getOwned(); BSONObj metadata = reply->getMetadata().getOwned(); Milliseconds millis(Date_t::now() - start); // Hand control back to authenticateClient() handler(StatusWith( RemoteCommandResponse(data, metadata, millis))); } catch (...) 
{ handler(exceptionToStatus()); } }); } bool DBClientWithCommands::authenticateInternalUser() { if (!isInternalAuthSet()) { if (!serverGlobalParams.quiet) { log() << "ERROR: No authentication parameters set for internal user"; } return false; } try { auth(getInternalUserAuthParamsWithFallback()); return true; } catch (const UserException& ex) { if (!serverGlobalParams.quiet) { log() << "can't authenticate to " << toString() << " as internal user, error: " << ex.what(); } return false; } } void DBClientWithCommands::auth(const BSONObj& params) { _auth(params); } bool DBClientWithCommands::auth(const string& dbname, const string& username, const string& password_text, string& errmsg, bool digestPassword) { try { const auto authParams = auth::buildAuthParams(dbname, username, password_text, digestPassword); auth(authParams); return true; } catch (const UserException& ex) { if (ex.getCode() != ErrorCodes::AuthenticationFailed) throw; errmsg = ex.what(); return false; } } void DBClientWithCommands::logout(const string& dbname, BSONObj& info) { runCommand(dbname, BSON("logout" << 1), info); } BSONObj ismastercmdobj = fromjson("{\"ismaster\":1}"); bool DBClientWithCommands::isMaster(bool& isMaster, BSONObj* info) { BSONObj o; if (info == 0) info = &o; bool ok = runCommand("admin", ismastercmdobj, *info); isMaster = info->getField("ismaster").trueValue(); return ok; } bool DBClientWithCommands::createCollection( const string& ns, long long size, bool capped, int max, BSONObj* info) { verify(!capped || size); BSONObj o; if (info == 0) info = &o; BSONObjBuilder b; string db = nsToDatabase(ns); b.append("create", ns.c_str() + db.length() + 1); if (size) b.append("size", size); if (capped) b.append("capped", true); if (max) b.append("max", max); return runCommand(db.c_str(), b.done(), *info); } bool DBClientWithCommands::copyDatabase(const string& fromdb, const string& todb, const string& fromhost, BSONObj* info) { BSONObj o; if (info == 0) info = &o; BSONObjBuilder b; 
b.append("copydb", 1); b.append("fromhost", fromhost); b.append("fromdb", fromdb); b.append("todb", todb); return runCommand("admin", b.done(), *info); } bool DBClientWithCommands::eval(const string& dbname, const string& jscode, BSONObj& info, BSONElement& retValue, BSONObj* args) { BSONObjBuilder b; b.appendCode("$eval", jscode); if (args) b.appendArray("args", *args); bool ok = runCommand(dbname, b.done(), info); if (ok) retValue = info.getField("retval"); return ok; } bool DBClientWithCommands::eval(const string& dbname, const string& jscode) { BSONObj info; BSONElement retValue; return eval(dbname, jscode, info, retValue); } list DBClientWithCommands::getDatabaseNames() { BSONObj info; uassert(10005, "listdatabases failed", runCommand("admin", BSON("listDatabases" << 1), info, QueryOption_SlaveOk)); uassert(10006, "listDatabases.databases not array", info["databases"].type() == Array); list names; BSONObjIterator i(info["databases"].embeddedObjectUserCheck()); while (i.more()) { names.push_back(i.next().embeddedObjectUserCheck()["name"].valuestr()); } return names; } list DBClientWithCommands::getCollectionNames(const string& db) { list infos = getCollectionInfos(db); list names; for (list::iterator it = infos.begin(); it != infos.end(); ++it) { names.push_back(db + "." 
+ (*it)["name"].valuestr()); } return names; } list DBClientWithCommands::getCollectionInfos(const string& db, const BSONObj& filter) { list infos; // first we're going to try the command // it was only added in 3.0, so if we're talking to an older server // we'll fail back to querying system.namespaces // TODO(spencer): remove fallback behavior after 3.0 { BSONObj res; if (runCommand(db, BSON("listCollections" << 1 << "filter" << filter << "cursor" << BSONObj()), res, QueryOption_SlaveOk)) { BSONObj cursorObj = res["cursor"].Obj(); BSONObj collections = cursorObj["firstBatch"].Obj(); BSONObjIterator it(collections); while (it.more()) { BSONElement e = it.next(); infos.push_back(e.Obj().getOwned()); } const long long id = cursorObj["id"].Long(); if (id != 0) { const std::string ns = cursorObj["ns"].String(); unique_ptr cursor = getMore(ns, id, 0, 0); while (cursor->more()) { infos.push_back(cursor->nextSafe().getOwned()); } } return infos; } // command failed int code = res["code"].numberInt(); string errmsg = res["errmsg"].valuestrsafe(); if (code == ErrorCodes::CommandNotFound || errmsg.find("no such cmd") != string::npos) { // old version of server, ok, fall through to old code } else { uasserted(18630, str::stream() << "listCollections failed: " << res); } } // SERVER-14951 filter for old version fallback needs to db qualify the 'name' element BSONObjBuilder fallbackFilter; if (filter.hasField("name") && filter["name"].type() == String) { fallbackFilter.append("name", db + "." 
+ filter["name"].str()); } fallbackFilter.appendElementsUnique(filter); string ns = db + ".system.namespaces"; unique_ptr c = query(ns.c_str(), fallbackFilter.obj(), 0, 0, 0, QueryOption_SlaveOk); uassert(28611, str::stream() << "listCollections failed querying " << ns, c.get()); while (c->more()) { BSONObj obj = c->nextSafe(); string ns = obj["name"].valuestr(); if (ns.find("$") != string::npos) continue; BSONObjBuilder b; b.append("name", ns.substr(db.size() + 1)); b.appendElementsUnique(obj); infos.push_back(b.obj()); } return infos; } bool DBClientWithCommands::exists(const string& ns) { BSONObj filter = BSON("name" << nsToCollectionSubstring(ns)); list results = getCollectionInfos(nsToDatabase(ns), filter); return !results.empty(); } /* --- dbclientconnection --- */ void DBClientConnection::_auth(const BSONObj& params) { if (autoReconnect) { /* note we remember the auth info before we attempt to auth -- if the connection is broken, * we will then have it for the next autoreconnect attempt. */ authCache[params[auth::getSaslCommandUserDBFieldName()].str()] = params.getOwned(); } DBClientBase::_auth(params); } /** query N objects from the database into an array. makes sense mostly when you want a small * number of results. if a huge number, use query() and iterate the cursor. 
*/ void DBClientInterface::findN(vector& out, const string& ns, Query query, int nToReturn, int nToSkip, const BSONObj* fieldsToReturn, int queryOptions) { out.reserve(nToReturn); unique_ptr c = this->query(ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions); uassert(10276, str::stream() << "DBClientBase::findN: transport error: " << getServerAddress() << " ns: " << ns << " query: " << query.toString(), c.get()); if (c->hasResultFlag(ResultFlag_ShardConfigStale)) { BSONObj error; c->peekError(&error); throw RecvStaleConfigException("findN stale config", error); } for (int i = 0; i < nToReturn; i++) { if (!c->more()) break; out.push_back(c->nextSafe().copy()); } } BSONObj DBClientInterface::findOne(const string& ns, const Query& query, const BSONObj* fieldsToReturn, int queryOptions) { vector v; findN(v, ns, query, 1, 0, fieldsToReturn, queryOptions); return v.empty() ? BSONObj() : v[0]; } namespace { /** * RAII class to force usage of OP_QUERY on a connection. */ class ScopedForceOpQuery { public: ScopedForceOpQuery(DBClientBase* conn) : _conn(conn), _oldProtos(conn->getClientRPCProtocols()) { _conn->setClientRPCProtocols(rpc::supports::kOpQueryOnly); } ~ScopedForceOpQuery() { _conn->setClientRPCProtocols(_oldProtos); } private: DBClientBase* const _conn; const rpc::ProtocolSet _oldProtos; }; /** * Initializes the wire version of conn, and returns the isMaster reply. */ StatusWith initWireVersion(DBClientConnection* conn) { try { // We need to force the usage of OP_QUERY on this command, even if we have previously // detected support for OP_COMMAND on a connection. This is necessary to handle the case // where we reconnect to an older version of MongoDB running at the same host/port. ScopedForceOpQuery forceOpQuery{conn}; BSONObjBuilder bob; bob.append("isMaster", 1); if (Command::testCommandsEnabled) { // Only include the host:port of this process in the isMaster command request if test // commands are enabled. 
mongobridge uses this field to identify the process opening a // connection to it. StringBuilder sb; sb << getHostName() << ':' << serverGlobalParams.port; bob.append("hostInfo", sb.str()); } Date_t start{Date_t::now()}; auto result = conn->runCommandWithMetadata("admin", "isMaster", rpc::makeEmptyMetadata(), bob.done()); Date_t finish{Date_t::now()}; BSONObj isMasterObj = result->getCommandReply().getOwned(); if (isMasterObj.hasField("minWireVersion") && isMasterObj.hasField("maxWireVersion")) { int minWireVersion = isMasterObj["minWireVersion"].numberInt(); int maxWireVersion = isMasterObj["maxWireVersion"].numberInt(); conn->setWireVersions(minWireVersion, maxWireVersion); } return executor::RemoteCommandResponse{ std::move(isMasterObj), result->getMetadata().getOwned(), finish - start}; } catch (...) { return exceptionToStatus(); } } } // namespace bool DBClientConnection::connect(const HostAndPort& server, std::string& errmsg) { auto connectStatus = connect(server); if (!connectStatus.isOK()) { errmsg = connectStatus.reason(); return false; } return true; } Status DBClientConnection::connect(const HostAndPort& serverAddress) { auto connectStatus = connectSocketOnly(serverAddress); if (!connectStatus.isOK()) { return connectStatus; } auto swIsMasterReply = initWireVersion(this); if (!swIsMasterReply.isOK()) { _failed = true; return swIsMasterReply.getStatus(); } auto swProtocolSet = rpc::parseProtocolSetFromIsMasterReply(swIsMasterReply.getValue().data); if (!swProtocolSet.isOK()) { return swProtocolSet.getStatus(); } _setServerRPCProtocols(swProtocolSet.getValue()); auto negotiatedProtocol = rpc::negotiate(getServerRPCProtocols(), rpc::computeProtocolSet(WireSpec::instance().minWireVersionOutgoing, WireSpec::instance().maxWireVersionOutgoing)); if (!negotiatedProtocol.isOK()) { return negotiatedProtocol.getStatus(); } if (_hook) { auto validationStatus = _hook(swIsMasterReply.getValue()); if (!validationStatus.isOK()) { // Disconnect and mark failed. 
_failed = true; _port.reset(); return validationStatus; } } return Status::OK(); } Status DBClientConnection::connectSocketOnly(const HostAndPort& serverAddress) { _serverAddress = serverAddress; _failed = true; // We need to construct a SockAddr so we can resolve the address. SockAddr osAddr{serverAddress.host().c_str(), serverAddress.port()}; if (!osAddr.isValid()) { return Status(ErrorCodes::InvalidOptions, str::stream() << "couldn't initialize connection to host " << serverAddress.host() << ", address is invalid"); } _port.reset(new MessagingPort(_so_timeout, _logLevel)); if (serverAddress.host().empty()) { return Status(ErrorCodes::InvalidOptions, str::stream() << "couldn't connect to server " << _serverAddress.toString() << ", host is empty"); } if (osAddr.getAddr() == "0.0.0.0") { return Status(ErrorCodes::InvalidOptions, str::stream() << "couldn't connect to server " << _serverAddress.toString() << ", address resolved to 0.0.0.0"); } _resolvedAddress = osAddr.getAddr(); if (!_port->connect(osAddr)) { return Status(ErrorCodes::HostUnreachable, str::stream() << "couldn't connect to server " << _serverAddress.toString() << ", connection attempt failed"); } #ifdef MONGO_CONFIG_SSL int sslModeVal = sslGlobalParams.sslMode.load(); if (sslModeVal == SSLParams::SSLMode_preferSSL || sslModeVal == SSLParams::SSLMode_requireSSL) { if (!_port->secure(sslManager(), serverAddress.host())) { return Status(ErrorCodes::SSLHandshakeFailed, "Failed to initialize SSL on connection"); } } #endif _failed = false; LOG(1) << "connected to server " << toString() << endl; return Status::OK(); } void DBClientConnection::logout(const string& dbname, BSONObj& info) { authCache.erase(dbname); runCommand(dbname, BSON("logout" << 1), info); } bool DBClientConnection::runCommand(const string& dbname, const BSONObj& cmd, BSONObj& info, int options) { if (DBClientWithCommands::runCommand(dbname, cmd, info, options)) return true; if (!_parentReplSetName.empty()) { 
handleNotMasterResponse(info["errmsg"]); } return false; } void DBClientConnection::_checkConnection() { if (!_failed) return; if (!autoReconnect) throw SocketException(SocketException::FAILED_STATE, toString()); // Don't hammer reconnects, backoff if needed autoReconnectBackoff.nextSleepMillis(); LOG(_logLevel) << "trying reconnect to " << toString() << endl; string errmsg; _failed = false; auto connectStatus = connect(_serverAddress); if (!connectStatus.isOK()) { _failed = true; LOG(_logLevel) << "reconnect " << toString() << " failed " << errmsg << endl; if (connectStatus == ErrorCodes::IncompatibleCatalogManager) { uassertStatusOK(connectStatus); // Will always throw } else { throw SocketException(SocketException::CONNECT_ERROR, connectStatus.reason()); } } LOG(_logLevel) << "reconnect " << toString() << " ok" << endl; for (map::const_iterator i = authCache.begin(); i != authCache.end(); i++) { try { DBClientConnection::_auth(i->second); } catch (UserException& ex) { if (ex.getCode() != ErrorCodes::AuthenticationFailed) throw; LOG(_logLevel) << "reconnect: auth failed " << i->second[auth::getSaslCommandUserDBFieldName()] << i->second[auth::getSaslCommandUserFieldName()] << ' ' << ex.what() << std::endl; } } } void DBClientConnection::setSoTimeout(double timeout) { _so_timeout = timeout; if (_port) { _port->setSocketTimeout(timeout); } } uint64_t DBClientConnection::getSockCreationMicroSec() const { if (_port) { return _port->getSockCreationMicroSec(); } else { return INVALID_SOCK_CREATION_TIME; } } const uint64_t DBClientBase::INVALID_SOCK_CREATION_TIME = static_cast(0xFFFFFFFFFFFFFFFFULL); unique_ptr DBClientBase::query(const string& ns, Query query, int nToReturn, int nToSkip, const BSONObj* fieldsToReturn, int queryOptions, int batchSize) { unique_ptr c(new DBClientCursor( this, ns, query.obj, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize)); if (c->init()) return c; return nullptr; } unique_ptr DBClientBase::getMore(const string& ns, long long 
cursorId, int nToReturn, int options) { unique_ptr c(new DBClientCursor(this, ns, cursorId, nToReturn, options)); if (c->init()) return c; return nullptr; } struct DBClientFunConvertor { void operator()(DBClientCursorBatchIterator& i) { while (i.moreInCurrentBatch()) { _f(i.nextSafe()); } } stdx::function _f; }; unsigned long long DBClientBase::query(stdx::function f, const string& ns, Query query, const BSONObj* fieldsToReturn, int queryOptions) { DBClientFunConvertor fun; fun._f = f; stdx::function ptr(fun); return this->query(ptr, ns, query, fieldsToReturn, queryOptions); } unsigned long long DBClientBase::query(stdx::function f, const string& ns, Query query, const BSONObj* fieldsToReturn, int queryOptions) { // mask options queryOptions &= (int)(QueryOption_NoCursorTimeout | QueryOption_SlaveOk); unique_ptr c(this->query(ns, query, 0, 0, fieldsToReturn, queryOptions)); uassert(16090, "socket error for mapping query", c.get()); unsigned long long n = 0; while (c->more()) { DBClientCursorBatchIterator i(*c); f(i); n += i.n(); } return n; } unsigned long long DBClientConnection::query(stdx::function f, const string& ns, Query query, const BSONObj* fieldsToReturn, int queryOptions) { if (!(availableOptions() & QueryOption_Exhaust)) { return DBClientBase::query(f, ns, query, fieldsToReturn, queryOptions); } // mask options queryOptions &= (int)(QueryOption_NoCursorTimeout | QueryOption_SlaveOk); queryOptions |= (int)QueryOption_Exhaust; unique_ptr c(this->query(ns, query, 0, 0, fieldsToReturn, queryOptions)); uassert(13386, "socket error for mapping query", c.get()); unsigned long long n = 0; try { while (1) { while (c->moreInCurrentBatch()) { DBClientCursorBatchIterator i(*c); f(i); n += i.n(); } if (c->getCursorId() == 0) break; c->exhaustReceiveMore(); } } catch (std::exception&) { /* connection CANNOT be used anymore as more data may be on the way from the server. we have to reconnect. 
*/ _failed = true; _port->shutdown(); throw; } return n; } void DBClientBase::insert(const string& ns, BSONObj obj, int flags) { BufBuilder b; int reservedFlags = 0; if (flags & InsertOption_ContinueOnError) reservedFlags |= Reserved_InsertOption_ContinueOnError; b.appendNum(reservedFlags); b.appendStr(ns); obj.appendSelfToBufBuilder(b); Message toSend; toSend.setData(dbInsert, b.buf(), b.len()); say(toSend); } // TODO: Merge with other insert implementation? void DBClientBase::insert(const string& ns, const vector& v, int flags) { BufBuilder b; int reservedFlags = 0; if (flags & InsertOption_ContinueOnError) reservedFlags |= Reserved_InsertOption_ContinueOnError; b.appendNum(reservedFlags); b.appendStr(ns); for (vector::const_iterator i = v.begin(); i != v.end(); ++i) i->appendSelfToBufBuilder(b); Message toSend; toSend.setData(dbInsert, b.buf(), b.len()); say(toSend); } void DBClientBase::remove(const string& ns, Query obj, int flags) { BufBuilder b; const int reservedFlags = 0; b.appendNum(reservedFlags); b.appendStr(ns); b.appendNum(flags); obj.obj.appendSelfToBufBuilder(b); Message toSend; toSend.setData(dbDelete, b.buf(), b.len()); say(toSend); } void DBClientBase::update(const string& ns, Query query, BSONObj obj, bool upsert, bool multi) { int flags = 0; if (upsert) flags |= UpdateOption_Upsert; if (multi) flags |= UpdateOption_Multi; update(ns, query, obj, flags); } void DBClientBase::update(const string& ns, Query query, BSONObj obj, int flags) { BufBuilder b; const int reservedFlags = 0; b.appendNum(reservedFlags); b.appendStr(ns); b.appendNum(flags); query.obj.appendSelfToBufBuilder(b); obj.appendSelfToBufBuilder(b); Message toSend; toSend.setData(dbUpdate, b.buf(), b.len()); say(toSend); } void DBClientBase::killCursor(long long cursorId) { StackBufBuilder b; b.appendNum((int)0); // reserved b.appendNum((int)1); // number b.appendNum(cursorId); Message m; m.setData(dbKillCursors, b.buf(), b.len()); say(m); } list 
DBClientWithCommands::getIndexSpecs(const string& ns, int options) { list specs; { BSONObj cmd = BSON("listIndexes" << nsToCollectionSubstring(ns) << "cursor" << BSONObj()); BSONObj res; if (runCommand(nsToDatabase(ns), cmd, res, options)) { BSONObj cursorObj = res["cursor"].Obj(); BSONObjIterator i(cursorObj["firstBatch"].Obj()); while (i.more()) { specs.push_back(i.next().Obj().getOwned()); } const long long id = cursorObj["id"].Long(); if (id != 0) { const std::string ns = cursorObj["ns"].String(); unique_ptr cursor = getMore(ns, id, 0, 0); while (cursor->more()) { specs.push_back(cursor->nextSafe().getOwned()); } } return specs; } int code = res["code"].numberInt(); string errmsg = res["errmsg"].valuestrsafe(); if (code == ErrorCodes::CommandNotFound || errmsg.find("no such cmd") != string::npos) { // old version of server, ok, fall through to old code } else if (code == ErrorCodes::NamespaceNotFound) { return specs; } else { uasserted(18631, str::stream() << "listIndexes failed: " << res); } } // fallback to querying system.indexes // TODO(spencer): Remove fallback behavior after 3.0 unique_ptr cursor = query(NamespaceString(ns).getSystemIndexesCollection(), BSON("ns" << ns), 0, 0, 0, options); uassert(28612, str::stream() << "listIndexes failed querying " << ns, cursor.get()); while (cursor->more()) { BSONObj spec = cursor->nextSafe(); specs.push_back(spec.getOwned()); } return specs; } void DBClientWithCommands::dropIndex(const string& ns, BSONObj keys) { dropIndex(ns, genIndexName(keys)); } void DBClientWithCommands::dropIndex(const string& ns, const string& indexName) { BSONObj info; if (!runCommand(nsToDatabase(ns), BSON("deleteIndexes" << nsToCollectionSubstring(ns) << "index" << indexName), info)) { LOG(_logLevel) << "dropIndex failed: " << info << endl; uassert(10007, "dropIndex failed", 0); } } void DBClientWithCommands::dropIndexes(const string& ns) { BSONObj info; uassert(10008, "dropIndexes failed", runCommand(nsToDatabase(ns), BSON("deleteIndexes" 
<< nsToCollectionSubstring(ns) << "index" << "*"), info)); } void DBClientWithCommands::reIndex(const string& ns) { BSONObj info; uassert(18908, str::stream() << "reIndex failed: " << info, runCommand(nsToDatabase(ns), BSON("reIndex" << nsToCollectionSubstring(ns)), info)); } string DBClientWithCommands::genIndexName(const BSONObj& keys) { stringstream ss; bool first = 1; for (BSONObjIterator i(keys); i.more();) { BSONElement f = i.next(); if (first) first = 0; else ss << "_"; ss << f.fieldName() << "_"; if (f.isNumber()) ss << f.numberInt(); else ss << f.str(); // this should match up with shell command } return ss.str(); } void DBClientWithCommands::ensureIndex(const string& ns, BSONObj keys, bool unique, const string& name, bool background, int version, int ttl) { BSONObjBuilder toSave; toSave.append("ns", ns); toSave.append("key", keys); string cacheKey(ns); cacheKey += "--"; if (name != "") { toSave.append("name", name); cacheKey += name; } else { string nn = genIndexName(keys); toSave.append("name", nn); cacheKey += nn; } if (version >= 0) toSave.append("v", version); if (unique) toSave.appendBool("unique", unique); if (background) toSave.appendBool("background", true); if (ttl > 0) toSave.append("expireAfterSeconds", ttl); insert(NamespaceString(ns).getSystemIndexesCollection(), toSave.obj()); } /* -- DBClientCursor ---------------------------------------------- */ void assembleQueryRequest(const string& ns, BSONObj query, int nToReturn, int nToSkip, const BSONObj* fieldsToReturn, int queryOptions, Message& toSend) { if (kDebugBuild) { massert(10337, (string) "object not valid assembleRequest query", query.isValid()); } // see query.h for the protocol we are using here. 
BufBuilder b; int opts = queryOptions; b.appendNum(opts); b.appendStr(ns); b.appendNum(nToSkip); b.appendNum(nToReturn); query.appendSelfToBufBuilder(b); if (fieldsToReturn) fieldsToReturn->appendSelfToBufBuilder(b); toSend.setData(dbQuery, b.buf(), b.len()); } DBClientConnection::DBClientConnection(bool _autoReconnect, double so_timeout, const HandshakeValidationHook& hook) : _failed(false), autoReconnect(_autoReconnect), autoReconnectBackoff(1000, 2000), _so_timeout(so_timeout), _hook(hook) { _numConnections.fetchAndAdd(1); } void DBClientConnection::say(Message& toSend, bool isRetry, string* actualServer) { checkConnection(); try { port().say(toSend); } catch (SocketException&) { _failed = true; throw; } } bool DBClientConnection::recv(Message& m) { if (port().recv(m)) { return true; } _failed = true; return false; } bool DBClientConnection::call(Message& toSend, Message& response, bool assertOk, string* actualServer) { /* todo: this is very ugly messagingport::call returns an error code AND can throw an exception. we should make it return void and just throw an exception anytime it fails */ checkConnection(); try { if (!port().call(toSend, response)) { _failed = true; if (assertOk) uasserted(10278, str::stream() << "dbclient error communicating with server: " << getServerAddress()); return false; } } catch (SocketException&) { _failed = true; throw; } return true; } BSONElement getErrField(const BSONObj& o) { BSONElement first = o.firstElement(); if (strcmp(first.fieldName(), "$err") == 0) return first; // temp - will be DEV only later /*DEV*/ if (1) { BSONElement e = o["$err"]; if (!e.eoo()) { wassert(false); } return e; } return BSONElement(); } bool hasErrField(const BSONObj& o) { return !getErrField(o).eoo(); } void DBClientConnection::checkResponse(const char* data, int nReturned, bool* retry, string* host) { /* check for errors. 
the only one we really care about at * this stage is "not master" */ *retry = false; *host = _serverAddress.toString(); if (!_parentReplSetName.empty() && nReturned) { verify(data); BSONObj bsonView(data); handleNotMasterResponse(getErrField(bsonView)); } } void DBClientConnection::setParentReplSetName(const string& replSetName) { _parentReplSetName = replSetName; } void DBClientConnection::handleNotMasterResponse(const BSONElement& elemToCheck) { if (!isNotMasterErrorString(elemToCheck)) { return; } MONGO_LOG_COMPONENT(1, logger::LogComponent::kReplication) << "got not master from: " << _serverAddress << " of repl set: " << _parentReplSetName; ReplicaSetMonitorPtr monitor = ReplicaSetMonitor::get(_parentReplSetName); if (monitor) { monitor->failedHost(_serverAddress); } _failed = true; } AtomicInt32 DBClientConnection::_numConnections; /** @return the database name portion of an ns string */ string nsGetDB(const string& ns) { string::size_type pos = ns.find("."); if (pos == string::npos) return ns; return ns.substr(0, pos); } /** @return the collection name portion of an ns string */ string nsGetCollection(const string& ns) { string::size_type pos = ns.find("."); if (pos == string::npos) return ""; return ns.substr(pos + 1); } } // namespace mongo