summaryrefslogtreecommitdiff
path: root/src/mongo/s/client/shard_connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/client/shard_connection.cpp')
-rw-r--r--src/mongo/s/client/shard_connection.cpp771
1 files changed, 379 insertions, 392 deletions
diff --git a/src/mongo/s/client/shard_connection.cpp b/src/mongo/s/client/shard_connection.cpp
index 226bea4fc2c..4607a14baae 100644
--- a/src/mongo/s/client/shard_connection.cpp
+++ b/src/mongo/s/client/shard_connection.cpp
@@ -49,514 +49,501 @@
namespace mongo {
- using std::unique_ptr;
- using std::map;
- using std::set;
- using std::string;
- using std::stringstream;
- using std::vector;
+using std::unique_ptr;
+using std::map;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::vector;
namespace {
- class ClientConnections;
+class ClientConnections;
- /**
- * Class which tracks ClientConnections (the client connection pool) for each incoming
- * connection, allowing stats access.
- */
- class ActiveClientConnections {
- public:
- void add(const ClientConnections* cc) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- _clientConnections.insert(cc);
- }
+/**
+ * Class which tracks ClientConnections (the client connection pool) for each incoming
+ * connection, allowing stats access.
+ */
+class ActiveClientConnections {
+public:
+ void add(const ClientConnections* cc) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _clientConnections.insert(cc);
+ }
- void remove(const ClientConnections* cc) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- _clientConnections.erase(cc);
- }
+ void remove(const ClientConnections* cc) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _clientConnections.erase(cc);
+ }
- void appendInfo(BSONObjBuilder& b);
+ void appendInfo(BSONObjBuilder& b);
- private:
- stdx::mutex _mutex;
- set<const ClientConnections*> _clientConnections;
+private:
+ stdx::mutex _mutex;
+ set<const ClientConnections*> _clientConnections;
- } activeClientConnections;
+} activeClientConnections;
- /**
- * Command to allow access to the sharded conn pool information in mongos.
- */
- class ShardedPoolStats : public Command {
- public:
-
- ShardedPoolStats() : Command( "shardConnPoolStats" ) { }
- virtual void help( stringstream &help ) const { help << "stats about the shard connection pool"; }
- virtual bool isWriteCommandForConfigServer() const { return false; }
- virtual bool slaveOk() const { return true; }
-
- // Same privs as connPoolStats
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
- ActionSet actions;
- actions.addAction(ActionType::connPoolStats);
- out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
- }
-
- virtual bool run(OperationContext* txn,
- const string& dbname,
- mongo::BSONObj& cmdObj,
- int options,
- std::string& errmsg,
- mongo::BSONObjBuilder& result) {
+/**
+ * Command to allow access to the sharded conn pool information in mongos.
+ */
+class ShardedPoolStats : public Command {
+public:
+ ShardedPoolStats() : Command("shardConnPoolStats") {}
+ virtual void help(stringstream& help) const {
+ help << "stats about the shard connection pool";
+ }
+ virtual bool isWriteCommandForConfigServer() const {
+ return false;
+ }
+ virtual bool slaveOk() const {
+ return true;
+ }
- // Base pool info
- shardConnectionPool.appendInfo(result);
+ // Same privs as connPoolStats
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ ActionSet actions;
+ actions.addAction(ActionType::connPoolStats);
+ out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
+ }
- // Thread connection info
- activeClientConnections.appendInfo(result);
+ virtual bool run(OperationContext* txn,
+ const string& dbname,
+ mongo::BSONObj& cmdObj,
+ int options,
+ std::string& errmsg,
+ mongo::BSONObjBuilder& result) {
+ // Base pool info
+ shardConnectionPool.appendInfo(result);
- return true;
- }
+ // Thread connection info
+ activeClientConnections.appendInfo(result);
- } shardedPoolStatsCmd;
+ return true;
+ }
- /**
- * holds all the actual db connections for a client to various servers 1 per thread, so
- * doesn't have to be thread safe.
- */
- class ClientConnections {
- MONGO_DISALLOW_COPYING(ClientConnections);
- public:
+} shardedPoolStatsCmd;
- struct Status {
- Status() : created(0), avail(0) { }
+/**
+ * holds all the actual db connections for a client to various servers 1 per thread, so
+ * doesn't have to be thread safe.
+ */
+class ClientConnections {
+ MONGO_DISALLOW_COPYING(ClientConnections);
- // May be read concurrently, but only written from
- // this thread.
- long long created;
- DBClientBase* avail;
- };
+public:
+ struct Status {
+ Status() : created(0), avail(0) {}
- // Gets or creates the status object for the host
- Status* _getStatus(const string& addr) {
- scoped_spinlock lock(_lock);
- Status* &temp = _hosts[addr];
- if (!temp) {
- temp = new Status();
- }
+ // May be read concurrently, but only written from
+ // this thread.
+ long long created;
+ DBClientBase* avail;
+ };
- return temp;
+ // Gets or creates the status object for the host
+ Status* _getStatus(const string& addr) {
+ scoped_spinlock lock(_lock);
+ Status*& temp = _hosts[addr];
+ if (!temp) {
+ temp = new Status();
}
- ClientConnections() {
- // Start tracking client connections
- activeClientConnections.add(this);
- }
+ return temp;
+ }
- ~ClientConnections() {
- // Stop tracking these client connections
- activeClientConnections.remove(this);
+ ClientConnections() {
+ // Start tracking client connections
+ activeClientConnections.add(this);
+ }
- releaseAll(true);
- }
+ ~ClientConnections() {
+ // Stop tracking these client connections
+ activeClientConnections.remove(this);
- void releaseAll(bool fromDestructor = false) {
- // Don't need spinlock protection because if not in the destructor, we don't modify
- // _hosts, and if in the destructor we are not accessible to external threads.
- for (HostMap::iterator i = _hosts.begin(); i != _hosts.end(); ++i) {
- const string addr = i->first;
- Status* ss = i->second;
- invariant(ss);
-
- if (ss->avail) {
- // If we're shutting down, don't want to initiate release mechanism as it is
- // slow, and isn't needed since all connections will be closed anyway.
- if (inShutdown()) {
- if (versionManager.isVersionableCB(ss->avail)) {
- versionManager.resetShardVersionCB(ss->avail);
- }
-
- delete ss->avail;
- }
- else {
- release(addr, ss->avail);
+ releaseAll(true);
+ }
+
+ void releaseAll(bool fromDestructor = false) {
+ // Don't need spinlock protection because if not in the destructor, we don't modify
+ // _hosts, and if in the destructor we are not accessible to external threads.
+ for (HostMap::iterator i = _hosts.begin(); i != _hosts.end(); ++i) {
+ const string addr = i->first;
+ Status* ss = i->second;
+ invariant(ss);
+
+ if (ss->avail) {
+ // If we're shutting down, don't want to initiate release mechanism as it is
+ // slow, and isn't needed since all connections will be closed anyway.
+ if (inShutdown()) {
+ if (versionManager.isVersionableCB(ss->avail)) {
+ versionManager.resetShardVersionCB(ss->avail);
}
- ss->avail = 0;
+ delete ss->avail;
+ } else {
+ release(addr, ss->avail);
}
- if (fromDestructor) {
- delete ss;
- }
+ ss->avail = 0;
}
if (fromDestructor) {
- _hosts.clear();
+ delete ss;
}
}
- DBClientBase * get(const string& addr, const string& ns) {
-
- {
- // We want to report ns stats
- scoped_spinlock lock(_lock);
- if (ns.size() > 0)
- _seenNS.insert(ns);
- }
+ if (fromDestructor) {
+ _hosts.clear();
+ }
+ }
- Status* s = _getStatus(addr);
+ DBClientBase* get(const string& addr, const string& ns) {
+ {
+ // We want to report ns stats
+ scoped_spinlock lock(_lock);
+ if (ns.size() > 0)
+ _seenNS.insert(ns);
+ }
- unique_ptr<DBClientBase> c;
- if (s->avail) {
- c.reset(s->avail);
- s->avail = 0;
+ Status* s = _getStatus(addr);
- // May throw an exception
- shardConnectionPool.onHandedOut(c.get());
- }
- else {
- c.reset(shardConnectionPool.get(addr));
+ unique_ptr<DBClientBase> c;
+ if (s->avail) {
+ c.reset(s->avail);
+ s->avail = 0;
- // After, so failed creation doesn't get counted
- s->created++;
- }
+ // May throw an exception
+ shardConnectionPool.onHandedOut(c.get());
+ } else {
+ c.reset(shardConnectionPool.get(addr));
- return c.release();
+ // After, so failed creation doesn't get counted
+ s->created++;
}
- void done( const string& addr , DBClientBase* conn ) {
- Status* s = _hosts[addr];
- verify( s );
-
- const bool isConnGood = shardConnectionPool.isConnectionGood(addr, conn);
+ return c.release();
+ }
- if (s->avail != NULL) {
- warning() << "Detected additional sharded connection in the "
- << "thread local pool for " << addr;
+ void done(const string& addr, DBClientBase* conn) {
+ Status* s = _hosts[addr];
+ verify(s);
- if (DBException::traceExceptions) {
- // There shouldn't be more than one connection checked out to the same
- // host on the same thread.
- printStackTrace();
- }
+ const bool isConnGood = shardConnectionPool.isConnectionGood(addr, conn);
- if (!isConnGood) {
- delete s->avail;
- s->avail = NULL;
- }
+ if (s->avail != NULL) {
+ warning() << "Detected additional sharded connection in the "
+ << "thread local pool for " << addr;
- // Let the internal pool handle the bad connection, this can also
- // update the lower bounds for the known good socket creation time
- // for this host.
- release(addr, conn);
- return;
+ if (DBException::traceExceptions) {
+ // There shouldn't be more than one connection checked out to the same
+ // host on the same thread.
+ printStackTrace();
}
if (!isConnGood) {
- // Let the internal pool handle the bad connection.
- release(addr, conn);
- return;
+ delete s->avail;
+ s->avail = NULL;
}
- // Note: Although we try our best to clear bad connections as much as possible,
- // some of them can still slip through because of how ClientConnections are being
- // used - as thread local variables. This means that threads won't be able to
- // see the s->avail connection of other threads.
-
- s->avail = conn;
+ // Let the internal pool handle the bad connection, this can also
+ // update the lower bounds for the known good socket creation time
+ // for this host.
+ release(addr, conn);
+ return;
}
- void sync() {
- for ( HostMap::iterator i=_hosts.begin(); i!=_hosts.end(); ++i ) {
- string addr = i->first;
- Status* ss = i->second;
- if ( ss->avail )
- ss->avail->getLastError();
- }
+ if (!isConnGood) {
+ // Let the internal pool handle the bad connection.
+ release(addr, conn);
+ return;
}
- void checkVersions( const string& ns ) {
-
- vector<ShardId> all;
- grid.shardRegistry()->getAllShardIds(&all);
+ // Note: Although we try our best to clear bad connections as much as possible,
+ // some of them can still slip through because of how ClientConnections are being
+ // used - as thread local variables. This means that threads won't be able to
+ // see the s->avail connection of other threads.
- // Don't report exceptions here as errors in GetLastError
- LastError::Disabled ignoreForGLE(&LastError::get(cc()));
+ s->avail = conn;
+ }
- // Now only check top-level shard connections
- for (const ShardId& shardId : all) {
- try {
- const auto shard = grid.shardRegistry()->getShard(shardId);
- if (!shard) {
- continue;
- }
+ void sync() {
+ for (HostMap::iterator i = _hosts.begin(); i != _hosts.end(); ++i) {
+ string addr = i->first;
+ Status* ss = i->second;
+ if (ss->avail)
+ ss->avail->getLastError();
+ }
+ }
- string sconnString = shard->getConnString().toString();
- Status* s = _getStatus( sconnString );
+ void checkVersions(const string& ns) {
+ vector<ShardId> all;
+ grid.shardRegistry()->getAllShardIds(&all);
- if( ! s->avail ) {
- s->avail = shardConnectionPool.get( sconnString );
- s->created++; // After, so failed creation doesn't get counted
- }
+ // Don't report exceptions here as errors in GetLastError
+ LastError::Disabled ignoreForGLE(&LastError::get(cc()));
- versionManager.checkShardVersionCB( s->avail, ns, false, 1 );
+ // Now only check top-level shard connections
+ for (const ShardId& shardId : all) {
+ try {
+ const auto shard = grid.shardRegistry()->getShard(shardId);
+ if (!shard) {
+ continue;
}
- catch ( const DBException& ex ) {
- warning() << "problem while initially checking shard versions on" << " "
- << shardId << causedBy(ex);
+ string sconnString = shard->getConnString().toString();
+ Status* s = _getStatus(sconnString);
- // NOTE: This is only a heuristic, to avoid multiple stale version retries
- // across multiple shards, and does not affect correctness.
+ if (!s->avail) {
+ s->avail = shardConnectionPool.get(sconnString);
+ s->created++; // After, so failed creation doesn't get counted
}
+
+ versionManager.checkShardVersionCB(s->avail, ns, false, 1);
+ } catch (const DBException& ex) {
+ warning() << "problem while initially checking shard versions on"
+ << " " << shardId << causedBy(ex);
+
+ // NOTE: This is only a heuristic, to avoid multiple stale version retries
+ // across multiple shards, and does not affect correctness.
}
}
+ }
- void release( const string& addr , DBClientBase * conn ) {
- shardConnectionPool.release( addr , conn );
+ void release(const string& addr, DBClientBase* conn) {
+ shardConnectionPool.release(addr, conn);
+ }
+
+ /**
+ * Appends info about the client connection pool to a BOBuilder
+ * Safe to call with activeClientConnections lock
+ */
+ void appendInfo(BSONObjBuilder& b) const {
+ scoped_spinlock lock(_lock);
+
+ BSONArrayBuilder hostsArrB(b.subarrayStart("hosts"));
+ for (HostMap::const_iterator i = _hosts.begin(); i != _hosts.end(); ++i) {
+ BSONObjBuilder bb(hostsArrB.subobjStart());
+ bb.append("host", i->first);
+ bb.append("created", i->second->created);
+ bb.appendBool("avail", static_cast<bool>(i->second->avail));
+ bb.done();
}
+ hostsArrB.done();
- /**
- * Appends info about the client connection pool to a BOBuilder
- * Safe to call with activeClientConnections lock
- */
- void appendInfo( BSONObjBuilder& b ) const {
-
- scoped_spinlock lock( _lock );
-
- BSONArrayBuilder hostsArrB( b.subarrayStart( "hosts" ) );
- for ( HostMap::const_iterator i = _hosts.begin(); i != _hosts.end(); ++i ) {
- BSONObjBuilder bb( hostsArrB.subobjStart() );
- bb.append( "host", i->first );
- bb.append( "created", i->second->created );
- bb.appendBool( "avail", static_cast<bool>( i->second->avail ) );
- bb.done();
- }
- hostsArrB.done();
+ BSONArrayBuilder nsArrB(b.subarrayStart("seenNS"));
+ for (set<string>::const_iterator i = _seenNS.begin(); i != _seenNS.end(); ++i) {
+ nsArrB.append(*i);
+ }
+ nsArrB.done();
+ }
- BSONArrayBuilder nsArrB( b.subarrayStart( "seenNS" ) );
- for ( set<string>::const_iterator i = _seenNS.begin(); i != _seenNS.end(); ++i ) {
- nsArrB.append(*i);
+ // Protects only the creation of new entries in the _hosts and _seenNS map
+ // from external threads. Reading _hosts / _seenNS in this thread doesn't
+ // need protection.
+ mutable SpinLock _lock;
+ typedef map<string, Status*, DBConnectionPool::serverNameCompare> HostMap;
+ HostMap _hosts;
+ set<string> _seenNS;
+
+ /**
+ * Clears the connections kept by this pool (ie, not including the global pool)
+ */
+ void clearPool() {
+ for (HostMap::iterator iter = _hosts.begin(); iter != _hosts.end(); ++iter) {
+ if (iter->second->avail != NULL) {
+ delete iter->second->avail;
}
- nsArrB.done();
+ delete iter->second;
}
-
- // Protects only the creation of new entries in the _hosts and _seenNS map
- // from external threads. Reading _hosts / _seenNS in this thread doesn't
- // need protection.
- mutable SpinLock _lock;
- typedef map<string,Status*,DBConnectionPool::serverNameCompare> HostMap;
- HostMap _hosts;
- set<string> _seenNS;
-
- /**
- * Clears the connections kept by this pool (ie, not including the global pool)
- */
- void clearPool() {
- for(HostMap::iterator iter = _hosts.begin(); iter != _hosts.end(); ++iter) {
- if (iter->second->avail != NULL) {
- delete iter->second->avail;
- }
- delete iter->second;
- }
- _hosts.clear();
- }
+ _hosts.clear();
+ }
- void forgetNS( const string& ns ) {
- scoped_spinlock lock( _lock );
- _seenNS.erase( ns );
- }
+ void forgetNS(const string& ns) {
+ scoped_spinlock lock(_lock);
+ _seenNS.erase(ns);
+ }
- // -----
+ // -----
- static thread_specific_ptr<ClientConnections> _perThread;
+ static thread_specific_ptr<ClientConnections> _perThread;
- static ClientConnections* threadInstance() {
- ClientConnections* cc = _perThread.get();
- if ( ! cc ) {
- cc = new ClientConnections();
- _perThread.reset( cc );
- }
- return cc;
+ static ClientConnections* threadInstance() {
+ ClientConnections* cc = _perThread.get();
+ if (!cc) {
+ cc = new ClientConnections();
+ _perThread.reset(cc);
}
- };
+ return cc;
+ }
+};
- void ActiveClientConnections::appendInfo(BSONObjBuilder& b) {
- BSONArrayBuilder arr(64 * 1024); // There may be quite a few threads
+void ActiveClientConnections::appendInfo(BSONObjBuilder& b) {
+ BSONArrayBuilder arr(64 * 1024); // There may be quite a few threads
- {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- for (set<const ClientConnections*>::const_iterator i = _clientConnections.begin();
- i != _clientConnections.end();
- ++i) {
-
- BSONObjBuilder bb(arr.subobjStart());
- (*i)->appendInfo(bb);
- bb.done();
- }
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ for (set<const ClientConnections*>::const_iterator i = _clientConnections.begin();
+ i != _clientConnections.end();
+ ++i) {
+ BSONObjBuilder bb(arr.subobjStart());
+ (*i)->appendInfo(bb);
+ bb.done();
}
-
- b.appendArray("threads", arr.obj());
}
- thread_specific_ptr<ClientConnections> ClientConnections::_perThread;
-
-} // namespace
+ b.appendArray("threads", arr.obj());
+}
- // The global connection pool
- DBConnectionPool shardConnectionPool;
+thread_specific_ptr<ClientConnections> ClientConnections::_perThread;
- // Different between mongos and mongod
- void usingAShardConnection(const string& addr);
+} // namespace
+// The global connection pool
+DBConnectionPool shardConnectionPool;
- ShardConnection::ShardConnection(const ConnectionString& connectionString,
- const string& ns,
- std::shared_ptr<ChunkManager> manager)
- : _cs(connectionString),
- _ns(ns),
- _manager(manager) {
+// Different between mongos and mongod
+void usingAShardConnection(const string& addr);
- _init();
- }
- ShardConnection::~ShardConnection() {
- if (_conn) {
- if (_conn->isFailed()) {
- if (_conn->getSockCreationMicroSec() == DBClientBase::INVALID_SOCK_CREATION_TIME) {
- kill();
- }
- else {
- // The pool takes care of deleting the failed connection - this
- // will also trigger disposal of older connections in the pool
- done();
- }
- }
- else {
- // see done() comments above for why we log this line
- log() << "sharded connection to " << _conn->getServerAddress()
- << " not being returned to the pool";
+ShardConnection::ShardConnection(const ConnectionString& connectionString,
+ const string& ns,
+ std::shared_ptr<ChunkManager> manager)
+ : _cs(connectionString), _ns(ns), _manager(manager) {
+ _init();
+}
+ShardConnection::~ShardConnection() {
+ if (_conn) {
+ if (_conn->isFailed()) {
+ if (_conn->getSockCreationMicroSec() == DBClientBase::INVALID_SOCK_CREATION_TIME) {
kill();
+ } else {
+ // The pool takes care of deleting the failed connection - this
+ // will also trigger disposal of older connections in the pool
+ done();
}
+ } else {
+ // see done() comments above for why we log this line
+ log() << "sharded connection to " << _conn->getServerAddress()
+ << " not being returned to the pool";
+
+ kill();
}
}
+}
+
+void ShardConnection::_init() {
+ invariant(_cs.isValid());
+ _conn = ClientConnections::threadInstance()->get(_cs.toString(), _ns);
+ _finishedInit = false;
+ usingAShardConnection(_cs.toString());
+}
- void ShardConnection::_init() {
- invariant(_cs.isValid());
- _conn = ClientConnections::threadInstance()->get(_cs.toString(), _ns);
- _finishedInit = false;
- usingAShardConnection(_cs.toString());
+void ShardConnection::_finishInit() {
+ if (_finishedInit)
+ return;
+ _finishedInit = true;
+
+ if (versionManager.isVersionableCB(_conn)) {
+ // Make sure we specified a manager for the correct namespace
+ if (_ns.size() && _manager)
+ verify(_manager->getns() == _ns);
+ _setVersion = versionManager.checkShardVersionCB(this, false, 1);
+ } else {
+ // Make sure we didn't specify a manager for a non-versionable connection (i.e. config)
+ verify(!_manager);
+ _setVersion = false;
}
+}
- void ShardConnection::_finishInit() {
- if ( _finishedInit )
- return;
+void ShardConnection::done() {
+ if (_conn) {
+ ClientConnections::threadInstance()->done(_cs.toString(), _conn);
+ _conn = 0;
_finishedInit = true;
+ }
+}
+void ShardConnection::kill() {
+ if (_conn) {
if (versionManager.isVersionableCB(_conn)) {
- // Make sure we specified a manager for the correct namespace
- if (_ns.size() && _manager)
- verify(_manager->getns() == _ns);
- _setVersion = versionManager.checkShardVersionCB( this , false , 1 );
- }
- else {
- // Make sure we didn't specify a manager for a non-versionable connection (i.e. config)
- verify(!_manager);
- _setVersion = false;
+ versionManager.resetShardVersionCB(_conn);
}
- }
- void ShardConnection::done() {
- if ( _conn ) {
+ if (_conn->isFailed()) {
+ // Let the pool know about the bad connection and also delegate disposal to it.
ClientConnections::threadInstance()->done(_cs.toString(), _conn);
- _conn = 0;
- _finishedInit = true;
+ } else {
+ delete _conn;
}
+
+ _conn = 0;
+ _finishedInit = true;
}
+}
- void ShardConnection::kill() {
- if ( _conn ) {
- if (versionManager.isVersionableCB(_conn)) {
- versionManager.resetShardVersionCB(_conn);
- }
+void ShardConnection::sync() {
+ ClientConnections::threadInstance()->sync();
+}
- if (_conn->isFailed()) {
- // Let the pool know about the bad connection and also delegate disposal to it.
- ClientConnections::threadInstance()->done(_cs.toString(), _conn);
- }
- else {
- delete _conn;
- }
+void ShardConnection::checkMyConnectionVersions(const string& ns) {
+ ClientConnections::threadInstance()->checkVersions(ns);
+}
- _conn = 0;
- _finishedInit = true;
- }
- }
+void ShardConnection::releaseMyConnections() {
+ ClientConnections::threadInstance()->releaseAll();
+}
- void ShardConnection::sync() {
- ClientConnections::threadInstance()->sync();
- }
+void ShardConnection::clearPool() {
+ shardConnectionPool.clear();
+ ClientConnections::threadInstance()->clearPool();
+}
- void ShardConnection::checkMyConnectionVersions( const string & ns ) {
- ClientConnections::threadInstance()->checkVersions( ns );
- }
+void ShardConnection::forgetNS(const string& ns) {
+ ClientConnections::threadInstance()->forgetNS(ns);
+}
- void ShardConnection::releaseMyConnections() {
- ClientConnections::threadInstance()->releaseAll();
- }
- void ShardConnection::clearPool() {
- shardConnectionPool.clear();
- ClientConnections::threadInstance()->clearPool();
+bool setShardVersion(DBClientBase& conn,
+ const string& ns,
+ const string& configServerPrimary,
+ ChunkVersion version,
+ ChunkManager* manager,
+ bool authoritative,
+ BSONObj& result) {
+ BSONObjBuilder cmdBuilder;
+ cmdBuilder.append("setShardVersion", ns);
+ cmdBuilder.append("configdb", configServerPrimary);
+
+ ShardId shardId;
+ {
+ const auto shard = grid.shardRegistry()->getShard(conn.getServerAddress());
+ shardId = shard->getId();
+ cmdBuilder.append("shard", shardId);
+ cmdBuilder.append("shardHost", shard->getConnString().toString());
}
- void ShardConnection::forgetNS( const string& ns ) {
- ClientConnections::threadInstance()->forgetNS( ns );
+ if (ns.size() > 0) {
+ version.addToBSON(cmdBuilder);
+ } else {
+ cmdBuilder.append("init", true);
}
+ if (authoritative) {
+ cmdBuilder.appendBool("authoritative", 1);
+ }
- bool setShardVersion(DBClientBase& conn,
- const string& ns,
- const string& configServerPrimary,
- ChunkVersion version,
- ChunkManager* manager,
- bool authoritative,
- BSONObj& result) {
-
- BSONObjBuilder cmdBuilder;
- cmdBuilder.append("setShardVersion", ns);
- cmdBuilder.append("configdb", configServerPrimary);
-
- ShardId shardId;
- {
- const auto shard = grid.shardRegistry()->getShard(conn.getServerAddress());
- shardId = shard->getId();
- cmdBuilder.append("shard", shardId);
- cmdBuilder.append("shardHost", shard->getConnString().toString());
- }
-
- if (ns.size() > 0) {
- version.addToBSON(cmdBuilder);
- }
- else {
- cmdBuilder.append("init", true);
- }
-
- if (authoritative) {
- cmdBuilder.appendBool("authoritative", 1);
- }
-
- BSONObj cmd = cmdBuilder.obj();
+ BSONObj cmd = cmdBuilder.obj();
- LOG(1) << " setShardVersion " << shardId << " " << conn.getServerAddress()
- << " " << ns << " " << cmd
- << (manager ? string(str::stream() << " " << manager->getSequenceNumber()) : "");
+ LOG(1) << " setShardVersion " << shardId << " " << conn.getServerAddress() << " " << ns
+ << " " << cmd
+ << (manager ? string(str::stream() << " " << manager->getSequenceNumber()) : "");
- return conn.runCommand("admin", cmd, result, 0);
- }
+ return conn.runCommand("admin", cmd, result, 0);
+}
}