/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/s/client/shard_connection.h"
#include
#include "mongo/db/commands.h"
#include "mongo/db/lasterror.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/stale_exception.h"
#include "mongo/s/version_manager.h"
#include "mongo/util/concurrency/spin_lock.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
#include "mongo/util/stacktrace.h"
namespace mongo {
using std::unique_ptr;
using std::map;
using std::set;
using std::string;
using std::stringstream;
using std::vector;
namespace {
class ClientConnections;
/**
* Class which tracks ClientConnections (the client connection pool) for each incoming
* connection, allowing stats access.
*/
class ActiveClientConnections {
public:
void add(const ClientConnections* cc) {
stdx::lock_guard lock(_mutex);
_clientConnections.insert(cc);
}
void remove(const ClientConnections* cc) {
stdx::lock_guard lock(_mutex);
_clientConnections.erase(cc);
}
void appendInfo(BSONObjBuilder& b);
private:
stdx::mutex _mutex;
set _clientConnections;
} activeClientConnections;
/**
* Command to allow access to the sharded conn pool information in mongos.
*/
class ShardedPoolStats : public Command {
public:
ShardedPoolStats() : Command( "shardConnPoolStats" ) { }
virtual void help( stringstream &help ) const { help << "stats about the shard connection pool"; }
virtual bool isWriteCommandForConfigServer() const { return false; }
virtual bool slaveOk() const { return true; }
// Same privs as connPoolStats
virtual void addRequiredPrivileges(const std::string& dbname,
const BSONObj& cmdObj,
std::vector* out) {
ActionSet actions;
actions.addAction(ActionType::connPoolStats);
out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
}
virtual bool run(OperationContext* txn,
const string& dbname,
mongo::BSONObj& cmdObj,
int options,
std::string& errmsg,
mongo::BSONObjBuilder& result) {
// Base pool info
shardConnectionPool.appendInfo(result);
// Thread connection info
activeClientConnections.appendInfo(result);
return true;
}
} shardedPoolStatsCmd;
/**
* holds all the actual db connections for a client to various servers 1 per thread, so
* doesn't have to be thread safe.
*/
class ClientConnections {
MONGO_DISALLOW_COPYING(ClientConnections);
public:
struct Status {
Status() : created(0), avail(0) { }
// May be read concurrently, but only written from
// this thread.
long long created;
DBClientBase* avail;
};
// Gets or creates the status object for the host
Status* _getStatus(const string& addr) {
scoped_spinlock lock(_lock);
Status* &temp = _hosts[addr];
if (!temp) {
temp = new Status();
}
return temp;
}
ClientConnections() {
// Start tracking client connections
activeClientConnections.add(this);
}
~ClientConnections() {
// Stop tracking these client connections
activeClientConnections.remove(this);
releaseAll(true);
}
void releaseAll(bool fromDestructor = false) {
// Don't need spinlock protection because if not in the destructor, we don't modify
// _hosts, and if in the destructor we are not accessible to external threads.
for (HostMap::iterator i = _hosts.begin(); i != _hosts.end(); ++i) {
const string addr = i->first;
Status* ss = i->second;
invariant(ss);
if (ss->avail) {
// If we're shutting down, don't want to initiate release mechanism as it is
// slow, and isn't needed since all connections will be closed anyway.
if (inShutdown()) {
if (versionManager.isVersionableCB(ss->avail)) {
versionManager.resetShardVersionCB(ss->avail);
}
delete ss->avail;
}
else {
release(addr, ss->avail);
}
ss->avail = 0;
}
if (fromDestructor) {
delete ss;
}
}
if (fromDestructor) {
_hosts.clear();
}
}
DBClientBase * get(const string& addr, const string& ns) {
{
// We want to report ns stats
scoped_spinlock lock(_lock);
if (ns.size() > 0)
_seenNS.insert(ns);
}
Status* s = _getStatus(addr);
unique_ptr c;
if (s->avail) {
c.reset(s->avail);
s->avail = 0;
// May throw an exception
shardConnectionPool.onHandedOut(c.get());
}
else {
c.reset(shardConnectionPool.get(addr));
// After, so failed creation doesn't get counted
s->created++;
}
return c.release();
}
void done( const string& addr , DBClientBase* conn ) {
Status* s = _hosts[addr];
verify( s );
const bool isConnGood = shardConnectionPool.isConnectionGood(addr, conn);
if (s->avail != NULL) {
warning() << "Detected additional sharded connection in the "
<< "thread local pool for " << addr;
if (DBException::traceExceptions) {
// There shouldn't be more than one connection checked out to the same
// host on the same thread.
printStackTrace();
}
if (!isConnGood) {
delete s->avail;
s->avail = NULL;
}
// Let the internal pool handle the bad connection, this can also
// update the lower bounds for the known good socket creation time
// for this host.
release(addr, conn);
return;
}
if (!isConnGood) {
// Let the internal pool handle the bad connection.
release(addr, conn);
return;
}
// Note: Although we try our best to clear bad connections as much as possible,
// some of them can still slip through because of how ClientConnections are being
// used - as thread local variables. This means that threads won't be able to
// see the s->avail connection of other threads.
s->avail = conn;
}
void sync() {
for ( HostMap::iterator i=_hosts.begin(); i!=_hosts.end(); ++i ) {
string addr = i->first;
Status* ss = i->second;
if ( ss->avail )
ss->avail->getLastError();
}
}
void checkVersions( const string& ns ) {
vector all;
grid.shardRegistry()->getAllShardIds(&all);
// Don't report exceptions here as errors in GetLastError
LastError::Disabled ignoreForGLE(&LastError::get(cc()));
// Now only check top-level shard connections
for (const ShardId& shardId : all) {
try {
const auto shard = grid.shardRegistry()->getShard(shardId);
if (!shard) {
continue;
}
string sconnString = shard->getConnString().toString();
Status* s = _getStatus( sconnString );
if( ! s->avail ) {
s->avail = shardConnectionPool.get( sconnString );
s->created++; // After, so failed creation doesn't get counted
}
versionManager.checkShardVersionCB( s->avail, ns, false, 1 );
}
catch ( const DBException& ex ) {
warning() << "problem while initially checking shard versions on" << " "
<< shardId << causedBy(ex);
// NOTE: This is only a heuristic, to avoid multiple stale version retries
// across multiple shards, and does not affect correctness.
}
}
}
void release( const string& addr , DBClientBase * conn ) {
shardConnectionPool.release( addr , conn );
}
/**
* Appends info about the client connection pool to a BOBuilder
* Safe to call with activeClientConnections lock
*/
void appendInfo( BSONObjBuilder& b ) const {
scoped_spinlock lock( _lock );
BSONArrayBuilder hostsArrB( b.subarrayStart( "hosts" ) );
for ( HostMap::const_iterator i = _hosts.begin(); i != _hosts.end(); ++i ) {
BSONObjBuilder bb( hostsArrB.subobjStart() );
bb.append( "host", i->first );
bb.append( "created", i->second->created );
bb.appendBool( "avail", static_cast( i->second->avail ) );
bb.done();
}
hostsArrB.done();
BSONArrayBuilder nsArrB( b.subarrayStart( "seenNS" ) );
for ( set::const_iterator i = _seenNS.begin(); i != _seenNS.end(); ++i ) {
nsArrB.append(*i);
}
nsArrB.done();
}
// Protects only the creation of new entries in the _hosts and _seenNS map
// from external threads. Reading _hosts / _seenNS in this thread doesn't
// need protection.
mutable SpinLock _lock;
typedef map HostMap;
HostMap _hosts;
set _seenNS;
/**
* Clears the connections kept by this pool (ie, not including the global pool)
*/
void clearPool() {
for(HostMap::iterator iter = _hosts.begin(); iter != _hosts.end(); ++iter) {
if (iter->second->avail != NULL) {
delete iter->second->avail;
}
delete iter->second;
}
_hosts.clear();
}
void forgetNS( const string& ns ) {
scoped_spinlock lock( _lock );
_seenNS.erase( ns );
}
// -----
static thread_specific_ptr _perThread;
static ClientConnections* threadInstance() {
ClientConnections* cc = _perThread.get();
if ( ! cc ) {
cc = new ClientConnections();
_perThread.reset( cc );
}
return cc;
}
};
void ActiveClientConnections::appendInfo(BSONObjBuilder& b) {
BSONArrayBuilder arr(64 * 1024); // There may be quite a few threads
{
stdx::lock_guard lock(_mutex);
for (set::const_iterator i = _clientConnections.begin();
i != _clientConnections.end();
++i) {
BSONObjBuilder bb(arr.subobjStart());
(*i)->appendInfo(bb);
bb.done();
}
}
b.appendArray("threads", arr.obj());
}
thread_specific_ptr ClientConnections::_perThread;
} // namespace
// The global connection pool
DBConnectionPool shardConnectionPool;
// Different between mongos and mongod
void usingAShardConnection(const string& addr);
ShardConnection::ShardConnection(const ConnectionString& connectionString,
const string& ns,
std::shared_ptr manager)
: _cs(connectionString),
_ns(ns),
_manager(manager) {
_init();
}
ShardConnection::~ShardConnection() {
if (_conn) {
if (_conn->isFailed()) {
if (_conn->getSockCreationMicroSec() == DBClientBase::INVALID_SOCK_CREATION_TIME) {
kill();
}
else {
// The pool takes care of deleting the failed connection - this
// will also trigger disposal of older connections in the pool
done();
}
}
else {
// see done() comments above for why we log this line
log() << "sharded connection to " << _conn->getServerAddress()
<< " not being returned to the pool";
kill();
}
}
}
void ShardConnection::_init() {
invariant(_cs.isValid());
_conn = ClientConnections::threadInstance()->get(_cs.toString(), _ns);
_finishedInit = false;
usingAShardConnection(_cs.toString());
}
void ShardConnection::_finishInit() {
if ( _finishedInit )
return;
_finishedInit = true;
if (versionManager.isVersionableCB(_conn)) {
// Make sure we specified a manager for the correct namespace
if (_ns.size() && _manager)
verify(_manager->getns() == _ns);
_setVersion = versionManager.checkShardVersionCB( this , false , 1 );
}
else {
// Make sure we didn't specify a manager for a non-versionable connection (i.e. config)
verify(!_manager);
_setVersion = false;
}
}
void ShardConnection::done() {
if ( _conn ) {
ClientConnections::threadInstance()->done(_cs.toString(), _conn);
_conn = 0;
_finishedInit = true;
}
}
void ShardConnection::kill() {
if ( _conn ) {
if (versionManager.isVersionableCB(_conn)) {
versionManager.resetShardVersionCB(_conn);
}
if (_conn->isFailed()) {
// Let the pool know about the bad connection and also delegate disposal to it.
ClientConnections::threadInstance()->done(_cs.toString(), _conn);
}
else {
delete _conn;
}
_conn = 0;
_finishedInit = true;
}
}
void ShardConnection::sync() {
ClientConnections::threadInstance()->sync();
}
void ShardConnection::checkMyConnectionVersions( const string & ns ) {
ClientConnections::threadInstance()->checkVersions( ns );
}
void ShardConnection::releaseMyConnections() {
ClientConnections::threadInstance()->releaseAll();
}
void ShardConnection::clearPool() {
shardConnectionPool.clear();
ClientConnections::threadInstance()->clearPool();
}
void ShardConnection::forgetNS( const string& ns ) {
ClientConnections::threadInstance()->forgetNS( ns );
}
bool setShardVersion(DBClientBase& conn,
const string& ns,
const string& configServerPrimary,
ChunkVersion version,
ChunkManager* manager,
bool authoritative,
BSONObj& result) {
BSONObjBuilder cmdBuilder;
cmdBuilder.append("setShardVersion", ns);
cmdBuilder.append("configdb", configServerPrimary);
ShardId shardId;
{
const auto shard = grid.shardRegistry()->getShard(conn.getServerAddress());
shardId = shard->getId();
cmdBuilder.append("shard", shardId);
cmdBuilder.append("shardHost", shard->getConnString().toString());
}
if (ns.size() > 0) {
version.addToBSON(cmdBuilder);
}
else {
cmdBuilder.append("init", true);
}
if (authoritative) {
cmdBuilder.appendBool("authoritative", 1);
}
BSONObj cmd = cmdBuilder.obj();
LOG(1) << " setShardVersion " << shardId << " " << conn.getServerAddress()
<< " " << ns << " " << cmd
<< (manager ? string(str::stream() << " " << manager->getSequenceNumber()) : "");
return conn.runCommand("admin", cmd, result, 0);
}
}