diff options
Diffstat (limited to 'src')
26 files changed, 496 insertions, 410 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index f6073fdac5c..310c30eb41a 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -509,7 +509,7 @@ coredbEnv.Library("coredb", [ "db/pipeline/expression.cpp", "db/projection.cpp", "db/stats/timer_stats.cpp", - "s/shardconnection.cpp", + "s/client/shard_connection.cpp", ], LIBDEPS=['db/auth/serverauth', 'db/commands/server_status_core', diff --git a/src/mongo/client/parallel.h b/src/mongo/client/parallel.h index 8a7fbf047da..d202e703751 100644 --- a/src/mongo/client/parallel.h +++ b/src/mongo/client/parallel.h @@ -39,6 +39,7 @@ #include "mongo/client/export_macros.h" #include "mongo/db/matcher/matcher.h" #include "mongo/db/namespace_string.h" +#include "mongo/s/client/shard_connection.h" #include "mongo/s/shard.h" #include "mongo/util/concurrency/mvar.h" @@ -49,7 +50,7 @@ namespace mongo { /** * holder for a server address and a query to run */ - class MONGO_CLIENT_API ServerAndQuery { + class ServerAndQuery { public: ServerAndQuery( const std::string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) : _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ) { @@ -84,7 +85,7 @@ namespace mongo { class ParallelConnectionMetadata; class DBClientCursorHolder; - class MONGO_CLIENT_API CommandInfo { + class CommandInfo { public: std::string versionedNS; BSONObj cmdFilter; @@ -101,12 +102,10 @@ namespace mongo { } }; - typedef boost::shared_ptr<ShardConnection> ShardConnectionPtr; - class DBClientCursor; typedef boost::shared_ptr<DBClientCursor> DBClientCursorPtr; - class MONGO_CLIENT_API ParallelConnectionState { + class ParallelConnectionState { public: ParallelConnectionState() : @@ -135,7 +134,7 @@ namespace mongo { typedef ParallelConnectionState PCState; typedef boost::shared_ptr<PCState> PCStatePtr; - class MONGO_CLIENT_API ParallelConnectionMetadata { + class ParallelConnectionMetadata { public: ParallelConnectionMetadata() : @@ -305,7 +304,7 @@ namespace mongo { * TODO: Choose one set of ownership semantics so that this isn't needed - merge sort via * mapreduce is the main issue since it has no metadata and this holder owns the cursors. */ - class MONGO_CLIENT_API DBClientCursorHolder { + class DBClientCursorHolder { public: DBClientCursorHolder() {} @@ -338,7 +337,7 @@ namespace mongo { * right now uses underlying sync network ops and uses another thread * should be changed to use non-blocking io */ - class MONGO_CLIENT_API Future { + class Future { public: class CommandResult { public: diff --git a/src/mongo/db/auth/native_sasl_authentication_session.cpp b/src/mongo/db/auth/native_sasl_authentication_session.cpp index 0b2df6dfadb..af23a24c7e3 100644 --- a/src/mongo/db/auth/native_sasl_authentication_session.cpp +++ b/src/mongo/db/auth/native_sasl_authentication_session.cpp @@ -47,7 +47,6 @@ #include "mongo/db/auth/sasl_options.h" #include "mongo/db/auth/sasl_plain_server_conversation.h" #include "mongo/db/auth/sasl_scramsha1_server_conversation.h" -#include "mongo/db/operation_context_noop.h" #include "mongo/util/assert_util.h" #include "mongo/util/mongoutils/str.h" diff --git a/src/mongo/db/auth/sasl_authentication_session.cpp b/src/mongo/db/auth/sasl_authentication_session.cpp index d3c428e39c5..62abdc8a284 100644 --- a/src/mongo/db/auth/sasl_authentication_session.cpp +++ b/src/mongo/db/auth/sasl_authentication_session.cpp @@ -42,7 +42,6 @@ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/auth/authz_manager_external_state_mock.h" #include "mongo/db/auth/authz_session_external_state_mock.h" -#include "mongo/db/operation_context_noop.h" #include "mongo/util/assert_util.h" #include "mongo/util/mongoutils/str.h" diff --git a/src/mongo/db/auth/user_cache_invalidator_job.cpp b/src/mongo/db/auth/user_cache_invalidator_job.cpp index f7b9df312c4..7669d0201bc 100644 --- a/src/mongo/db/auth/user_cache_invalidator_job.cpp +++ b/src/mongo/db/auth/user_cache_invalidator_job.cpp @@ -36,6 +36,7 @@ #include "mongo/base/status.h" #include "mongo/base/status_with.h" +#include "mongo/client/connpool.h" #include "mongo/db/auth/authorization_manager.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" diff --git a/src/mongo/db/conn_pool_options.cpp b/src/mongo/db/conn_pool_options.cpp index 0ac2c985920..ae37e1a3155 100644 --- a/src/mongo/db/conn_pool_options.cpp +++ b/src/mongo/db/conn_pool_options.cpp @@ -26,12 +26,14 @@ * it in the license file. */ +#include "mongo/platform/basic.h" + #include "mongo/db/conn_pool_options.h" #include "mongo/base/init.h" -#include "mongo/db/server_parameters.h" #include "mongo/client/connpool.h" -#include "mongo/s/shard.h" +#include "mongo/db/server_parameters.h" +#include "mongo/s/client/shard_connection.h" namespace mongo { diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index cb838d55929..0dcfaac1569 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -37,6 +37,7 @@ #include <boost/unordered_map.hpp> #include <deque> +#include "mongo/client/connpool.h" #include "mongo/db/clientcursor.h" #include "mongo/db/jsobj.h" #include "mongo/db/matcher/matcher.h" diff --git a/src/mongo/s/balancer_policy.cpp b/src/mongo/s/balancer_policy.cpp index 651502d01bc..70e3d24e404 100644 --- a/src/mongo/s/balancer_policy.cpp +++ b/src/mongo/s/balancer_policy.cpp @@ -33,6 +33,7 @@ #include <algorithm> +#include "mongo/client/connpool.h" #include "mongo/s/balancer_policy.h" #include "mongo/s/chunk.h" #include "mongo/s/config.h" diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp index 6df0afcdbb2..a99a32829e1 100644 --- a/src/mongo/s/chunk.cpp +++ b/src/mongo/s/chunk.cpp @@ -711,6 +711,10 @@ namespace mongo { to << ChunkType::shard(_shard.getName()); } + string Chunk::genID() const { + return genID(_manager->getns(), _min); + } + string Chunk::genID( const string& ns , const BSONObj& o ) { StringBuilder buf; buf << ns << "-"; @@ -1463,13 +1467,15 @@ namespace mongo { // we need a special command for dropping on the d side // this hack works for the moment - if ( ! setShardVersion( conn.conn(), - _ns, - ChunkVersion( 0, 0, OID() ), - ChunkManagerPtr(), - true, res ) ) - { - throw UserException( 8071 , str::stream() << "cleaning up after drop failed: " << res ); + if (!setShardVersion(conn.conn(), + _ns, + configServer.modelServer(), + ChunkVersion(0, 0, OID()), + NULL, + true, + res)) { + + uasserted(8071, str::stream() << "cleaning up after drop failed: " << res); } conn->simpleCommand( "admin", 0, "unsetSharding" ); @@ -1607,42 +1613,4 @@ namespace mongo { return splitThreshold; } - // NOTE (careful when deprecating) - // currently the sharding is enabled because of a write or read (as opposed to a split or migrate), the shard learns - // its name and through the 'setShardVersion' command call - bool setShardVersion( DBClientBase & conn, - const string& ns, - ChunkVersion version, - ChunkManagerPtr manager, // Used only for reporting! - bool authoritative , - BSONObj& result ) - { - BSONObjBuilder cmdBuilder; - cmdBuilder.append( "setShardVersion" , ns.c_str() ); - cmdBuilder.append( "configdb" , configServer.modelServer() ); - - Shard s = Shard::make(conn.getServerAddress()); - cmdBuilder.append("shard", s.getName()); - cmdBuilder.append("shardHost", s.getConnString()); - - if (ns.size() > 0) { - version.addToBSON(cmdBuilder); - } - else { - cmdBuilder.append("init", true); - } - - if (authoritative) - cmdBuilder.appendBool("authoritative", 1); - - BSONObj cmd = cmdBuilder.obj(); - - LOG(1) << " setShardVersion " << s.getName() << " " << conn.getServerAddress() - << " " << ns << " " << cmd - << (manager.get() ? - string(str::stream() << " " << manager->getSequenceNumber()) : ""); - - return conn.runCommand("admin", cmd, result, 0); - } - } // namespace mongo diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h index 46ab7bcf6c2..a8101136160 100644 --- a/src/mongo/s/chunk.h +++ b/src/mongo/s/chunk.h @@ -31,7 +31,6 @@ #pragma once #include <boost/next_prior.hpp> -#include <boost/noncopyable.hpp> #include <boost/shared_ptr.hpp> #include "mongo/base/string_data.h" @@ -60,7 +59,7 @@ namespace mongo { typedef std::map<BSONObj,ChunkPtr,BSONObjCmp> ChunkMap; typedef std::map<BSONObj,boost::shared_ptr<ChunkRange>,BSONObjCmp> ChunkRangeMap; - typedef boost::shared_ptr<const ChunkManager> ChunkManagerPtr; + typedef boost::shared_ptr<ChunkManager> ChunkManagerPtr; /** config.chunks @@ -69,7 +68,8 @@ namespace mongo { x is in a shard iff min <= x < max */ - class Chunk : boost::noncopyable { + class Chunk { + MONGO_DISALLOW_COPYING(Chunk); public: enum SplitPointMode { // Determines the split points that will make the current chunk smaller than @@ -565,13 +565,4 @@ namespace mongo { friend class TestableChunkManager; }; - inline std::string Chunk::genID() const { return genID(_manager->getns(), _min); } - - bool setShardVersion( DBClientBase & conn, - const std::string& ns, - ChunkVersion version, - ChunkManagerPtr manager, - bool authoritative, - BSONObj& result ); - } // namespace mongo diff --git a/src/mongo/s/chunk_manager_targeter.h b/src/mongo/s/chunk_manager_targeter.h index aaa115b044b..7b8a7592b15 100644 --- a/src/mongo/s/chunk_manager_targeter.h +++ b/src/mongo/s/chunk_manager_targeter.h @@ -40,8 +40,6 @@ namespace mongo { - class Grid; - struct TargeterStats { // Map of chunk shard minKey -> approximate delta. This is used for deciding // whether a chunk might need splitting or not. diff --git a/src/mongo/s/shardconnection.cpp b/src/mongo/s/client/shard_connection.cpp index 950aa0523a2..7f2ba85a63a 100644 --- a/src/mongo/s/shardconnection.cpp +++ b/src/mongo/s/client/shard_connection.cpp @@ -1,46 +1,42 @@ -// shardconnection.cpp - /** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects -* for all of the code used other than as permitted herein. If you modify -* file(s) with this exception, you may extend this exception to your -* version of the file(s), but you are not obligated to do so. If you do not -* wish to do so, delete this exception statement from your version. If you -* delete this exception statement from all source files in the program, -* then also delete it in the license file. -*/ + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding #include "mongo/platform/basic.h" -#include <boost/noncopyable.hpp> +#include "mongo/s/client/shard_connection.h" + #include <set> -#include "mongo/db/auth/authorization_manager.h" -#include "mongo/db/client.h" #include "mongo/db/commands.h" #include "mongo/db/lasterror.h" -#include "mongo/db/server_parameters.h" -#include "mongo/s/config.h" +#include "mongo/s/chunk.h" #include "mongo/s/request.h" #include "mongo/s/shard.h" #include "mongo/s/stale_exception.h" @@ -53,14 +49,13 @@ namespace mongo { using std::auto_ptr; - using std::endl; using std::map; using std::set; using std::string; using std::stringstream; using std::vector; - DBConnectionPool shardConnectionPool; +namespace { class ClientConnections; @@ -68,25 +63,23 @@ namespace mongo { * Class which tracks ClientConnections (the client connection pool) for each incoming * connection, allowing stats access. */ - class ActiveClientConnections { - public: + public: + ActiveClientConnections() : _mutex("ActiveClientConnections") { - ActiveClientConnections() : _mutex( "ActiveClientConnections" ) { } - void add( const ClientConnections* cc ) { - scoped_lock lock( _mutex ); - _clientConnections.insert( cc ); + void add(const ClientConnections* cc) { + scoped_lock lock(_mutex); + _clientConnections.insert(cc); } - void remove( const ClientConnections* cc ) { - scoped_lock lock( _mutex ); - _clientConnections.erase( cc ); + void remove(const ClientConnections* cc) { + scoped_lock lock(_mutex); + _clientConnections.erase(cc); } - // Implemented after ClientConnections - void appendInfo( BSONObjBuilder& b ); + void appendInfo(BSONObjBuilder& b); private: mongo::mutex _mutex; @@ -96,44 +89,53 @@ namespace mongo { /** * Command to allow access to the sharded conn pool information in mongos. - * TODO: Refactor with other connection pooling changes */ class ShardedPoolStats : public Command { public: - ShardedPoolStats() : Command( "shardConnPoolStats" ) {} + ShardedPoolStats() : Command( "shardConnPoolStats" ) { } virtual void help( stringstream &help ) const { help << "stats about the shard connection pool"; } virtual bool isWriteCommandForConfigServer() const { return false; } virtual bool slaveOk() const { return true; } // Same privs as connPoolStats - virtual void addRequiredPrivileges( const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out ) - { + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { ActionSet actions; - actions.addAction( ActionType::connPoolStats ); - out->push_back( Privilege( ResourcePattern::forClusterResource(), actions ) ); + actions.addAction(ActionType::connPoolStats); + out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } - virtual bool run( OperationContext* txn, const string&, mongo::BSONObj&, int, std::string&, mongo::BSONObjBuilder& result, bool ) { + virtual bool run(OperationContext* txn, + const string& dbname, + mongo::BSONObj& cmdObj, + int options, + std::string& errmsg, + mongo::BSONObjBuilder& result, + bool fromRepl) { + // Base pool info - shardConnectionPool.appendInfo( result ); + shardConnectionPool.appendInfo(result); + // Thread connection info - activeClientConnections.appendInfo( result ); + activeClientConnections.appendInfo(result); + return true; } } shardedPoolStatsCmd; /** - * holds all the actual db connections for a client to various servers - * 1 per thread, so doesn't have to be thread safe + * holds all the actual db connections for a client to various servers 1 per thread, so + * doesn't have to be thread safe. */ - class ClientConnections : boost::noncopyable { + class ClientConnections { + MONGO_DISALLOW_COPYING(ClientConnections); public: - struct Status : boost::noncopyable { - Status() : created(0), avail(0) {} + + struct Status { + Status() : created(0), avail(0) { } // May be read concurrently, but only written from // this thread. @@ -142,53 +144,64 @@ namespace mongo { }; // Gets or creates the status object for the host - Status* _getStatus( const string& addr ) { - scoped_spinlock lock( _lock ); + Status* _getStatus(const string& addr) { + scoped_spinlock lock(_lock); Status* &temp = _hosts[addr]; - if ( ! temp ) + if (!temp) { temp = new Status(); + } + return temp; } ClientConnections() { // Start tracking client connections - activeClientConnections.add( this ); + activeClientConnections.add(this); } ~ClientConnections() { // Stop tracking these client connections - activeClientConnections.remove( this ); + activeClientConnections.remove(this); - releaseAll( true ); + releaseAll(true); } - void releaseAll( bool fromDestructor = false ) { + void releaseAll(bool fromDestructor = false) { + // Don't need spinlock protection because if not in the destructor, we don't modify + // _hosts, and if in the destructor we are not accessible to external threads. + for (HostMap::iterator i = _hosts.begin(); i != _hosts.end(); ++i) { + const string addr = i->first; + Status* ss = i->second; + invariant(ss); - // Don't need spinlock protection because if not in the destructor, we don't - // modify _hosts, and if in the destructor we are not accessible to external - // threads. + if (ss->avail) { + // If we're shutting down, don't want to initiate release mechanism as it is + // slow, and isn't needed since all connections will be closed anyway. + if (inShutdown()) { + if (versionManager.isVersionableCB(ss->avail)) { + versionManager.resetShardVersionCB(ss->avail); + } - for ( HostMap::iterator i=_hosts.begin(); i!=_hosts.end(); ++i ) { - string addr = i->first; - Status* ss = i->second; - verify( ss ); - if ( ss->avail ) { - /* if we're shutting down, don't want to initiate release mechanism as it is slow, - and isn't needed since all connections will be closed anyway */ - if ( inShutdown() ) { - if( versionManager.isVersionableCB( ss->avail ) ) versionManager.resetShardVersionCB( ss->avail ); delete ss->avail; } - else - release( addr , ss->avail ); + else { + release(addr, ss->avail); + } + ss->avail = 0; } - if ( fromDestructor ) delete ss; + + if (fromDestructor) { + delete ss; + } + } + + if (fromDestructor) { + _hosts.clear(); } - if ( fromDestructor ) _hosts.clear(); } - DBClientBase * get( const string& addr , const string& ns ) { + DBClientBase * get(const string& addr, const string& ns) { { // We want to report ns stats @@ -197,17 +210,23 @@ namespace mongo { _seenNS.insert(ns); } - Status* s = _getStatus( addr ); + Status* s = _getStatus(addr); - auto_ptr<DBClientBase> c; // Handles cleanup if there's an exception thrown - if ( s->avail ) { - c.reset( s->avail ); + auto_ptr<DBClientBase> c; + if (s->avail) { + c.reset(s->avail); s->avail = 0; - shardConnectionPool.onHandedOut( c.get() ); // May throw an exception - } else { - c.reset( shardConnectionPool.get( addr ) ); - s->created++; // After, so failed creation doesn't get counted + + // May throw an exception + shardConnectionPool.onHandedOut(c.get()); } + else { + c.reset(shardConnectionPool.get(addr)); + + // After, so failed creation doesn't get counted + s->created++; + } + return c.release(); } @@ -219,7 +238,7 @@ namespace mongo { if (s->avail != NULL) { warning() << "Detected additional sharded connection in the " - "thread local pool for " << addr << endl; + << "thread local pool for " << addr; if (DBException::traceExceptions) { // There shouldn't be more than one connection checked out to the same @@ -289,7 +308,7 @@ namespace mongo { catch ( const DBException& ex ) { warning() << "problem while initially checking shard versions on" << " " - << shard.getName() << causedBy( ex ) << endl; + << shard.getName() << causedBy(ex); // NOTE: This is only a heuristic, to avoid multiple stale version retries // across multiple shards, and does not affect correctness. @@ -367,45 +386,81 @@ namespace mongo { } }; - thread_specific_ptr<ClientConnections> ClientConnections::_perThread; - - /** - * Appends info about all active client shard connections to a BOBuilder - */ - void ActiveClientConnections::appendInfo( BSONObjBuilder& b ) { - BSONArrayBuilder arr( 64 * 1024 ); // There may be quite a few threads + void ActiveClientConnections::appendInfo(BSONObjBuilder& b) { + BSONArrayBuilder arr(64 * 1024); // There may be quite a few threads { - scoped_lock lock( _mutex ); - for ( set<const ClientConnections*>::const_iterator i = _clientConnections.begin(); - i != _clientConnections.end(); ++i ) - { - BSONObjBuilder bb( arr.subobjStart() ); - (*i)->appendInfo( bb ); + scoped_lock lock(_mutex); + for (set<const ClientConnections*>::const_iterator i = _clientConnections.begin(); + i != _clientConnections.end(); + ++i) { + + BSONObjBuilder bb(arr.subobjStart()); + (*i)->appendInfo(bb); bb.done(); } } - b.appendArray( "threads", arr.obj() ); + b.appendArray("threads", arr.obj()); } - ShardConnection::ShardConnection( const Shard * s , const string& ns, ChunkManagerPtr manager ) - : _addr( s->getConnString() ) , _ns( ns ), _manager( manager ) { + thread_specific_ptr<ClientConnections> ClientConnections::_perThread; + +} // namespace + + // The global connection pool + DBConnectionPool shardConnectionPool; + + // Different between mongos and mongod + void usingAShardConnection(const string& addr); + + + ShardConnection::ShardConnection(const Shard * s, const string& ns, ChunkManagerPtr manager) + : _addr(s->getConnString()), + _ns(ns), + _manager(manager) { + _init(); } - ShardConnection::ShardConnection( const Shard& s , const string& ns, ChunkManagerPtr manager ) - : _addr( s.getConnString() ) , _ns( ns ), _manager( manager ) { + ShardConnection::ShardConnection(const Shard& s, const string& ns, ChunkManagerPtr manager) + : _addr(s.getConnString()), + _ns(ns), + _manager( manager ) { + _init(); } - ShardConnection::ShardConnection( const string& addr , const string& ns, ChunkManagerPtr manager ) - : _addr( addr ) , _ns( ns ), _manager( manager ) { + ShardConnection::ShardConnection(const string& addr, const string& ns, ChunkManagerPtr manager) + : _addr(addr), + _ns(ns), + _manager(manager) { + _init(); } - void usingAShardConnection( const string& addr ); + ShardConnection::~ShardConnection() { + if (_conn) { + if (_conn->isFailed()) { + if (_conn->getSockCreationMicroSec() == DBClientBase::INVALID_SOCK_CREATION_TIME) { + kill(); + } + else { + // The pool takes care of deleting the failed connection - this + // will also trigger disposal of older connections in the pool + done(); + } + } + else { + // see done() comments above for why we log this line + log() << "sharded connection to " << _conn->getServerAddress() + << " not being returned to the pool"; + + kill(); + } + } + } void ShardConnection::_init() { verify( _addr.size() ); @@ -465,29 +520,6 @@ namespace mongo { ClientConnections::threadInstance()->checkVersions( ns ); } - ShardConnection::~ShardConnection() { - if ( _conn ) { - if (_conn->isFailed()) { - if (_conn->getSockCreationMicroSec() == - DBClientBase::INVALID_SOCK_CREATION_TIME) { - kill(); - } - else { - // The pool takes care of deleting the failed connection - this - // will also trigger disposal of older connections in the pool - done(); - } - } - else { - /* see done() comments above for why we log this line */ - log() << "sharded connection to " << _conn->getServerAddress() - << " not being returned to the pool" << endl; - - kill(); - } - } - } - void ShardConnection::releaseMyConnections() { ClientConnections::threadInstance()->releaseAll(); } @@ -500,4 +532,41 @@ namespace mongo { void ShardConnection::forgetNS( const string& ns ) { ClientConnections::threadInstance()->forgetNS( ns ); } + + + bool setShardVersion(DBClientBase& conn, + const string& ns, + const string& configServerPrimary, + ChunkVersion version, + ChunkManager* manager, + bool authoritative, + BSONObj& result) { + + BSONObjBuilder cmdBuilder; + cmdBuilder.append("setShardVersion", ns); + cmdBuilder.append("configdb", configServerPrimary); + + Shard s = Shard::make(conn.getServerAddress()); + cmdBuilder.append("shard", s.getName()); + cmdBuilder.append("shardHost", s.getConnString()); + + if (ns.size() > 0) { + version.addToBSON(cmdBuilder); + } + else { + cmdBuilder.append("init", true); + } + + if (authoritative) { + cmdBuilder.appendBool("authoritative", 1); + } + + BSONObj cmd = cmdBuilder.obj(); + + LOG(1) << " setShardVersion " << s.getName() << " " << conn.getServerAddress() + << " " << ns << " " << cmd + << (manager ? string(str::stream() << " " << manager->getSequenceNumber()) : ""); + + return conn.runCommand("admin", cmd, result, 0); + } } diff --git a/src/mongo/s/client/shard_connection.h b/src/mongo/s/client/shard_connection.h new file mode 100644 index 00000000000..1eeed9fd87e --- /dev/null +++ b/src/mongo/s/client/shard_connection.h @@ -0,0 +1,175 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/shared_ptr.hpp> +#include <string> + +#include "mongo/client/connpool.h" +#include "mongo/s/chunk_version.h" + +namespace mongo { + + class ChunkManager; + class Shard; + + typedef boost::shared_ptr<ChunkManager> ChunkManagerPtr; + + class ShardConnection : public AScopedConnection { + public: + ShardConnection(const Shard* s, const std::string& ns, ChunkManagerPtr manager = ChunkManagerPtr()); + ShardConnection(const Shard& s, const std::string& ns, ChunkManagerPtr manager = ChunkManagerPtr()); + ShardConnection(const std::string& addr, const std::string& ns, ChunkManagerPtr manager = ChunkManagerPtr()); + + ~ShardConnection(); + + void done(); + void kill(); + + DBClientBase& conn() { + _finishInit(); + verify( _conn ); + return *_conn; + } + + DBClientBase* operator->() { + _finishInit(); + verify( _conn ); + return _conn; + } + + DBClientBase* get() { + _finishInit(); + verify( _conn ); + return _conn; + } + + /** + * @return the connection object underneath without setting the shard version. + * @throws AssertionException if _conn is uninitialized. + */ + DBClientBase* getRawConn() const { + verify( _conn ); + return _conn; + } + + std::string getHost() const { + return _addr; + } + + std::string getNS() const { + return _ns; + } + + ChunkManagerPtr getManager() const { + return _manager; + } + + bool setVersion() { + _finishInit(); + return _setVersion; + } + + static void sync(); + + void donotCheckVersion() { + _setVersion = false; + _finishedInit = true; + } + + bool ok() const { return _conn != NULL; } + + /** checks all of my thread local connections for the version of this ns */ + static void checkMyConnectionVersions( const std::string & ns ); + + /** + * Returns all the current sharded connections to the pool. + * Note: This is *dangerous* if we have GLE state. + */ + static void releaseMyConnections(); + + /** + * Clears all connections in the sharded pool, including connections in the + * thread local storage pool of the current thread. + */ + static void clearPool(); + + /** + * Forgets a namespace to prevent future versioning. + */ + static void forgetNS( const std::string& ns ); + + private: + void _init(); + void _finishInit(); + + std::string _addr; + std::string _ns; + ChunkManagerPtr _manager; + + bool _finishedInit; + + DBClientBase* _conn; + bool _setVersion; + }; + + + class ShardingConnectionHook : public DBConnectionHook { + public: + + ShardingConnectionHook( bool shardedConnections ) + : _shardedConnections( shardedConnections ) { + + } + + virtual void onCreate( DBClientBase * conn ); + virtual void onDestroy( DBClientBase * conn ); + virtual void onRelease(DBClientBase* conn); + + bool _shardedConnections; + }; + + + /** + * Sends the setShardVersion command on the specified connection. + */ + bool setShardVersion(DBClientBase & conn, + const std::string& ns, + const std::string& configServerPrimary, + ChunkVersion version, + ChunkManager* manager, + bool authoritative, + BSONObj& result); + + + typedef boost::shared_ptr<ShardConnection> ShardConnectionPtr; + + extern DBConnectionPool shardConnectionPool; + +} // namespace mongo diff --git a/src/mongo/s/commands_admin.cpp b/src/mongo/s/commands_admin.cpp index 5f0831d598f..170e43eb77f 100644 --- a/src/mongo/s/commands_admin.cpp +++ b/src/mongo/s/commands_admin.cpp @@ -57,6 +57,7 @@ #include "mongo/db/write_concern_options.h" #include "mongo/s/chunk.h" #include "mongo/s/client_info.h" +#include "mongo/s/client/shard_connection.h" #include "mongo/s/cluster_write.h" #include "mongo/s/config.h" #include "mongo/s/dbclient_multi_command.h" diff --git a/src/mongo/s/config.cpp b/src/mongo/s/config.cpp index 4d95f66ff74..d8fa0fdba0a 100644 --- a/src/mongo/s/config.cpp +++ b/src/mongo/s/config.cpp @@ -42,6 +42,7 @@ #include "mongo/db/write_concern.h" #include "mongo/s/chunk.h" #include "mongo/s/chunk_version.h" +#include "mongo/s/client/shard_connection.h" #include "mongo/s/cluster_write.h" #include "mongo/s/config.h" #include "mongo/s/grid.h" diff --git a/src/mongo/s/d_state.cpp b/src/mongo/s/d_state.cpp index 994f2b46364..15b2ef6ec9f 100644 --- a/src/mongo/s/d_state.cpp +++ b/src/mongo/s/d_state.cpp @@ -55,10 +55,10 @@ #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/client/connpool.h" #include "mongo/s/chunk_version.h" +#include "mongo/s/client/shard_connection.h" #include "mongo/s/config.h" #include "mongo/s/d_state.h" #include "mongo/s/metadata_loader.h" -#include "mongo/s/shard.h" #include "mongo/s/stale_exception.h" #include "mongo/util/queue.h" #include "mongo/util/concurrency/mutex.h" diff --git a/src/mongo/s/dbclient_multi_command.cpp b/src/mongo/s/dbclient_multi_command.cpp index b29a2697aff..436a26b0842 100644 --- a/src/mongo/s/dbclient_multi_command.cpp +++ b/src/mongo/s/dbclient_multi_command.cpp @@ -35,6 +35,7 @@ #include "mongo/db/client_basic.h" #include "mongo/db/dbmessage.h" #include "mongo/db/wire_version.h" +#include "mongo/s/client/shard_connection.h" #include "mongo/s/shard.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/db/server_parameters.h" diff --git a/src/mongo/s/request.cpp b/src/mongo/s/request.cpp index ac2a6781eae..e1f6b322f40 100644 --- a/src/mongo/s/request.cpp +++ b/src/mongo/s/request.cpp @@ -34,35 +34,31 @@ #include "mongo/s/request.h" -#include "mongo/client/connpool.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" -#include "mongo/db/dbmessage.h" -#include "mongo/db/operation_context_noop.h" #include "mongo/db/stats/counters.h" -#include "mongo/s/chunk.h" #include "mongo/s/client_info.h" -#include "mongo/s/config.h" #include "mongo/s/cursors.h" #include "mongo/s/grid.h" -#include "mongo/s/server.h" +#include "mongo/s/strategy.h" #include "mongo/util/log.h" +#include "mongo/util/timer.h" namespace mongo { using std::endl; using std::string; - Request::Request( Message& m, AbstractMessagingPort* p ) : - _m(m) , _d( m ) , _p(p) , _didInit(false) { + Request::Request(Message& m, AbstractMessagingPort* p) + : _clientInfo(ClientInfo::get()), + _m(m), + _d(m), + _p(p), + _id(_m.header().getId()), + _didInit(false) { - _id = _m.header().getId(); - - _txn.reset(new OperationContextNoop()); - - _clientInfo = ClientInfo::get(); - if ( p ) { - _clientInfo->newPeerRequest( p->remote() ); + if (p) { + _clientInfo->newPeerRequest(p->remote()); } else { _clientInfo->newRequest(); @@ -74,7 +70,7 @@ namespace mongo { return; _didInit = true; reset(); - _clientInfo->getAuthorizationSession()->startRequest(_txn.get()); + _clientInfo->getAuthorizationSession()->startRequest(NULL); } // Deprecated, will move to the strategy itself diff --git a/src/mongo/s/request.h b/src/mongo/s/request.h index e315505febc..66b82e9ecd2 100644 --- a/src/mongo/s/request.h +++ b/src/mongo/s/request.h @@ -30,52 +30,41 @@ #pragma once -#include "mongo/platform/basic.h" - -#include <boost/noncopyable.hpp> -#include <boost/scoped_ptr.hpp> - #include "mongo/db/dbmessage.h" -#include "mongo/s/config.h" #include "mongo/util/net/message.h" namespace mongo { - - class OpCounters; class ClientInfo; - class OperationContext; - - class Request : boost::noncopyable { + class Request { + MONGO_DISALLOW_COPYING(Request); public: - Request( Message& m, AbstractMessagingPort* p ); + Request(Message& m, AbstractMessagingPort* p); - // ---- message info ----- - - - const char * getns() const { + const char* getns() const { return _d.getns(); } + int op() const { return _m.operation(); } + bool expectResponse() const { return op() == dbQuery || op() == dbGetMore; } + bool isCommand() const; MSGID id() const { return _id; } - ClientInfo * getClientInfo() const { + ClientInfo* getClientInfo() const { return _clientInfo; } - // ---- low level access ---- - - void reply( Message & response , const std::string& fromServer ); + void reply(Message & response, const std::string& fromServer); Message& m() { return _m; } DbMessage& d() { return _d; } @@ -88,21 +77,15 @@ namespace mongo { void reset(); private: + ClientInfo* const _clientInfo; + Message& _m; DbMessage _d; - AbstractMessagingPort* _p; + AbstractMessagingPort* const _p; MSGID _id; - ClientInfo * _clientInfo; - - OpCounters* _counter; - - boost::scoped_ptr<OperationContext> _txn; - bool _didInit; }; } - -#include "strategy.h" diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index 491ac90eaa6..eda8e400ad8 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -55,7 +55,6 @@ #include "mongo/db/instance.h" #include "mongo/db/lasterror.h" #include "mongo/db/log_process_details.h" -#include "mongo/db/operation_context_noop.h" #include "mongo/db/startup_warnings_common.h" #include "mongo/platform/process_id.h" #include "mongo/s/balance.h" @@ -289,9 +288,7 @@ static ExitCode runMongosServer( bool doUpgrade ) { boost::thread web( stdx::bind(&webServerThread, new NoAdminAccess())); // takes ownership - OperationContextNoop txn; - - Status status = getGlobalAuthorizationManager()->initialize(&txn); + Status status = getGlobalAuthorizationManager()->initialize(NULL); if (!status.isOK()) { mongo::log(LogComponent::kDefault) << "Initializing authorization data failed: " << status; return EXIT_SHARDING_ERROR; diff --git a/src/mongo/s/shard.cpp b/src/mongo/s/shard.cpp index 1eb082b7898..6df1eeb915d 100644 --- a/src/mongo/s/shard.cpp +++ b/src/mongo/s/shard.cpp @@ -49,6 +49,7 @@ #include "mongo/db/auth/privilege.h" #include "mongo/db/commands.h" #include "mongo/db/jsobj.h" +#include "mongo/s/client/shard_connection.h" #include "mongo/s/client_info.h" #include "mongo/s/config.h" #include "mongo/s/request.h" @@ -357,26 +358,36 @@ namespace mongo { } } cmdGetShardMap; + Shard::Shard() + : _name(""), + _addr(""), + _maxSizeMB(0), + _isDraining(false) { + + } + Shard::Shard(const std::string& name, - const std::string& addr, - long long maxSizeMB, - bool isDraining): - _name(name), - _addr(addr), - _maxSizeMB(maxSizeMB), - _isDraining(isDraining) { + const std::string& addr, + long long maxSizeMB, + bool isDraining) + : _name(name), + _addr(addr), + _maxSizeMB(maxSizeMB), + _isDraining(isDraining) { + _setAddr(addr); } Shard::Shard(const std::string& name, - const ConnectionString& connStr, - long long maxSizeMB, - bool isDraining): - _name(name), - _addr(connStr.toString()), - _cs(connStr), - _maxSizeMB(maxSizeMB), - _isDraining(isDraining) { + const ConnectionString& connStr, + long long maxSizeMB, + bool isDraining) + : _name(name), + _addr(connStr.toString()), + _cs(connStr), + _maxSizeMB(maxSizeMB), + _isDraining(isDraining) { + } Shard Shard::findIfExists( const string& shardName ) { diff --git a/src/mongo/s/shard.h b/src/mongo/s/shard.h index ab49f272da7..a4d73f08ee3 100644 --- a/src/mongo/s/shard.h +++ b/src/mongo/s/shard.h @@ -30,15 +30,14 @@ #pragma once -#include "mongo/platform/basic.h" - #include <boost/shared_ptr.hpp> -#include "mongo/client/connpool.h" +#include "mongo/bson/bsonmisc.h" +#include "mongo/client/dbclientinterface.h" +#include "mongo/util/assert_util.h" namespace mongo { - class ShardConnection; class ShardStatus; /* @@ -47,9 +46,7 @@ namespace mongo { class Shard { public: - Shard() - : _name("") , _addr("") , _maxSizeMB(0) , _isDraining(false) { - } + Shard(); Shard(const std::string& name, const std::string& addr, @@ -196,6 +193,7 @@ namespace mongo { long long _maxSizeMB; // in MBytes, 0 is unlimited bool _isDraining; // shard is currently being removed }; + typedef boost::shared_ptr<Shard> ShardPtr; class ShardStatus { @@ -238,121 +236,4 @@ namespace mongo { std::string _mongoVersion; }; - class ChunkManager; - typedef boost::shared_ptr<const ChunkManager> ChunkManagerPtr; - - class ShardConnection : public AScopedConnection { - public: - ShardConnection( const Shard * s , const std::string& ns, ChunkManagerPtr manager = ChunkManagerPtr() ); - ShardConnection( const Shard& s , const std::string& ns, ChunkManagerPtr manager = ChunkManagerPtr() ); - ShardConnection( const std::string& addr , const std::string& ns, ChunkManagerPtr manager = ChunkManagerPtr() ); - - ~ShardConnection(); - - void done(); - void kill(); - - DBClientBase& conn() { - _finishInit(); - verify( _conn ); - return *_conn; - } - - DBClientBase* operator->() { - _finishInit(); - verify( _conn ); - return _conn; - } - - DBClientBase* get() { - _finishInit(); - verify( _conn ); - return _conn; - } - - /** - * @return the connection object underneath without setting the shard version. - * @throws AssertionException if _conn is uninitialized. - */ - DBClientBase* getRawConn() const { - verify( _conn ); - return _conn; - } - - std::string getHost() const { - return _addr; - } - - std::string getNS() const { - return _ns; - } - - ChunkManagerPtr getManager() const { - return _manager; - } - - bool setVersion() { - _finishInit(); - return _setVersion; - } - - static void sync(); - - void donotCheckVersion() { - _setVersion = false; - _finishedInit = true; - } - - bool ok() const { return _conn != NULL; } - - /** checks all of my thread local connections for the version of this ns */ - static void checkMyConnectionVersions( const std::string & ns ); - - /** - * Returns all the current sharded connections to the pool. - * Note: This is *dangerous* if we have GLE state. - */ - static void releaseMyConnections(); - - /** - * Clears all connections in the sharded pool, including connections in the - * thread local storage pool of the current thread. - */ - static void clearPool(); - - /** - * Forgets a namespace to prevent future versioning. - */ - static void forgetNS( const std::string& ns ); - - private: - void _init(); - void _finishInit(); - - bool _finishedInit; - - std::string _addr; - std::string _ns; - ChunkManagerPtr _manager; - - DBClientBase* _conn; - bool _setVersion; - }; - - - extern DBConnectionPool shardConnectionPool; - - class ShardingConnectionHook : public DBConnectionHook { - public: - - ShardingConnectionHook( bool shardedConnections ) - : _shardedConnections( shardedConnections ) { - } - - virtual void onCreate( DBClientBase * conn ); - virtual void onDestroy( DBClientBase * conn ); - virtual void onRelease(DBClientBase* conn); - - bool _shardedConnections; - }; } diff --git a/src/mongo/s/shard_conn_test.cpp b/src/mongo/s/shard_conn_test.cpp index a858d68caa4..696a5b46b2b 100644 --- a/src/mongo/s/shard_conn_test.cpp +++ b/src/mongo/s/shard_conn_test.cpp @@ -34,7 +34,7 @@ #include "mongo/dbtests/mock/mock_conn_registry.h" #include "mongo/dbtests/mock/mock_dbclient_connection.h" #include "mongo/platform/cstdint.h" -#include "mongo/s/shard.h" +#include "mongo/s/client/shard_connection.h" #include "mongo/unittest/unittest.h" #include <vector> diff --git a/src/mongo/s/strategy.cpp b/src/mongo/s/strategy.cpp index 19cbc5d11d0..2cb49dc4e3a 100644 --- a/src/mongo/s/strategy.cpp +++ b/src/mongo/s/strategy.cpp @@ -30,6 +30,8 @@ #include "mongo/platform/basic.h" +#include "mongo/s/strategy.h" + #include <boost/scoped_ptr.hpp> #include "mongo/base/status.h" diff --git a/src/mongo/s/version_manager.cpp b/src/mongo/s/version_manager.cpp index 7e8d34091e8..f71b51bc89f 100644 --- a/src/mongo/s/version_manager.cpp +++ b/src/mongo/s/version_manager.cpp @@ -36,9 +36,9 @@ #include "mongo/s/chunk.h" #include "mongo/s/chunk_version.h" +#include "mongo/s/client/shard_connection.h" #include "mongo/s/config.h" #include "mongo/s/grid.h" -#include "mongo/s/shard.h" #include "mongo/s/stale_exception.h" // for SendStaleConfigException #include "mongo/util/log.h" @@ -187,7 +187,13 @@ namespace mongo { LOG(1) << "initializing shard connection to " << shard.toString() << endl; - ok = setShardVersion(*conn, "", ChunkVersion(), ChunkManagerPtr(), true, result); + ok = setShardVersion(*conn, + "", + configServer.modelServer(), + ChunkVersion(), + NULL, + true, + result); } catch( const DBException& ) { @@ -341,9 +347,15 @@ namespace mongo { << ", current chunk manager iteration is " << officialSequenceNumber; BSONObj result; - if ( setShardVersion( *conn , ns , version , manager , authoritative , result ) ) { - // success! - LOG(1) << " setShardVersion success: " << result << endl; + if (setShardVersion(*conn, + ns, + configServer.modelServer(), + version, + manager.get(), + authoritative, + result)) { + + LOG(1) << " setShardVersion success: " << result; connectionShardStatus.setSequence( conn , ns , officialSequenceNumber ); return true; } diff --git a/src/mongo/shell/bench.cpp b/src/mongo/shell/bench.cpp index 9af842f3e6e..027df17a42b 100644 --- a/src/mongo/shell/bench.cpp +++ b/src/mongo/shell/bench.cpp @@ -42,7 +42,6 @@ #include <iostream> #include "mongo/db/namespace_string.h" -#include "mongo/db/operation_context_noop.h" #include "mongo/client/dbclientcursor.h" #include "mongo/scripting/bson_template_evaluator.h" #include "mongo/scripting/engine.h" @@ -377,8 +376,7 @@ namespace mongo { bool check = ! e["check"].eoo(); if( check ){ if ( e["check"].type() == CodeWScope || e["check"].type() == Code || e["check"].type() == String ) { - OperationContextNoop txn; - scope = globalScriptEngine->getPooledScope(&txn, ns, "benchrun"); + scope = globalScriptEngine->getPooledScope(NULL, ns, "benchrun"); verify( scope.get() ); if ( e.type() == CodeWScope ) { |