/**
 *    Copyright (C) 2015 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding #include "mongo/platform/basic.h" #include "mongo/s/client/shard_connection.h" #include #include "mongo/db/commands.h" #include "mongo/db/lasterror.h" #include "mongo/executor/connection_pool_stats.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/client/version_manager.h" #include "mongo/s/cluster_last_error_info.h" #include "mongo/s/grid.h" #include "mongo/s/is_mongos.h" #include "mongo/util/concurrency/spin_lock.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" #include "mongo/util/stacktrace.h" namespace mongo { using std::unique_ptr; using std::map; using std::set; using std::string; using std::stringstream; using std::vector; namespace { class ClientConnections; /** * Class which tracks ClientConnections (the client connection pool) for each incoming * connection, allowing stats access. */ class ActiveClientConnections { public: void add(const ClientConnections* cc) { stdx::lock_guard lock(_mutex); _clientConnections.insert(cc); } void remove(const ClientConnections* cc) { stdx::lock_guard lock(_mutex); _clientConnections.erase(cc); } void appendInfo(BSONObjBuilder& b); private: stdx::mutex _mutex; set _clientConnections; } activeClientConnections; /** * Command to allow access to the sharded conn pool information in mongos. 
*/ class ShardedPoolStats : public Command { public: ShardedPoolStats() : Command("shardConnPoolStats") {} virtual void help(stringstream& help) const { help << "stats about the shard connection pool"; } virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } virtual bool slaveOk() const { return true; } // Same privs as connPoolStats virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector* out) { ActionSet actions; actions.addAction(ActionType::connPoolStats); out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } virtual bool run(OperationContext* opCtx, const string& dbname, mongo::BSONObj& cmdObj, std::string& errmsg, mongo::BSONObjBuilder& result) { // Connection information executor::ConnectionPoolStats stats{}; shardConnectionPool.appendConnectionStats(&stats); stats.appendToBSON(result); // Thread connection information activeClientConnections.appendInfo(result); return true; } } shardedPoolStatsCmd; /** * holds all the actual db connections for a client to various servers 1 per thread, so * doesn't have to be thread safe. */ class ClientConnections { MONGO_DISALLOW_COPYING(ClientConnections); public: struct Status { Status() : created(0), avail(0) {} // May be read concurrently, but only written from // this thread. 
long long created; DBClientBase* avail; }; // Gets or creates the status object for the host Status* _getStatus(const string& addr) { scoped_spinlock lock(_lock); Status*& temp = _hosts[addr]; if (!temp) { temp = new Status(); } return temp; } ClientConnections() { // Start tracking client connections activeClientConnections.add(this); } ~ClientConnections() { // Stop tracking these client connections activeClientConnections.remove(this); releaseAll(true); } void releaseAll(bool fromDestructor = false) { // Don't need spinlock protection because if not in the destructor, we don't modify // _hosts, and if in the destructor we are not accessible to external threads. for (HostMap::iterator i = _hosts.begin(); i != _hosts.end(); ++i) { const string addr = i->first; Status* ss = i->second; invariant(ss); if (ss->avail) { // If we're shutting down, don't want to initiate release mechanism as it is // slow, and isn't needed since all connections will be closed anyway. if (globalInShutdownDeprecated()) { if (versionManager.isVersionableCB(ss->avail)) { versionManager.resetShardVersionCB(ss->avail); } delete ss->avail; } else { release(addr, ss->avail); } ss->avail = 0; } if (fromDestructor) { delete ss; } } if (fromDestructor) { _hosts.clear(); } } DBClientBase* get(const string& addr, const string& ns) { { // We want to report ns stats scoped_spinlock lock(_lock); if (ns.size() > 0) _seenNS.insert(ns); } Status* s = _getStatus(addr); unique_ptr c; if (s->avail) { c.reset(s->avail); s->avail = 0; // May throw an exception shardConnectionPool.onHandedOut(c.get()); } else { c.reset(shardConnectionPool.get(addr)); // After, so failed creation doesn't get counted s->created++; } return c.release(); } void done(const string& addr, DBClientBase* conn) { Status* s = _hosts[addr]; verify(s); const bool isConnGood = shardConnectionPool.isConnectionGood(addr, conn); if (s->avail != NULL) { warning() << "Detected additional sharded connection in the " << "thread local pool for " << 
addr; if (DBException::traceExceptions.load()) { // There shouldn't be more than one connection checked out to the same // host on the same thread. printStackTrace(); } if (!isConnGood) { delete s->avail; s->avail = NULL; } // Let the internal pool handle the bad connection, this can also // update the lower bounds for the known good socket creation time // for this host. release(addr, conn); return; } if (!isConnGood) { // Let the internal pool handle the bad connection. release(addr, conn); return; } // Note: Although we try our best to clear bad connections as much as possible, // some of them can still slip through because of how ClientConnections are being // used - as thread local variables. This means that threads won't be able to // see the s->avail connection of other threads. s->avail = conn; } void checkVersions(OperationContext* opCtx, const string& ns) { vector all; grid.shardRegistry()->getAllShardIds(&all); // Don't report exceptions here as errors in GetLastError LastError::Disabled ignoreForGLE(&LastError::get(cc())); // Now only check top-level shard connections for (const ShardId& shardId : all) { try { auto shardStatus = grid.shardRegistry()->getShard(opCtx, shardId); if (!shardStatus.isOK()) { invariant(shardStatus == ErrorCodes::ShardNotFound); continue; } const auto shard = shardStatus.getValue(); string sconnString = shard->getConnString().toString(); Status* s = _getStatus(sconnString); if (!s->avail) { s->avail = shardConnectionPool.get(sconnString); s->created++; // After, so failed creation doesn't get counted } versionManager.checkShardVersionCB(opCtx, s->avail, ns, false, 1); } catch (const DBException& ex) { warning() << "problem while initially checking shard versions on" << " " << shardId << causedBy(ex); // NOTE: This is only a heuristic, to avoid multiple stale version retries // across multiple shards, and does not affect correctness. 
} } } void release(const string& addr, DBClientBase* conn) { shardConnectionPool.release(addr, conn); } /** * Appends info about the client connection pool to a BOBuilder * Safe to call with activeClientConnections lock */ void appendInfo(BSONObjBuilder& b) const { scoped_spinlock lock(_lock); BSONArrayBuilder hostsArrB(b.subarrayStart("hosts")); for (HostMap::const_iterator i = _hosts.begin(); i != _hosts.end(); ++i) { BSONObjBuilder bb(hostsArrB.subobjStart()); bb.append("host", i->first); bb.append("created", i->second->created); bb.appendBool("avail", static_cast(i->second->avail)); bb.done(); } hostsArrB.done(); BSONArrayBuilder nsArrB(b.subarrayStart("seenNS")); for (set::const_iterator i = _seenNS.begin(); i != _seenNS.end(); ++i) { nsArrB.append(*i); } nsArrB.done(); } // Protects only the creation of new entries in the _hosts and _seenNS map // from external threads. Reading _hosts / _seenNS in this thread doesn't // need protection. mutable SpinLock _lock; typedef map HostMap; HostMap _hosts; set _seenNS; /** * Clears the connections kept by this pool (ie, not including the global pool) */ void clearPool() { for (HostMap::iterator iter = _hosts.begin(); iter != _hosts.end(); ++iter) { if (iter->second->avail != NULL) { delete iter->second->avail; } delete iter->second; } _hosts.clear(); } void forgetNS(const string& ns) { scoped_spinlock lock(_lock); _seenNS.erase(ns); } // ----- static thread_specific_ptr _perThread; static ClientConnections* threadInstance() { ClientConnections* cc = _perThread.get(); if (!cc) { cc = new ClientConnections(); _perThread.reset(cc); } return cc; } }; void ActiveClientConnections::appendInfo(BSONObjBuilder& b) { BSONArrayBuilder arr(64 * 1024); // There may be quite a few threads { stdx::lock_guard lock(_mutex); for (set::const_iterator i = _clientConnections.begin(); i != _clientConnections.end(); ++i) { BSONObjBuilder bb(arr.subobjStart()); (*i)->appendInfo(bb); bb.done(); } } b.appendArray("threads", arr.obj()); } 
thread_specific_ptr ClientConnections::_perThread; } // namespace // The global connection pool DBConnectionPool shardConnectionPool; ShardConnection::ShardConnection(const ConnectionString& connectionString, const string& ns, std::shared_ptr manager) : _cs(connectionString), _ns(ns), _manager(manager), _finishedInit(false) { invariant(_cs.isValid()); // Make sure we specified a manager for the correct namespace if (_ns.size() && _manager) { invariant(_manager->getns() == _ns); } auto csString = _cs.toString(); _conn = ClientConnections::threadInstance()->get(csString, _ns); if (isMongos()) { // In mongos, we record this connection as having been used for useful work to provide // useful information in getLastError. ClusterLastErrorInfo::get(cc())->addShardHost(csString); } } ShardConnection::~ShardConnection() { if (_conn) { if (_conn->isFailed()) { if (_conn->getSockCreationMicroSec() == DBClientBase::INVALID_SOCK_CREATION_TIME) { kill(); } else { // The pool takes care of deleting the failed connection - this // will also trigger disposal of older connections in the pool done(); } } else { // see done() comments above for why we log this line log() << "sharded connection to " << _conn->getServerAddress() << " not being returned to the pool"; kill(); } } } void ShardConnection::_finishInit() { if (_finishedInit) return; _finishedInit = true; if (versionManager.isVersionableCB(_conn)) { auto& client = cc(); auto opCtx = client.getOperationContext(); invariant(opCtx); _setVersion = versionManager.checkShardVersionCB(opCtx, this, false, 1); } else { // Make sure we didn't specify a manager for a non-versionable connection (i.e. 
config) verify(!_manager); _setVersion = false; } } void ShardConnection::done() { if (_conn) { ClientConnections::threadInstance()->done(_cs.toString(), _conn); _conn = 0; _finishedInit = true; } } void ShardConnection::kill() { if (_conn) { if (versionManager.isVersionableCB(_conn)) { versionManager.resetShardVersionCB(_conn); } if (_conn->isFailed()) { // Let the pool know about the bad connection and also delegate disposal to it. ClientConnections::threadInstance()->done(_cs.toString(), _conn); } else { delete _conn; } _conn = 0; _finishedInit = true; } } void ShardConnection::checkMyConnectionVersions(OperationContext* opCtx, const string& ns) { ClientConnections::threadInstance()->checkVersions(opCtx, ns); } void ShardConnection::releaseMyConnections() { ClientConnections::threadInstance()->releaseAll(); } void ShardConnection::clearPool() { shardConnectionPool.clear(); ClientConnections::threadInstance()->clearPool(); } void ShardConnection::forgetNS(const string& ns) { ClientConnections::threadInstance()->forgetNS(ns); } } // namespace mongo