diff options
Diffstat (limited to 'client')
33 files changed, 1100 insertions, 1087 deletions
diff --git a/client/clientOnly.cpp b/client/clientOnly.cpp index c744c862e81..0c1d7603715 100644 --- a/client/clientOnly.cpp +++ b/client/clientOnly.cpp @@ -37,12 +37,12 @@ namespace mongo { out() << "exiting" << endl; ::exit( returnCode ); } - - bool inShutdown(){ + + bool inShutdown() { return dbexitCalled; } - void setupSignals(){ + void setupSignals() { // maybe should do SIGPIPE here, not sure } @@ -50,20 +50,20 @@ namespace mongo { return "in client only mode"; } - bool haveLocalShardingInfo( const string& ns ){ + bool haveLocalShardingInfo( const string& ns ) { return false; } - DBClientBase * createDirectClient(){ + DBClientBase * createDirectClient() { uassert( 10256 , "no createDirectClient in clientOnly" , 0 ); return 0; } - void Shard::getAllShards( vector<Shard>& all ){ + void Shard::getAllShards( vector<Shard>& all ) { assert(0); } - bool Shard::isAShard( const string& ident ){ + bool Shard::isAShard( const string& ident ) { assert(0); return false; } diff --git a/client/connpool.cpp b/client/connpool.cpp index 25098190e85..dce6b852759 100644 --- a/client/connpool.cpp +++ b/client/connpool.cpp @@ -27,9 +27,9 @@ namespace mongo { // ------ PoolForHost ------ - - PoolForHost::~PoolForHost(){ - while ( ! _pool.empty() ){ + + PoolForHost::~PoolForHost() { + while ( ! _pool.empty() ) { StoredConnection sc = _pool.top(); delete sc.conn; _pool.pop(); @@ -39,64 +39,64 @@ namespace mongo { void PoolForHost::done( DBClientBase * c ) { _pool.push(c); } - + DBClientBase * PoolForHost::get() { - + time_t now = time(0); - - while ( ! _pool.empty() ){ + + while ( ! _pool.empty() ) { StoredConnection sc = _pool.top(); _pool.pop(); if ( sc.ok( now ) ) return sc.conn; delete sc.conn; } - + return NULL; } - + void PoolForHost::flush() { vector<StoredConnection> all; - while ( ! _pool.empty() ){ + while ( ! _pool.empty() ) { StoredConnection c = _pool.top(); _pool.pop(); all.push_back( c ); bool res; c.conn->isMaster( res ); } - - for ( vector<StoredConnection>::iterator i=all.begin(); i != all.end(); ++i ){ + + for ( vector<StoredConnection>::iterator i=all.begin(); i != all.end(); ++i ) { _pool.push( *i ); } } - PoolForHost::StoredConnection::StoredConnection( DBClientBase * c ){ + PoolForHost::StoredConnection::StoredConnection( DBClientBase * c ) { conn = c; when = time(0); } - bool PoolForHost::StoredConnection::ok( time_t now ){ + bool PoolForHost::StoredConnection::ok( time_t now ) { // if connection has been idle for an hour, kill it return ( now - when ) < 3600; } - void PoolForHost::createdOne( DBClientBase * base){ + void PoolForHost::createdOne( DBClientBase * base) { if ( _created == 0 ) _type = base->type(); - _created++; + _created++; } // ------ DBConnectionPool ------ DBConnectionPool pool; - + DBClientBase* DBConnectionPool::_get(const string& ident) { scoped_lock L(_mutex); PoolForHost& p = _pools[ident]; return p.get(); } - DBClientBase* DBConnectionPool::_finishCreate( const string& host , DBClientBase* conn ){ + DBClientBase* DBConnectionPool::_finishCreate( const string& host , DBClientBase* conn ) { { scoped_lock L(_mutex); PoolForHost& p = _pools[host]; @@ -105,85 +105,85 @@ namespace mongo { onCreate( conn ); onHandedOut( conn ); - + return conn; } DBClientBase* DBConnectionPool::get(const ConnectionString& url) { DBClientBase * c = _get( url.toString() ); - if ( c ){ + if ( c ) { onHandedOut( c ); return c; } - + string errmsg; c = url.connect( errmsg ); uassert( 13328 , _name + ": connect failed " + url.toString() + " : " + errmsg , c ); - + return _finishCreate( url.toString() , c ); } - + DBClientBase* DBConnectionPool::get(const string& host) { DBClientBase * c = _get( host ); - if ( c ){ + if ( c ) { onHandedOut( c ); return c; } - + string errmsg; ConnectionString cs = ConnectionString::parse( host , errmsg ); uassert( 13071 , (string)"invalid hostname [" + host + "]" + errmsg , cs.isValid() ); - + c = cs.connect( errmsg ); uassert( 11002 , _name + ": connect failed " + host + " : " + errmsg , c ); return _finishCreate( host , c ); } - DBConnectionPool::~DBConnectionPool(){ + DBConnectionPool::~DBConnectionPool() { // connection closing is handled by ~PoolForHost } - void DBConnectionPool::flush(){ + void DBConnectionPool::flush() { scoped_lock L(_mutex); - for ( map<string,PoolForHost>::iterator i = _pools.begin(); i != _pools.end(); i++ ){ + for ( map<string,PoolForHost>::iterator i = _pools.begin(); i != _pools.end(); i++ ) { PoolForHost& p = i->second; p.flush(); } } - void DBConnectionPool::addHook( DBConnectionHook * hook ){ + void DBConnectionPool::addHook( DBConnectionHook * hook ) { _hooks.push_back( hook ); } - void DBConnectionPool::onCreate( DBClientBase * conn ){ + void DBConnectionPool::onCreate( DBClientBase * conn ) { if ( _hooks.size() == 0 ) return; - - for ( list<DBConnectionHook*>::iterator i = _hooks.begin(); i != _hooks.end(); i++ ){ + + for ( list<DBConnectionHook*>::iterator i = _hooks.begin(); i != _hooks.end(); i++ ) { (*i)->onCreate( conn ); } } - void DBConnectionPool::onHandedOut( DBClientBase * conn ){ + void DBConnectionPool::onHandedOut( DBClientBase * conn ) { if ( _hooks.size() == 0 ) return; - - for ( list<DBConnectionHook*>::iterator i = _hooks.begin(); i != _hooks.end(); i++ ){ + + for ( list<DBConnectionHook*>::iterator i = _hooks.begin(); i != _hooks.end(); i++ ) { (*i)->onHandedOut( conn ); } } - void DBConnectionPool::appendInfo( BSONObjBuilder& b ){ + void DBConnectionPool::appendInfo( BSONObjBuilder& b ) { BSONObjBuilder bb( b.subobjStart( "hosts" ) ); int avail = 0; long long created = 0; - - + + map<ConnectionString::ConnectionType,long long> createdByType; { scoped_lock lk( _mutex ); - for ( map<string,PoolForHost>::iterator i=_pools.begin(); i!=_pools.end(); ++i ){ + for ( map<string,PoolForHost>::iterator i=_pools.begin(); i!=_pools.end(); ++i ) { string s = i->first; BSONObjBuilder temp( bb.subobjStart( s ) ); temp.append( "available" , i->second.numAvailable() ); @@ -198,10 +198,10 @@ namespace mongo { } } bb.done(); - + { BSONObjBuilder temp( bb.subobjStart( "createdByType" ) ); - for ( map<ConnectionString::ConnectionType,long long>::iterator i=createdByType.begin(); i!=createdByType.end(); ++i ){ + for ( map<ConnectionString::ConnectionType,long long>::iterator i=createdByType.begin(); i!=createdByType.end(); ++i ) { temp.appendNumber( ConnectionString::typeToString( i->first ) , i->second ); } temp.done(); @@ -211,15 +211,15 @@ namespace mongo { b.appendNumber( "totalCreated" , created ); } - ScopedDbConnection * ScopedDbConnection::steal(){ + ScopedDbConnection * ScopedDbConnection::steal() { assert( _conn ); ScopedDbConnection * n = new ScopedDbConnection( _host , _conn ); _conn = 0; return n; } - + ScopedDbConnection::~ScopedDbConnection() { - if ( _conn ){ + if ( _conn ) { if ( ! _conn->isFailed() ) { /* see done() comments above for why we log this line */ log() << "~ScopedDbConnection: _conn != null" << endl; @@ -229,20 +229,20 @@ namespace mongo { } ScopedDbConnection::ScopedDbConnection(const Shard& shard ) - : _host( shard.getConnString() ) , _conn( pool.get(_host) ){ + : _host( shard.getConnString() ) , _conn( pool.get(_host) ) { } - + ScopedDbConnection::ScopedDbConnection(const Shard* shard ) - : _host( shard->getConnString() ) , _conn( pool.get(_host) ){ + : _host( shard->getConnString() ) , _conn( pool.get(_host) ) { } class PoolFlushCmd : public Command { public: - PoolFlushCmd() : Command( "connPoolSync" , false , "connpoolsync" ){} + PoolFlushCmd() : Command( "connPoolSync" , false , "connpoolsync" ) {} virtual void help( stringstream &help ) const { help<<"internal"; } virtual LockType locktype() const { return NONE; } - virtual bool run(const string&, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool){ + virtual bool run(const string&, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool) { pool.flush(); return true; } @@ -254,10 +254,10 @@ namespace mongo { class PoolStats : public Command { public: - PoolStats() : Command( "connPoolStats" ){} + PoolStats() : Command( "connPoolStats" ) {} virtual void help( stringstream &help ) const { help<<"stats about connection pool"; } virtual LockType locktype() const { return NONE; } - virtual bool run(const string&, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool){ + virtual bool run(const string&, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool) { pool.appendInfo( result ); result.append( "numDBClientConnection" , DBClientConnection::getNumConnections() ); result.append( "numAScopedConnection" , AScopedConnection::getNumConnections() ); diff --git a/client/connpool.h b/client/connpool.h index c2673a44154..4f0245cbee0 100644 --- a/client/connpool.h +++ b/client/connpool.h @@ -24,7 +24,7 @@ namespace mongo { class Shard; - + /** * not thread safe * thread safety is handled by DBConnectionPool @@ -32,9 +32,9 @@ namespace mongo { class PoolForHost { public: PoolForHost() - : _created(0){} - - PoolForHost( const PoolForHost& other ){ + : _created(0) {} + + PoolForHost( const PoolForHost& other ) { assert(other._pool.size() == 0); _created = other._created; assert( _created == 0 ); @@ -49,16 +49,16 @@ namespace mongo { ConnectionString::ConnectionType type() const { assert(_created); return _type; } - /** + /** * gets a connection or return NULL */ DBClientBase * get(); - + void done( DBClientBase * c ); - + void flush(); private: - + struct StoredConnection { StoredConnection( DBClientBase * c ); @@ -72,24 +72,24 @@ namespace mongo { long long _created; ConnectionString::ConnectionType _type; }; - + class DBConnectionHook { public: - virtual ~DBConnectionHook(){} - virtual void onCreate( DBClientBase * conn ){} - virtual void onHandedOut( DBClientBase * conn ){} + virtual ~DBConnectionHook() {} + virtual void onCreate( DBClientBase * conn ) {} + virtual void onHandedOut( DBClientBase * conn ) {} }; /** Database connection pool. Generally, use ScopedDbConnection and do not call these directly. - This class, so far, is suitable for use with unauthenticated connections. - Support for authenticated connections requires some adjustements: please + This class, so far, is suitable for use with unauthenticated connections. + Support for authenticated connections requires some adjustements: please request... Usage: - + { ScopedDbConnection c("myserver"); c.conn()... @@ -100,15 +100,15 @@ namespace mongo { map<string,PoolForHost> _pools; // servername -> pool list<DBConnectionHook*> _hooks; string _name; - + DBClientBase* _get( const string& ident ); - + DBClientBase* _finishCreate( const string& ident , DBClientBase* conn ); - public: + public: DBConnectionPool() : _mutex("DBConnectionPool") , _name( "dbconnectionpool" ) { } ~DBConnectionPool(); - + /** right now just controls some asserts. defaults to "dbconnectionpool" */ void setName( const string& name ) { _name = name; } @@ -121,7 +121,7 @@ namespace mongo { DBClientBase *get(const ConnectionString& host); void release(const string& host, DBClientBase *c) { - if ( c->isFailed() ){ + if ( c->isFailed() ) { delete c; return; } @@ -131,7 +131,7 @@ namespace mongo { void addHook( DBConnectionHook * hook ); void appendInfo( BSONObjBuilder& b ); }; - + extern DBConnectionPool pool; class AScopedConnection : boost::noncopyable { @@ -152,21 +152,21 @@ namespace mongo { }; /** Use to get a connection from the pool. On exceptions things - clean up nicely (i.e. the socket gets closed automatically when the + clean up nicely (i.e. the socket gets closed automatically when the scopeddbconnection goes out of scope). */ class ScopedDbConnection : public AScopedConnection { public: /** the main constructor you want to use - throws UserException if can't connect + throws UserException if can't connect */ explicit ScopedDbConnection(const string& host) : _host(host), _conn( pool.get(host) ) {} - + ScopedDbConnection() : _host( "" ) , _conn(0) {} /* @param conn - bind to an existing connection */ ScopedDbConnection(const string& host, DBClientBase* conn ) : _host( host ) , _conn( conn ) {} - + /** throws UserException if can't connect */ explicit ScopedDbConnection(const ConnectionString& url ) : _host(url.toString()), _conn( pool.get(url) ) {} @@ -177,11 +177,11 @@ namespace mongo { ~ScopedDbConnection(); /** get the associated connection object */ - DBClientBase* operator->(){ + DBClientBase* operator->() { uassert( 11004 , "connection was returned to the pool already" , _conn ); - return _conn; + return _conn; } - + /** get the associated connection object */ DBClientBase& conn() { uassert( 11005 , "connection was returned to the pool already" , _conn ); @@ -193,7 +193,7 @@ namespace mongo { uassert( 13102 , "connection was returned to the pool already" , _conn ); return _conn; } - + string getHost() const { return _host; } /** Force closure of the connection. You should call this if you leave it in @@ -205,8 +205,8 @@ namespace mongo { } /** Call this when you are done with the connection. - - If you do not call done() before this object goes out of scope, + + If you do not call done() before this object goes out of scope, we can't be sure we fully read all expected data of a reply on the socket. so we don't try to reuse the connection in that situation. */ @@ -214,7 +214,7 @@ namespace mongo { if ( ! _conn ) return; - /* we could do this, but instead of assume one is using autoreconnect mode on the connection + /* we could do this, but instead of assume one is using autoreconnect mode on the connection if ( _conn->isFailed() ) kill(); else @@ -222,7 +222,7 @@ namespace mongo { pool.release(_host, _conn); _conn = 0; } - + ScopedDbConnection * steal(); private: diff --git a/client/constants.h b/client/constants.h index 66aa9b1516f..54f3fd216f2 100644 --- a/client/constants.h +++ b/client/constants.h @@ -2,22 +2,22 @@ #pragma once -namespace mongo { +namespace mongo { /* query results include a 32 result flag word consisting of these bits */ enum ResultFlagType { - /* returned, with zero results, when getMore is called but the cursor id + /* returned, with zero results, when getMore is called but the cursor id is not valid at the server. */ - ResultFlag_CursorNotFound = 1, - + ResultFlag_CursorNotFound = 1, + /* { $err : ... } is being returned */ - ResultFlag_ErrSet = 2, - + ResultFlag_ErrSet = 2, + /* Have to update config from the server, usually $err is also set */ - ResultFlag_ShardConfigStale = 4, - - /* for backward compatability: this let's us know the server supports - the QueryOption_AwaitData option. if it doesn't, a repl slave client should sleep + ResultFlag_ShardConfigStale = 4, + + /* for backward compatability: this let's us know the server supports + the QueryOption_AwaitData option. if it doesn't, a repl slave client should sleep a little between getMore's. */ ResultFlag_AwaitCapable = 8 diff --git a/client/dbclient.cpp b/client/dbclient.cpp index 8b768b94cff..e30a9bbbab7 100644 --- a/client/dbclient.cpp +++ b/client/dbclient.cpp @@ -32,7 +32,7 @@ namespace mongo { DBClientBase* ConnectionString::connect( string& errmsg ) const { - switch ( _type ){ + switch ( _type ) { case MASTER: { DBClientConnection * c = new DBClientConnection(true); log(1) << "creating new connection to:" << _servers[0] << endl; @@ -42,11 +42,11 @@ namespace mongo { } return c; } - - case PAIR: + + case PAIR: case SET: { DBClientReplicaSet * set = new DBClientReplicaSet( _setName , _servers ); - if( ! set->connect() ){ + if( ! set->connect() ) { delete set; errmsg = "connect failed to set "; errmsg += toString(); @@ -54,7 +54,7 @@ namespace mongo { } return set; } - + case SYNC: { // TODO , don't copy list<HostAndPort> l; @@ -62,41 +62,41 @@ namespace mongo { l.push_back( _servers[i] ); return new SyncClusterConnection( l ); } - + case INVALID: throw UserException( 13421 , "trying to connect to invalid ConnectionString" ); break; } - + assert( 0 ); return 0; } - ConnectionString ConnectionString::parse( const string& host , string& errmsg ){ - + ConnectionString ConnectionString::parse( const string& host , string& errmsg ) { + string::size_type i = host.find( '/' ); - if ( i != string::npos && i != 0){ + if ( i != string::npos && i != 0) { // replica set return ConnectionString( SET , host.substr( i + 1 ) , host.substr( 0 , i ) ); } int numCommas = str::count( host , ',' ); - - if( numCommas == 0 ) + + if( numCommas == 0 ) return ConnectionString( HostAndPort( host ) ); - - if ( numCommas == 1 ) + + if ( numCommas == 1 ) return ConnectionString( PAIR , host ); if ( numCommas == 2 ) return ConnectionString( SYNC , host ); - + errmsg = (string)"invalid hostname [" + host + "]"; return ConnectionString(); // INVALID } - string ConnectionString::typeToString( ConnectionType type ){ - switch ( type ){ + string ConnectionString::typeToString( ConnectionType type ) { + switch ( type ) { case INVALID: return "invalid"; case MASTER: @@ -111,9 +111,9 @@ namespace mongo { assert(0); return ""; } - - Query& Query::where(const string &jscode, BSONObj scope) { + + Query& Query::where(const string &jscode, BSONObj scope) { /* use where() before sort() and hint() and explain(), else this will assert. */ assert( ! isComplex() ); BSONObjBuilder b; @@ -131,44 +131,44 @@ namespace mongo { obj = b.obj(); } - Query& Query::sort(const BSONObj& s) { + Query& Query::sort(const BSONObj& s) { appendComplex( "orderby", s ); - return *this; + return *this; } Query& Query::hint(BSONObj keyPattern) { appendComplex( "$hint", keyPattern ); - return *this; + return *this; } Query& Query::explain() { appendComplex( "$explain", true ); - return *this; + return *this; } - + Query& Query::snapshot() { appendComplex( "$snapshot", true ); - return *this; + return *this; } - + Query& Query::minKey( const BSONObj &val ) { appendComplex( "$min", val ); - return *this; + return *this; } Query& Query::maxKey( const BSONObj &val ) { appendComplex( "$max", val ); - return *this; + return *this; } - bool Query::isComplex( bool * hasDollar ) const{ - if ( obj.hasElement( "query" ) ){ + bool Query::isComplex( bool * hasDollar ) const { + if ( obj.hasElement( "query" ) ) { if ( hasDollar ) hasDollar[0] = false; return true; } - if ( obj.hasElement( "$query" ) ){ + if ( obj.hasElement( "$query" ) ) { if ( hasDollar ) hasDollar[0] = true; return true; @@ -176,12 +176,12 @@ namespace mongo { return false; } - + BSONObj Query::getFilter() const { bool hasDollar; if ( ! isComplex( &hasDollar ) ) return obj; - + return obj.getObjectField( hasDollar ? "$query" : "query" ); } BSONObj Query::getSort() const { @@ -200,8 +200,8 @@ namespace mongo { bool Query::isExplain() const { return isComplex() && obj.getBoolField( "$explain" ); } - - string Query::toString() const{ + + string Query::toString() const { return obj.toString(); } @@ -221,7 +221,7 @@ namespace mongo { } return _cachedAvailableOptions; } - + inline bool DBClientWithCommands::runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options) { string ns = dbname + ".$cmd"; info = findOne(ns, cmd, 0 , options); @@ -240,7 +240,7 @@ namespace mongo { return runCommand(dbname, b.done(), *info); } - unsigned long long DBClientWithCommands::count(const string &_ns, const BSONObj& query, int options, int limit, int skip ) { + unsigned long long DBClientWithCommands::count(const string &_ns, const BSONObj& query, int options, int limit, int skip ) { NamespaceString ns(_ns); BSONObjBuilder b; b.append( "count" , ns.coll ); @@ -258,27 +258,27 @@ namespace mongo { BSONObj getlasterrorcmdobj = fromjson("{getlasterror:1}"); - BSONObj DBClientWithCommands::getLastErrorDetailed() { + BSONObj DBClientWithCommands::getLastErrorDetailed() { BSONObj info; runCommand("admin", getlasterrorcmdobj, info); - return info; + return info; } - string DBClientWithCommands::getLastError() { + string DBClientWithCommands::getLastError() { BSONObj info = getLastErrorDetailed(); return getLastErrorString( info ); } - - string DBClientWithCommands::getLastErrorString( const BSONObj& info ){ + + string DBClientWithCommands::getLastErrorString( const BSONObj& info ) { BSONElement e = info["err"]; if( e.eoo() ) return ""; if( e.type() == Object ) return e.toString(); - return e.str(); + return e.str(); } BSONObj getpreverrorcmdobj = fromjson("{getpreverror:1}"); - BSONObj DBClientWithCommands::getPrevError() { + BSONObj DBClientWithCommands::getPrevError() { BSONObj info; runCommand("admin", getpreverrorcmdobj, info); return info; @@ -286,7 +286,7 @@ namespace mongo { BSONObj getnoncecmdobj = fromjson("{getnonce:1}"); - string DBClientWithCommands::createPasswordDigest( const string & username , const string & clearTextPassword ){ + string DBClientWithCommands::createPasswordDigest( const string & username , const string & clearTextPassword ) { md5digest d; { md5_state_t st; @@ -300,9 +300,9 @@ namespace mongo { } bool DBClientWithCommands::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) { - string password = password_text; - if( digestPassword ) - password = createPasswordDigest( username , password_text ); + string password = password_text; + if( digestPassword ) + password = createPasswordDigest( username , password_text ); BSONObj info; string nonce; @@ -333,8 +333,8 @@ namespace mongo { b << "key" << digestToString( d ); authCmd = b.done(); } - - if( runCommand(dbname, authCmd, info) ) + + if( runCommand(dbname, authCmd, info) ) return true; errmsg = info.toString(); @@ -345,7 +345,7 @@ namespace mongo { bool DBClientWithCommands::isMaster(bool& isMaster, BSONObj *info) { BSONObj o; - if ( info == 0 ) + if ( info == 0 ) info = &o; bool ok = runCommand("admin", ismastercmdobj, *info); isMaster = info->getField("ismaster").trueValue(); @@ -354,7 +354,7 @@ namespace mongo { bool DBClientWithCommands::createCollection(const string &ns, long long size, bool capped, int max, BSONObj *info) { BSONObj o; - if ( info == 0 ) info = &o; + if ( info == 0 ) info = &o; BSONObjBuilder b; string db = nsToDatabase(ns.c_str()); b.append("create", ns.c_str() + db.length() + 1); @@ -404,7 +404,7 @@ namespace mongo { return false; } - BSONObj DBClientWithCommands::mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query, const string& outputcolname) { + BSONObj DBClientWithCommands::mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query, const string& outputcolname) { BSONObjBuilder b; b.append("mapreduce", nsGetCollection(ns)); b.appendCode("map", jsmapf); @@ -435,27 +435,27 @@ namespace mongo { return eval(dbname, jscode, info, retValue); } - list<string> DBClientWithCommands::getDatabaseNames(){ + list<string> DBClientWithCommands::getDatabaseNames() { BSONObj info; uassert( 10005 , "listdatabases failed" , runCommand( "admin" , BSON( "listDatabases" << 1 ) , info ) ); uassert( 10006 , "listDatabases.databases not array" , info["databases"].type() == Array ); - + list<string> names; - + BSONObjIterator i( info["databases"].embeddedObjectUserCheck() ); - while ( i.more() ){ + while ( i.more() ) { names.push_back( i.next().embeddedObjectUserCheck()["name"].valuestr() ); } return names; } - list<string> DBClientWithCommands::getCollectionNames( const string& db ){ + list<string> DBClientWithCommands::getCollectionNames( const string& db ) { list<string> names; - + string ns = db + ".system.namespaces"; auto_ptr<DBClientCursor> c = query( ns.c_str() , BSONObj() ); - while ( c->more() ){ + while ( c->more() ) { string name = c->next()["name"].valuestr(); if ( name.find( "$" ) != string::npos ) continue; @@ -464,9 +464,9 @@ namespace mongo { return names; } - bool DBClientWithCommands::exists( const string& ns ){ + bool DBClientWithCommands::exists( const string& ns ) { list<string> names; - + string db = nsGetDB( ns ) + ".system.namespaces"; BSONObj q = BSON( "name" << ns ); return count( db.c_str() , q ) != 0; @@ -474,21 +474,21 @@ namespace mongo { /* --- dbclientconnection --- */ - bool DBClientConnection::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) { - string password = password_text; - if( digestPassword ) - password = createPasswordDigest( username , password_text ); + bool DBClientConnection::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) { + string password = password_text; + if( digestPassword ) + password = createPasswordDigest( username , password_text ); - if( autoReconnect ) { - /* note we remember the auth info before we attempt to auth -- if the connection is broken, we will - then have it for the next autoreconnect attempt. - */ - pair<string,string> p = pair<string,string>(username, password); - authCache[dbname] = p; - } + if( autoReconnect ) { + /* note we remember the auth info before we attempt to auth -- if the connection is broken, we will + then have it for the next autoreconnect attempt. + */ + pair<string,string> p = pair<string,string>(username, password); + authCache[dbname] = p; + } - return DBClientBase::auth(dbname, username, password.c_str(), errmsg, false); - } + return DBClientBase::auth(dbname, username, password.c_str(), errmsg, false); + } BSONObj DBClientInterface::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) { auto_ptr<DBClientCursor> c = @@ -505,20 +505,20 @@ namespace mongo { return c->nextSafe().copy(); } - bool DBClientConnection::connect(const HostAndPort& server, string& errmsg){ + bool DBClientConnection::connect(const HostAndPort& server, string& errmsg) { _server = server; _serverString = _server.toString(); return _connect( errmsg ); } - bool DBClientConnection::_connect( string& errmsg ){ + bool DBClientConnection::_connect( string& errmsg ) { _serverString = _server.toString(); // we keep around SockAddr for connection life -- maybe MessagingPort // requires that? server.reset(new SockAddr(_server.host().c_str(), _server.port())); p.reset(new MessagingPort( _so_timeout, _logLevel )); - if (server->getAddr() == "0.0.0.0"){ + if (server->getAddr() == "0.0.0.0") { failed = true; return false; } @@ -548,27 +548,27 @@ namespace mongo { log(_logLevel) << "trying reconnect to " << _serverString << endl; string errmsg; failed = false; - if ( ! _connect(errmsg) ) { + if ( ! _connect(errmsg) ) { failed = true; log(_logLevel) << "reconnect " << _serverString << " failed " << errmsg << endl; throw SocketException(SocketException::CONNECT_ERROR); - } + } - log(_logLevel) << "reconnect " << _serverString << " ok" << endl; - for( map< string, pair<string,string> >::iterator i = authCache.begin(); i != authCache.end(); i++ ) { - const char *dbname = i->first.c_str(); - const char *username = i->second.first.c_str(); - const char *password = i->second.second.c_str(); - if( !DBClientBase::auth(dbname, username, password, errmsg, false) ) - log(_logLevel) << "reconnect: auth failed db:" << dbname << " user:" << username << ' ' << errmsg << '\n'; - } + log(_logLevel) << "reconnect " << _serverString << " ok" << endl; + for( map< string, pair<string,string> >::iterator i = authCache.begin(); i != authCache.end(); i++ ) { + const char *dbname = i->first.c_str(); + const char *username = i->second.first.c_str(); + const char *password = i->second.second.c_str(); + if( !DBClientBase::auth(dbname, username, password, errmsg, false) ) + log(_logLevel) << "reconnect: auth failed db:" << dbname << " user:" << username << ' ' << errmsg << '\n'; + } } auto_ptr<DBClientCursor> DBClientBase::query(const string &ns, Query query, int nToReturn, - int nToSkip, const BSONObj *fieldsToReturn, int queryOptions , int batchSize ) { + int nToSkip, const BSONObj *fieldsToReturn, int queryOptions , int batchSize ) { auto_ptr<DBClientCursor> c( new DBClientCursor( this, - ns, query.obj, nToReturn, nToSkip, - fieldsToReturn, queryOptions , batchSize ) ); + ns, query.obj, nToReturn, nToSkip, + fieldsToReturn, queryOptions , batchSize ) ); if ( c->init() ) return c; return auto_ptr< DBClientCursor >( 0 ); @@ -589,14 +589,14 @@ namespace mongo { } boost::function<void(const BSONObj &)> _f; }; - + unsigned long long DBClientConnection::query( boost::function<void(const BSONObj&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn, int queryOptions ) { DBClientFunConvertor fun; fun._f = f; boost::function<void(DBClientCursorBatchIterator &)> ptr( fun ); return DBClientConnection::query( ptr, ns, query, fieldsToReturn, queryOptions ); } - + unsigned long long DBClientConnection::query( boost::function<void(DBClientCursorBatchIterator &)> f, const string& ns, Query query, const BSONObj *fieldsToReturn, int queryOptions ) { // mask options queryOptions &= (int)( QueryOption_NoCursorTimeout | QueryOption_SlaveOk ); @@ -604,11 +604,11 @@ namespace mongo { bool doExhaust = ( availableOptions() & QueryOption_Exhaust ); if ( doExhaust ) { - queryOptions |= (int)QueryOption_Exhaust; + queryOptions |= (int)QueryOption_Exhaust; } auto_ptr<DBClientCursor> c( this->query(ns, query, 0, 0, fieldsToReturn, queryOptions) ); uassert( 13386, "socket error for mapping query", c.get() ); - + if ( !doExhaust ) { while( c->more() ) { DBClientCursorBatchIterator i( *c ); @@ -618,21 +618,21 @@ namespace mongo { return n; } - try { - while( 1 ) { - while( c->moreInCurrentBatch() ) { + try { + while( 1 ) { + while( c->moreInCurrentBatch() ) { DBClientCursorBatchIterator i( *c ); f( i ); n += i.n(); } - if( c->getCursorId() == 0 ) + if( c->getCursorId() == 0 ) break; c->exhaustReceiveMore(); } } - catch(std::exception&) { + catch(std::exception&) { /* connection CANNOT be used anymore as more data may be on the way from the server. we have to reconnect. */ @@ -660,16 +660,16 @@ namespace mongo { void DBClientBase::insert( const string & ns , const vector< BSONObj > &v ) { Message toSend; - + BufBuilder b; int opts = 0; b.appendNum( opts ); b.appendStr( ns ); for( vector< BSONObj >::const_iterator i = v.begin(); i != v.end(); ++i ) i->appendSelfToBufBuilder( b ); - + toSend.setData( dbInsert, b.buf(), b.len() ); - + say( toSend ); } @@ -713,63 +713,63 @@ namespace mongo { say( toSend ); } - auto_ptr<DBClientCursor> DBClientWithCommands::getIndexes( const string &ns ){ + auto_ptr<DBClientCursor> DBClientWithCommands::getIndexes( const string &ns ) { return query( Namespace( ns.c_str() ).getSisterNS( "system.indexes" ).c_str() , BSON( "ns" << ns ) ); } - - void DBClientWithCommands::dropIndex( const string& ns , BSONObj keys ){ + + void DBClientWithCommands::dropIndex( const string& ns , BSONObj keys ) { dropIndex( ns , genIndexName( keys ) ); } - void DBClientWithCommands::dropIndex( const string& ns , const string& indexName ){ + void DBClientWithCommands::dropIndex( const string& ns , const string& indexName ) { BSONObj info; - if ( ! runCommand( nsToDatabase( ns.c_str() ) , - BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << indexName ) , - info ) ){ + if ( ! runCommand( nsToDatabase( ns.c_str() ) , + BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << indexName ) , + info ) ) { log(_logLevel) << "dropIndex failed: " << info << endl; uassert( 10007 , "dropIndex failed" , 0 ); } resetIndexCache(); } - - void DBClientWithCommands::dropIndexes( const string& ns ){ + + void DBClientWithCommands::dropIndexes( const string& ns ) { BSONObj info; - uassert( 10008 , "dropIndexes failed" , runCommand( nsToDatabase( ns.c_str() ) , - BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << "*") , - info ) ); + uassert( 10008 , "dropIndexes failed" , runCommand( nsToDatabase( ns.c_str() ) , + BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << "*") , + info ) ); resetIndexCache(); } - void DBClientWithCommands::reIndex( const string& ns ){ + void DBClientWithCommands::reIndex( const string& ns ) { list<BSONObj> all; auto_ptr<DBClientCursor> i = getIndexes( ns ); - while ( i->more() ){ + while ( i->more() ) { all.push_back( i->next().getOwned() ); } - + dropIndexes( ns ); - - for ( list<BSONObj>::iterator i=all.begin(); i!=all.end(); i++ ){ + + for ( list<BSONObj>::iterator i=all.begin(); i!=all.end(); i++ ) { BSONObj o = *i; insert( Namespace( ns.c_str() ).getSisterNS( "system.indexes" ).c_str() , o ); } - + } - - string DBClientWithCommands::genIndexName( const BSONObj& keys ){ + + string DBClientWithCommands::genIndexName( const BSONObj& keys ) { stringstream ss; - + bool first = 1; for ( BSONObjIterator i(keys); i.more(); ) { BSONElement f = i.next(); - + if ( first ) first = 0; else ss << "_"; - + ss << f.fieldName() << "_"; if( f.isNumber() ) ss << f.numberInt(); @@ -794,7 +794,7 @@ namespace mongo { toSave.append( "name" , nn ); cacheKey += nn; } - + if ( unique ) toSave.appendBool( "unique", unique ); @@ -837,9 +837,10 @@ namespace mongo { void DBClientConnection::say( Message &toSend ) { checkConnection(); - try { + try { port().say( toSend ); - } catch( SocketException & ) { + } + catch( SocketException & ) { failed = true; throw; } @@ -849,16 +850,16 @@ namespace mongo { port().piggyBack( toSend ); } - void DBClientConnection::recv( Message &m ) { + void DBClientConnection::recv( Message &m ) { port().recv(m); } bool DBClientConnection::call( Message &toSend, Message &response, bool assertOk ) { - /* todo: this is very ugly messagingport::call returns an error code AND can throw - an exception. we should make it return void and just throw an exception anytime + /* todo: this is very ugly messagingport::call returns an error code AND can throw + an exception. we should make it return void and just throw an exception anytime it fails */ - try { + try { if ( !port().call(toSend, response) ) { failed = true; if ( assertOk ) @@ -867,7 +868,7 @@ namespace mongo { return false; } } - catch( SocketException & ) { + catch( SocketException & ) { failed = true; throw; } @@ -888,15 +889,15 @@ namespace mongo { } } - void DBClientConnection::killCursor( long long cursorId ){ + void DBClientConnection::killCursor( long long cursorId ) { BufBuilder b; b.appendNum( (int)0 ); // reserved b.appendNum( (int)1 ); // number b.appendNum( cursorId ); - + Message m; m.setData( dbKillCursors , b.buf() , b.len() ); - + sayPiggyBack( m ); } @@ -912,5 +913,5 @@ namespace mongo { return false; return true; } - + } // namespace mongo diff --git a/client/dbclient.h b/client/dbclient.h index 87fae77012c..cc5e5b3fe3b 100644 --- a/client/dbclient.h +++ b/client/dbclient.h @@ -40,7 +40,7 @@ namespace mongo { /** allow query of replica slave. normally these return an error except for namespace "local". */ QueryOption_SlaveOk = 1 << 2, - + // findingStart mode is used to find the first operation of interest when // we are scanning through a repl log. For efficiency in the common case, // where the first operation of interest is closer to the tail than the head, @@ -52,24 +52,24 @@ namespace mongo { QueryOption_OplogReplay = 1 << 3, /** The server normally times out idle cursors after an inactivy period to prevent excess memory uses - Set this option to prevent that. + Set this option to prevent that. */ QueryOption_NoCursorTimeout = 1 << 4, - /** Use with QueryOption_CursorTailable. If we are at the end of the data, block for a while rather + /** Use with QueryOption_CursorTailable. If we are at the end of the data, block for a while rather than returning no data. After a timeout period, we do return as normal. */ QueryOption_AwaitData = 1 << 5, - /** Stream the data down full blast in multiple "more" packages, on the assumption that the client - will fully read all data queried. Faster when you are pulling a lot of data and know you want to + /** Stream the data down full blast in multiple "more" packages, on the assumption that the client + will fully read all data queried. Faster when you are pulling a lot of data and know you want to pull it all down. Note: it is not allowed to not read all the data unless you close the connection. - Use the query( boost::function<void(const BSONObj&)> f, ... ) version of the connection's query() + Use the query( boost::function<void(const BSONObj&)> f, ... ) version of the connection's query() method, and it will take care of all the details for you. */ QueryOption_Exhaust = 1 << 6, - + QueryOption_AllSupported = QueryOption_CursorTailable | QueryOption_SlaveOk | QueryOption_OplogReplay | QueryOption_NoCursorTimeout | QueryOption_AwaitData | QueryOption_Exhaust }; @@ -78,7 +78,7 @@ namespace mongo { /** Upsert - that is, insert the item if no matching item is found. */ UpdateOption_Upsert = 1 << 0, - /** Update multiple documents (if multiple documents match query expression). + /** Update multiple documents (if multiple documents match query expression). (Default is update a single document and stop.) */ UpdateOption_Multi = 1 << 1, @@ -103,7 +103,7 @@ namespace mongo { * server:port * foo/server:port,server:port SET * server,server,server SYNC - * + * * tyipcal use * string errmsg, * ConnectionString cs = ConnectionString::parse( url , errmsg ); @@ -113,19 +113,19 @@ namespace mongo { class ConnectionString { public: enum ConnectionType { INVALID , MASTER , PAIR , SET , SYNC }; - - ConnectionString( const HostAndPort& server ){ + + ConnectionString( const HostAndPort& server ) { _type = MASTER; _servers.push_back( server ); _finishInit(); } - ConnectionString( ConnectionType type , const string& s , const string& setName = "" ){ + ConnectionString( ConnectionType type , const string& s , const string& setName = "" ) { _type = type; _setName = setName; _fillServers( s ); - - switch ( _type ){ + + switch ( _type ) { case MASTER: assert( _servers.size() == 1 ); break; @@ -139,13 +139,13 @@ namespace mongo { default: assert( _servers.size() > 0 ); } - + _finishInit(); } - ConnectionString( const string& s , ConnectionType favoredMultipleType ){ + ConnectionString( const string& s , ConnectionType favoredMultipleType ) { _fillServers( s ); - if ( _servers.size() == 1 ){ + if ( _servers.size() == 1 ) { _type = MASTER; } else { @@ -156,14 +156,14 @@ namespace mongo { } bool isValid() const { return _type != INVALID; } - + string toString() const { return _string; } - + DBClientBase* connect( string& errmsg ) const; - - string getSetName() const{ + + string getSetName() const { return _setName; } @@ -172,29 +172,29 @@ namespace mongo { } static ConnectionString parse( const string& url , string& errmsg ); - + static string typeToString( ConnectionType type ); - + private: - ConnectionString(){ + ConnectionString() { _type = INVALID; } - - void _fillServers( string s ){ + + void _fillServers( string s ) { string::size_type idx; - while ( ( idx = s.find( ',' ) ) != string::npos ){ + while ( ( idx = s.find( ',' ) ) != string::npos ) { _servers.push_back( s.substr( 0 , idx ) ); s = s.substr( idx + 1 ); } _servers.push_back( s ); } - - void _finishInit(){ + + void _finishInit() { stringstream ss; if ( _type == SET ) ss << _setName << "/"; - for ( unsigned i=0; i<_servers.size(); i++ ){ + for ( unsigned i=0; i<_servers.size(); i++ ) { if ( i > 0 ) ss << ","; ss << _servers[i].toString(); @@ -207,7 +207,7 @@ namespace mongo { string _string; string _setName; }; - + /** * controls how much a clients cares about writes * default is NORMAL @@ -223,7 +223,7 @@ namespace mongo { class DBClientCursor; class DBClientCursorBatchIterator; - /** Represents a Mongo query expression. Typically one uses the QUERY(...) macro to construct a Query object. + /** Represents a Mongo query expression. Typically one uses the QUERY(...) macro to construct a Query object. Examples: QUERY( "age" << 33 << "school" << "UCLA" ).sort("name") QUERY( "age" << GT << 30 << LT << 50 ) @@ -233,22 +233,22 @@ namespace mongo { BSONObj obj; Query() : obj(BSONObj()) { } Query(const BSONObj& b) : obj(b) { } - Query(const string &json) : + Query(const string &json) : obj(fromjson(json)) { } - Query(const char * json) : + Query(const char * json) : obj(fromjson(json)) { } - /** Add a sort (ORDER BY) criteria to the query expression. + /** Add a sort (ORDER BY) criteria to the query expression. @param sortPattern the sort order template. For example to order by name ascending, time descending: { name : 1, ts : -1 } i.e. BSON( "name" << 1 << "ts" << -1 ) - or + or fromjson(" name : 1, ts : -1 ") */ Query& sort(const BSONObj& sortPattern); - /** Add a sort (ORDER BY) criteria to the query expression. + /** Add a sort (ORDER BY) criteria to the query expression. This version of sort() assumes you want to sort on a single field. @param asc = 1 for ascending order asc = -1 for descending order @@ -277,8 +277,8 @@ namespace mongo { */ Query& explain(); - /** Use snapshot mode for the query. Snapshot mode assures no duplicates are returned, or objects missed, which were - present at both the start and end of the query's execution (if an object is new during the query, or deleted during + /** Use snapshot mode for the query. Snapshot mode assures no duplicates are returned, or objects missed, which were + present at both the start and end of the query's execution (if an object is new during the query, or deleted during the query, it may or may not be returned, even with snapshot mode). Note that short query responses (less than 1MB) are always effectively snapshotted. @@ -287,16 +287,16 @@ namespace mongo { */ Query& snapshot(); - /** Queries to the Mongo database support a $where parameter option which contains - a javascript function that is evaluated to see whether objects being queried match - its criteria. Use this helper to append such a function to a query object. + /** Queries to the Mongo database support a $where parameter option which contains + a javascript function that is evaluated to see whether objects being queried match + its criteria. Use this helper to append such a function to a query object. Your query may also contain other traditional Mongo query terms. - @param jscode The javascript function to evaluate against each potential object - match. The function must return true for matched objects. Use the this + @param jscode The javascript function to evaluate against each potential object + match. The function must return true for matched objects. Use the this variable to inspect the current object. - @param scope SavedContext for the javascript object. List in a BSON object any - variables you would like defined when the jscode executes. One can think + @param scope SavedContext for the javascript object. List in a BSON object any + variables you would like defined when the jscode executes. One can think of these as "bind variables". Examples: @@ -310,12 +310,12 @@ namespace mongo { * if this query has an orderby, hint, or some other field */ bool isComplex( bool * hasDollar = 0 ) const; - + BSONObj getFilter() const; BSONObj getSort() const; BSONObj getHint() const; bool isExplain() const; - + string toString() const; operator string() const { return toString(); } private: @@ -326,13 +326,13 @@ namespace mongo { BSONObjBuilder b; b.appendElements(obj); b.append(fieldName, val); - obj = b.obj(); + obj = b.obj(); } }; - -/** Typically one uses the QUERY(...) macro to construct a Query object. - Example: QUERY( "age" << 33 << "school" << "UCLA" ) -*/ + + /** Typically one uses the QUERY(...) macro to construct a Query object. + Example: QUERY( "age" << 33 << "school" << "UCLA" ) + */ #define QUERY(x) mongo::Query( BSON(x) ) /** @@ -360,9 +360,9 @@ namespace mongo { /** don't use this - called automatically by DBClientCursor for you */ virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 ) = 0; - + virtual void insert( const string &ns, BSONObj obj ) = 0; - + virtual void insert( const string &ns, const vector< BSONObj >& v ) = 0; virtual void remove( const string &ns , Query query, bool justOne = 0 ) = 0; @@ -406,18 +406,18 @@ namespace mongo { directly call runCommand. @param dbname database name. Use "admin" for global administrative commands. - @param cmd the command object to execute. For example, { ismaster : 1 } - @param info the result object the database returns. Typically has { ok : ..., errmsg : ... } fields - set. + @param cmd the command object to execute. For example, { ismaster : 1 } + @param info the result object the database returns. Typically has { ok : ..., errmsg : ... } fields + set. @param options see enum QueryOptions - normally not needed to run a command @return true if the command returned "ok". */ virtual bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0); /** Authorize access to a particular database. - Authentication is separate for each database on the server -- you may authenticate for any + Authentication is separate for each database on the server -- you may authenticate for any number of databases on a single connection. - The "admin" database is special and once authenticated provides access to all databases on the + The "admin" database is special and once authenticated provides access to all databases on the server. @param digestPassword if password is plain text, set this to true. otherwise assumed to be pre-digested @return true if successful @@ -459,14 +459,14 @@ namespace mongo { */ bool createCollection(const string &ns, long long size = 0, bool capped = false, int max = 0, BSONObj *info = 0); - /** Get error result from the last operation on this connection. + /** Get error result from the last operation on this connection. @return error message text, or empty string if no error. */ string getLastError(); - /** Get error result from the last operation on this connection. - @return full error object. - */ - virtual BSONObj getLastErrorDetailed(); + /** Get error result from the last operation on this connection. + @return full error object. + */ + virtual BSONObj getLastErrorDetailed(); static string getLastErrorString( const BSONObj& res ); @@ -475,23 +475,23 @@ namespace mongo { @return { err : <error message>, nPrev : <how_many_ops_back_occurred>, ok : 1 } result.err will be null if no error has occurred. - */ + */ BSONObj getPrevError(); - /** Reset the previous error state for this connection (accessed via getLastError and - getPrevError). Useful when performing several operations at once and then checking + /** Reset the previous error state for this connection (accessed via getLastError and + getPrevError). Useful when performing several operations at once and then checking for an error after attempting all operations. */ bool resetError() { return simpleCommand("admin", 0, "reseterror"); } - /** Delete the specified collection. */ - virtual bool dropCollection( const string &ns ){ + /** Delete the specified collection. */ + virtual bool dropCollection( const string &ns ) { string db = nsGetDB( ns ); string coll = nsGetCollection( ns ); uassert( 10011 , "no collection name", coll.size() ); BSONObj info; - + bool res = runCommand( db.c_str() , BSON( "drop" << coll ) , info ); resetIndexCache(); return res; @@ -503,7 +503,7 @@ namespace mongo { bool repairDatabase(const string &dbname, BSONObj *info = 0) { return simpleCommand(dbname, info, "repairDatabase"); } - + /** Copy database from one server or name to another server or name. Generally, you should dropDatabase() first as otherwise the copied information will MERGE @@ -533,23 +533,23 @@ namespace mongo { ProfileOff = 0, ProfileSlow = 1, // log very slow (>100ms) operations ProfileAll = 2 - + }; bool setDbProfilingLevel(const string &dbname, ProfilingLevel level, BSONObj *info = 0); bool getDbProfilingLevel(const string &dbname, ProfilingLevel& level, BSONObj *info = 0); - /** Run a map/reduce job on the server. + /** Run a map/reduce job on the server. See http://www.mongodb.org/display/DOCS/MapReduce ns namespace (db+collection name) of input data - jsmapf javascript map function code - jsreducef javascript reduce function code. + jsmapf javascript map function code + jsreducef javascript reduce function code. query optional query filter for the input - output optional permanent output collection name. if not specified server will + output optional permanent output collection name. if not specified server will generate a temporary collection and return its name. - returns a result object which contains: + returns a result object which contains: { result : <collection_name>, numObjects : <number_of_objects_scanned>, timeMillis : <job_time>, @@ -557,8 +557,8 @@ namespace mongo { [, err : <errmsg_if_error>] } - For example one might call: - result.getField("ok").trueValue() + For example one might call: + result.getField("ok").trueValue() on the result to check if ok. */ BSONObj mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query = BSONObj(), const string& output = ""); @@ -569,7 +569,7 @@ namespace mongo { jscode source code for a javascript function. info the command object which contains any information on the invocation result including the return value and other information. If an error occurs running the jscode, error - information will be in info. (try "out() << info.toString()") + information will be in info. (try "out() << info.toString()") retValue return value from the jscode function. args args to pass to the jscode function. when invoked, the 'args' variable will be defined for use by the jscode. @@ -583,7 +583,7 @@ namespace mongo { /** validate a collection, checking for errors and reporting back statistics. this operation is slow and blocking. */ - bool validate( const string &ns , bool scandata=true ){ + bool validate( const string &ns , bool scandata=true ) { BSONObj cmd = BSON( "validate" << nsGetCollection( ns ) << "scandata" << scandata ); BSONObj info; return runCommand( nsGetDB( ns ).c_str() , cmd , info ); @@ -616,7 +616,7 @@ namespace mongo { ret = (NumType) retValue.number(); return true; } - + /** get a list of all the current databases uses the { listDatabases : 1 } command. @@ -632,7 +632,7 @@ namespace mongo { bool exists( const string& ns ); /** Create an index if it does not already exist. - ensureIndex calls are remembered so it is safe/fast to call this function many + ensureIndex calls are remembered so it is safe/fast to call this function many times in your code. @param ns collection to be indexed @param keys the "key pattern" for the index. e.g., { name : 1 } @@ -642,7 +642,7 @@ namespace mongo { @return whether or not sent message to db. should be true on first call, false on subsequent unless resetIndexCache was called */ - virtual bool ensureIndex( const string &ns , BSONObj keys , bool unique = false, const string &name = "", + virtual bool ensureIndex( const string &ns , BSONObj keys , bool unique = false, const string &name = "", bool cache = true ); /** @@ -651,17 +651,17 @@ namespace mongo { virtual void resetIndexCache(); virtual auto_ptr<DBClientCursor> getIndexes( const string &ns ); - + virtual void dropIndex( const string& ns , BSONObj keys ); virtual void dropIndex( const string& ns , const string& indexName ); - + /** drops all indexes for the collection */ virtual void dropIndexes( const string& ns ); virtual void reIndex( const string& ns ); - + string genIndexName( const BSONObj& keys ); /** Erase / drop an entire database */ @@ -674,33 +674,33 @@ namespace mongo { virtual string toString() = 0; /** @return the database name portion of an ns string */ - string nsGetDB( const string &ns ){ + string nsGetDB( const string &ns ) { string::size_type pos = ns.find( "." ); if ( pos == string::npos ) return ns; - + return ns.substr( 0 , pos ); } - + /** @return the collection name portion of an ns string */ - string nsGetCollection( const string &ns ){ + string nsGetCollection( const string &ns ) { string::size_type pos = ns.find( "." ); if ( pos == string::npos ) return ""; - return ns.substr( pos + 1 ); + return ns.substr( pos + 1 ); } protected: bool isOk(const BSONObj&); - + enum QueryOptions availableOptions(); - + private: enum QueryOptions _cachedAvailableOptions; bool _haveCachedAvailableOptions; }; - + /** abstract class that implements the core db operations */ @@ -709,13 +709,13 @@ namespace mongo { WriteConcern _writeConcern; public: - DBClientBase(){ + DBClientBase() { _writeConcern = W_NORMAL; } - + WriteConcern getWriteConcern() const { return _writeConcern; } - void setWriteConcern( WriteConcern w ){ _writeConcern = w; } - + void setWriteConcern( WriteConcern w ) { _writeConcern = w; } + /** send a query to the database. @param ns namespace to query, format is <dbname>.<collectname>[.<collectname>]* @param query query to perform on the collection. this is a BSONObj (binary JSON) @@ -744,7 +744,7 @@ namespace mongo { insert an object into the database */ virtual void insert( const string &ns , BSONObj obj ); - + /** insert a vector of objects into the database */ @@ -755,14 +755,14 @@ namespace mongo { @param justOne if this true, then once a single match is found will stop */ virtual void remove( const string &ns , Query q , bool justOne = 0 ); - + /** updates objects matching query */ virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = false , bool multi = false ); - + virtual bool isFailed() const = 0; - + virtual void killCursor( long long cursorID ) = 0; virtual bool callRead( Message& toSend , Message& response ) = 0; @@ -772,16 +772,16 @@ namespace mongo { virtual ConnectionString::ConnectionType type() const = 0; }; // DBClientBase - + class DBClientReplicaSet; - - class ConnectException : public UserException { + + class ConnectException : public UserException { public: ConnectException(string msg) : UserException(9000,msg) { } }; - /** - A basic connection to the database. + /** + A basic connection to the database. This is the main entry point for talking to a simple Mongo setup */ class DBClientConnection : public DBClientBase { @@ -789,7 +789,7 @@ namespace mongo { /** @param _autoReconnect if true, automatically reconnect on a connection failure @param cp used by DBClientReplicaSet. You do not need to specify this parameter - @param timeout tcp timeout in seconds - this is for read/write, not connect. + @param timeout tcp timeout in seconds - this is for read/write, not connect. Connect timeout is fixed, but short, at 5 seconds. */ DBClientConnection(bool _autoReconnect=false, DBClientReplicaSet* cp=0, double so_timeout=0) : @@ -797,7 +797,7 @@ namespace mongo { _numConnections++; } - virtual ~DBClientConnection(){ + virtual ~DBClientConnection() { _numConnections--; } @@ -812,14 +812,14 @@ namespace mongo { @deprecated please use HostAndPort @return false if fails to connect. */ - virtual bool connect(const char * hostname, string& errmsg){ + virtual bool connect(const char * hostname, string& errmsg) { // TODO: remove this method HostAndPort t( hostname ); return connect( t , errmsg ); } /** Connect to a Mongo database server. - + If autoReconnect is true, you can try to use the DBClientConnection even when false was returned -- it will try to connect again. @@ -837,9 +837,9 @@ namespace mongo { @param serverHostname host to connect to. can include port number ( 127.0.0.1 , 127.0.0.1:5555 ) */ - void connect(const string& serverHostname) { + void connect(const string& serverHostname) { string errmsg; - if( !connect(HostAndPort(serverHostname), errmsg) ) + if( !connect(HostAndPort(serverHostname), errmsg) ) throw ConnectException(string("can't connect ") + errmsg); } @@ -851,8 +851,8 @@ namespace mongo { return DBClientBase::query( ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions , batchSize ); } - /** Uses QueryOption_Exhaust - Exhaust mode sends back all data queries as fast as possible, with no back-and-for for OP_GETMORE. If you are certain + /** Uses QueryOption_Exhaust + Exhaust mode sends back all data queries as fast as possible, with no back-and-for for OP_GETMORE. If you are certain you will exhaust the query, it could be useful. Use DBClientCursorBatchIterator version if you want to do items in large blocks, perhaps to avoid granular locking and such. @@ -861,7 +861,7 @@ namespace mongo { unsigned long long query( boost::function<void(DBClientCursorBatchIterator&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); /** - @return true if this connection is currently in a failed state. When autoreconnect is on, + @return true if this connection is currently in a failed state. When autoreconnect is on, a connection will transition back to an ok state after reconnecting. */ bool isFailed() const { return failed; } @@ -877,18 +877,18 @@ namespace mongo { /** Returns the address of the server */ string toString() { return _serverString; } - + string getServerAddress() const { return _serverString; } - + virtual void killCursor( long long cursorID ); virtual bool callRead( Message& toSend , Message& response ) { return call( toSend , response ); } virtual void say( Message &toSend ); - virtual bool call( Message &toSend, Message &response, bool assertOk = true ); - virtual ConnectionString::ConnectionType type() const { return ConnectionString::MASTER; } + virtual bool call( Message &toSend, Message &response, bool assertOk = true ); + virtual ConnectionString::ConnectionType type() const { return ConnectionString::MASTER; } virtual void checkResponse( const char *data, int nReturned ); void setSoTimeout(double to) { _so_timeout = to; } - - static int getNumConnections(){ + + static int getNumConnections() { return _numConnections; } @@ -911,18 +911,18 @@ namespace mongo { void checkConnection() { if( failed ) _checkConnection(); } map< string, pair<string,string> > authCache; - double _so_timeout; + double _so_timeout; bool _connect( string& errmsg ); static AtomicUInt _numConnections; }; - + /** pings server to check if it's up */ bool serverAlive( const string &uri ); DBClientBase * createDirectClient(); - + } // namespace mongo #include "dbclientcursor.h" diff --git a/client/dbclient_rs.cpp b/client/dbclient_rs.cpp index e8dadffcefd..94ade4b4b23 100644 --- a/client/dbclient_rs.cpp +++ b/client/dbclient_rs.cpp @@ -26,7 +26,7 @@ #include "../util/background.h" namespace mongo { - + // -------------------------------- // ----- ReplicaSetMonitor --------- // -------------------------------- @@ -34,34 +34,34 @@ namespace mongo { // global background job responsible for checking every X amount of time class ReplicaSetMonitorWatcher : public BackgroundJob { protected: - void run(){ + void run() { sleepsecs( 20 ); try { ReplicaSetMonitor::checkAll(); } - catch ( std::exception& e ){ + catch ( std::exception& e ) { error() << "ReplicaSetMonitorWatcher: check failed: " << e.what() << endl; } } } replicaSetMonitorWatcher; - + ReplicaSetMonitor::ReplicaSetMonitor( const string& name , const vector<HostAndPort>& servers ) : _lock( "ReplicaSetMonitor instance" ) , _name( name ) , _master(-1) { string errmsg; - - for ( unsigned i=0; i<servers.size(); i++ ){ + + for ( unsigned i=0; i<servers.size(); i++ ) { auto_ptr<DBClientConnection> conn( new DBClientConnection( true ) ); - if (!conn->connect( servers[i] , errmsg ) ){ + if (!conn->connect( servers[i] , errmsg ) ) { log(1) << "error connecting to seed " << servers[i] << ": " << errmsg << endl; // skip seeds that don't work continue; } - + _nodes.push_back( Node( servers[i] , conn.release() ) ); - + string maybePrimary; if (_checkConnection( _nodes[_nodes.size()-1].conn , maybePrimary, false)) { break; @@ -69,14 +69,14 @@ namespace mongo { } } - ReplicaSetMonitor::~ReplicaSetMonitor(){ + ReplicaSetMonitor::~ReplicaSetMonitor() { for ( unsigned i=0; i<_nodes.size(); i++ ) delete _nodes[i].conn; _nodes.clear(); _master = -1; } - - ReplicaSetMonitorPtr ReplicaSetMonitor::get( const string& name , const vector<HostAndPort>& servers ){ + + ReplicaSetMonitorPtr ReplicaSetMonitor::get( const string& name , const vector<HostAndPort>& servers ) { scoped_lock lk( _setsLock ); ReplicaSetMonitorPtr& m = _sets[name]; if ( ! m ) @@ -84,17 +84,17 @@ namespace mongo { if ( replicaSetMonitorWatcher.getState() == BackgroundJob::NotStarted ) replicaSetMonitorWatcher.go(); - + return m; } - void ReplicaSetMonitor::checkAll(){ + void ReplicaSetMonitor::checkAll() { set<string> seen; - - while ( true ){ + + while ( true ) { ReplicaSetMonitorPtr m; { - for ( map<string,ReplicaSetMonitorPtr>::iterator i=_sets.begin(); i!=_sets.end(); ++i ){ + for ( map<string,ReplicaSetMonitorPtr>::iterator i=_sets.begin(); i!=_sets.end(); ++i ) { string name = i->first; if ( seen.count( name ) ) continue; @@ -110,11 +110,11 @@ namespace mongo { m->check(); } - - + + } - void ReplicaSetMonitor::setConfigChangeHook( ConfigChangeHook hook ){ + void ReplicaSetMonitor::setConfigChangeHook( ConfigChangeHook hook ) { massert( 13610 , "ConfigChangeHook already specified" , _hook == 0 ); _hook = hook; } @@ -126,7 +126,7 @@ namespace mongo { { scoped_lock lk( _lock ); - for ( unsigned i=0; i<_nodes.size(); i++ ){ + for ( unsigned i=0; i<_nodes.size(); i++ ) { if ( i > 0 ) ss << ","; ss << _nodes[i].addr.toString(); @@ -135,31 +135,31 @@ namespace mongo { return ss.str(); } - void ReplicaSetMonitor::notifyFailure( const HostAndPort& server ){ - if ( _master >= 0 ){ + void ReplicaSetMonitor::notifyFailure( const HostAndPort& server ) { + if ( _master >= 0 ) { scoped_lock lk( _lock ); if ( server == _nodes[_master].addr ) _master = -1; } } - - HostAndPort ReplicaSetMonitor::getMaster(){ + + HostAndPort ReplicaSetMonitor::getMaster() { if ( _master < 0 ) _check(); - + uassert( 10009 , str::stream() << "ReplicaSetMonitor no master found for set: " << _name , _master >= 0 ); - + scoped_lock lk( _lock ); return _nodes[_master].addr; } - - HostAndPort ReplicaSetMonitor::getSlave(){ + + HostAndPort ReplicaSetMonitor::getSlave() { int x = rand() % _nodes.size(); { scoped_lock lk( _lock ); - for ( unsigned i=0; i<_nodes.size(); i++ ){ + for ( unsigned i=0; i<_nodes.size(); i++ ) { int p = ( i + x ) % _nodes.size(); if ( p == _master ) continue; @@ -174,9 +174,9 @@ namespace mongo { /** * notify the monitor that server has faild */ - void ReplicaSetMonitor::notifySlaveFailure( const HostAndPort& server ){ + void ReplicaSetMonitor::notifySlaveFailure( const HostAndPort& server ) { int x = _find( server ); - if ( x >= 0 ){ + if ( x >= 0 ) { scoped_lock lk( _lock ); _nodes[x].ok = false; } @@ -186,16 +186,16 @@ namespace mongo { BSONObj status; if (!conn->runCommand("admin", BSON("replSetGetStatus" << 1), status) || - !status.hasField("members") || - status["members"].type() != Array) { + !status.hasField("members") || + status["members"].type() != Array) { return; } - + BSONObjIterator hi(status["members"].Obj()); while (hi.more()) { BSONObj member = hi.next().Obj(); string host = member["name"].String(); - + int m = -1; if ((m = _find(host)) <= 0) { continue; @@ -206,7 +206,7 @@ namespace mongo { scoped_lock lk( _lock ); _nodes[m].ok = true; } - else { + else { scoped_lock lk( _lock ); _nodes[m].ok = false; } @@ -215,9 +215,9 @@ namespace mongo { void ReplicaSetMonitor::_checkHosts( const BSONObj& hostList, bool& changed ) { BSONObjIterator hi(hostList); - while ( hi.more() ){ + while ( hi.more() ) { string toCheck = hi.next().String(); - + if ( _find( toCheck ) >= 0 ) continue; @@ -233,34 +233,34 @@ namespace mongo { changed = true; } } - + bool ReplicaSetMonitor::_checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose ) { bool isMaster = false; bool changed = false; try { BSONObj o; c->isMaster(isMaster, &o); - + log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << '\n'; - + // add other nodes string maybePrimary; - if ( o["hosts"].type() == Array ){ + if ( o["hosts"].type() == Array ) { if ( o["primary"].type() == String ) maybePrimary = o["primary"].String(); - + _checkHosts(o["hosts"].Obj(), changed); } if (o.hasField("passives") && o["passives"].type() == Array) { _checkHosts(o["passives"].Obj(), changed); } - + _checkStatus(c); } catch ( std::exception& e ) { log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: caught exception " << c->toString() << ' ' << e.what() << endl; } - + if ( changed && _hook ) _hook( this ); @@ -268,13 +268,13 @@ namespace mongo { } void ReplicaSetMonitor::_check() { - + bool triedQuickCheck = false; - + LOG(1) << "_check : " << getServerAddress() << endl; - + for ( int retry = 0; retry < 2; retry++ ) { - for ( unsigned i=0; i<_nodes.size(); i++ ){ + for ( unsigned i=0; i<_nodes.size(); i++ ) { DBClientConnection * c; { scoped_lock lk( _lock ); @@ -282,14 +282,14 @@ namespace mongo { } string maybePrimary; - if ( _checkConnection( c , maybePrimary , retry ) ){ + if ( _checkConnection( c , maybePrimary , retry ) ) { _master = i; return; } - if ( ! triedQuickCheck && maybePrimary.size() ){ + if ( ! triedQuickCheck && maybePrimary.size() ) { int x = _find( maybePrimary ); - if ( x >= 0 ){ + if ( x >= 0 ) { triedQuickCheck = true; string dummy; DBClientConnection * testConn; @@ -297,7 +297,7 @@ namespace mongo { scoped_lock lk( _lock ); testConn = _nodes[x].conn; } - if ( _checkConnection( testConn , dummy , false ) ){ + if ( _checkConnection( testConn , dummy , false ) ) { _master = x; return; } @@ -310,11 +310,11 @@ namespace mongo { } - void ReplicaSetMonitor::check(){ + void ReplicaSetMonitor::check() { // first see if the current master is fine - if ( _master >= 0 ){ + if ( _master >= 0 ) { string temp; - if ( _checkConnection( _nodes[_master].conn , temp , false ) ){ + if ( _checkConnection( _nodes[_master].conn , temp , false ) ) { // current master is fine, so we're done return; } @@ -349,14 +349,14 @@ namespace mongo { // -------------------------------- DBClientReplicaSet::DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers ) - : _monitor( ReplicaSetMonitor::get( name , servers ) ){ + : _monitor( ReplicaSetMonitor::get( name , servers ) ) { } - - DBClientReplicaSet::~DBClientReplicaSet(){ + + DBClientReplicaSet::~DBClientReplicaSet() { } DBClientConnection * DBClientReplicaSet::checkMaster() { - if ( _master ){ + if ( _master ) { // a master is selected. let's just make sure connection didn't die if ( ! _master->isFailed() ) return _master.get(); @@ -364,7 +364,7 @@ namespace mongo { } HostAndPort h = _monitor->getMaster(); - if ( h != _masterHost ){ + if ( h != _masterHost ) { _masterHost = h; _master.reset( new DBClientConnection( true ) ); _master->connect( _masterHost ); @@ -374,14 +374,14 @@ namespace mongo { } DBClientConnection * DBClientReplicaSet::checkSlave() { - if ( _slave ){ + if ( _slave ) { if ( ! _slave->isFailed() ) return _slave.get(); _monitor->notifySlaveFailure( _slaveHost ); } HostAndPort h = _monitor->getSlave(); - if ( h != _slaveHost ){ + if ( h != _slaveHost ) { _slaveHost = h; _slave.reset( new DBClientConnection( true ) ); _slave->connect( _slaveHost ); @@ -391,8 +391,8 @@ namespace mongo { } - void DBClientReplicaSet::_auth( DBClientConnection * conn ){ - for ( list<AuthInfo>::iterator i=_auths.begin(); i!=_auths.end(); ++i ){ + void DBClientReplicaSet::_auth( DBClientConnection * conn ) { + for ( list<AuthInfo>::iterator i=_auths.begin(); i!=_auths.end(); ++i ) { const AuthInfo& a = *i; string errmsg; if ( ! conn->auth( a.dbname , a.username , a.pwd , errmsg, a.digestPassword ) ) @@ -402,15 +402,15 @@ namespace mongo { } - DBClientConnection& DBClientReplicaSet::masterConn(){ + DBClientConnection& DBClientReplicaSet::masterConn() { return *checkMaster(); } - DBClientConnection& DBClientReplicaSet::slaveConn(){ + DBClientConnection& DBClientReplicaSet::slaveConn() { return *checkSlave(); } - bool DBClientReplicaSet::connect(){ + bool DBClientReplicaSet::connect() { try { checkMaster(); } @@ -420,17 +420,17 @@ namespace mongo { return true; } - bool DBClientReplicaSet::auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword ) { - DBClientConnection * m = checkMaster(); + bool DBClientReplicaSet::auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword ) { + DBClientConnection * m = checkMaster(); // first make sure it actually works - if( ! m->auth(dbname, username, pwd, errmsg, digestPassword ) ) - return false; - + if( ! m->auth(dbname, username, pwd, errmsg, digestPassword ) ) + return false; + // now that it does, we should save so that for a new node we can auth _auths.push_back( AuthInfo( dbname , username , pwd , digestPassword ) ); - return true; - } + return true; + } // ------------- simple functions ----------------- @@ -451,59 +451,59 @@ namespace mongo { } auto_ptr<DBClientCursor> DBClientReplicaSet::query(const string &ns, Query query, int nToReturn, int nToSkip, - const BSONObj *fieldsToReturn, int queryOptions, int batchSize){ + const BSONObj *fieldsToReturn, int queryOptions, int batchSize) { - if ( queryOptions & QueryOption_SlaveOk ){ + if ( queryOptions & QueryOption_SlaveOk ) { // we're ok sending to a slave // we'll try 2 slaves before just using master // checkSlave will try a different slave automatically after a failure - for ( int i=0; i<2; i++ ){ + for ( int i=0; i<2; i++ ) { try { return checkSlave()->query(ns,query,nToReturn,nToSkip,fieldsToReturn,queryOptions,batchSize); } - catch ( DBException & ){ + catch ( DBException & ) { LOG(1) << "can't query replica set slave: " << _slaveHost << endl; } } } - + return checkMaster()->query(ns,query,nToReturn,nToSkip,fieldsToReturn,queryOptions,batchSize); } BSONObj DBClientReplicaSet::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) { - if ( queryOptions & QueryOption_SlaveOk ){ + if ( queryOptions & QueryOption_SlaveOk ) { // we're ok sending to a slave // we'll try 2 slaves before just using master // checkSlave will try a different slave automatically after a failure - for ( int i=0; i<2; i++ ){ + for ( int i=0; i<2; i++ ) { try { return checkSlave()->findOne(ns,query,fieldsToReturn,queryOptions); } - catch ( DBException & ){ + catch ( DBException & ) { LOG(1) << "can't query replica set slave: " << _slaveHost << endl; } } } - + return checkMaster()->findOne(ns,query,fieldsToReturn,queryOptions); } - void DBClientReplicaSet::killCursor( long long cursorID ){ + void DBClientReplicaSet::killCursor( long long cursorID ) { checkMaster()->killCursor( cursorID ); - } + } - bool DBClientReplicaSet::call( Message &toSend, Message &response, bool assertOk ) { - if ( toSend.operation() == dbQuery ){ + bool DBClientReplicaSet::call( Message &toSend, Message &response, bool assertOk ) { + if ( toSend.operation() == dbQuery ) { // TODO: might be possible to do this faster by changing api DbMessage dm( toSend ); QueryMessage qm( dm ); - if ( qm.queryOptions & QueryOption_SlaveOk ){ - for ( int i=0; i<2; i++ ){ + if ( qm.queryOptions & QueryOption_SlaveOk ) { + for ( int i=0; i<2; i++ ) { try { return checkSlave()->call( toSend , response , assertOk ); } - catch ( DBException & ){ + catch ( DBException & ) { log(1) << "can't query replica set slave: " << _slaveHost << endl; } } diff --git a/client/dbclient_rs.h b/client/dbclient_rs.h index bbc06a6829d..62b008edfac 100644 --- a/client/dbclient_rs.h +++ b/client/dbclient_rs.h @@ -34,7 +34,7 @@ namespace mongo { */ class ReplicaSetMonitor { public: - + typedef boost::function1<void,const ReplicaSetMonitor*> ConfigChangeHook; /** @@ -57,7 +57,7 @@ namespace mongo { static void setConfigChangeHook( ConfigChangeHook hook ); ~ReplicaSetMonitor(); - + /** @return HostAndPort or throws an exception */ HostAndPort getMaster(); @@ -72,7 +72,7 @@ namespace mongo { /** * notify the monitor that server has faild */ - void notifySlaveFailure( const HostAndPort& server ); + void notifySlaveFailure( const HostAndPort& server ); /** * checks for current master and new secondaries @@ -82,18 +82,18 @@ namespace mongo { string getName() const { return _name; } string getServerAddress() const; - + private: /** * This populates a list of hosts from the list of seeds (discarding the - * seed list). + * seed list). * @param name set name * @param servers seeds */ ReplicaSetMonitor( const string& name , const vector<HostAndPort>& servers ); void _check(); - + /** * Use replSetGetStatus command to make sure hosts in host list are up * and readable. Sets Node::ok appropriately. @@ -107,7 +107,7 @@ namespace mongo { * @param changed if new hosts were added */ void _checkHosts(const BSONObj& hostList, bool& changed); - + /** * Updates host list. * @param c the connection to check @@ -124,14 +124,14 @@ namespace mongo { string _name; struct Node { - Node( const HostAndPort& a , DBClientConnection* c ) : addr( a ) , conn(c) , ok(true){} + Node( const HostAndPort& a , DBClientConnection* c ) : addr( a ) , conn(c) , ok(true) {} HostAndPort addr; DBClientConnection* conn; // if this node is in a failure state // used for slave routing // this is too simple, should make it better - bool ok; + bool ok; }; /** @@ -141,20 +141,20 @@ namespace mongo { int _master; // which node is the current master. -1 means no master is known - + static mongo::mutex _setsLock; // protects _sets static map<string,ReplicaSetMonitorPtr> _sets; // set name to Monitor - + static ConfigChangeHook _hook; }; /** Use this class to connect to a replica set of servers. The class will manage checking for which server in a replica set is master, and do failover automatically. - + This can also be used to connect to replica pairs since pairs are a subset of sets - - On a failover situation, expect at least one operation to return an error (throw - an exception) before the failover is complete. Operations are not retried. + + On a failover situation, expect at least one operation to return an error (throw + an exception) before the failover is complete. Operations are not retried. */ class DBClientReplicaSet : public DBClientBase { @@ -173,33 +173,33 @@ namespace mongo { /** Authorize. Authorizes all nodes as needed */ virtual bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword = true ); - + // ----------- simple functions -------------- /** throws userassertion "no master found" */ virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0, const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 ); - + /** throws userassertion "no master found" */ virtual BSONObj findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); virtual void insert( const string &ns , BSONObj obj ); - /** insert multiple objects. Note that single object insert is asynchronous, so this version + /** insert multiple objects. Note that single object insert is asynchronous, so this version is only nominally faster and not worth a special effort to try to use. */ virtual void insert( const string &ns, const vector< BSONObj >& v ); virtual void remove( const string &ns , Query obj , bool justOne = 0 ); - + virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = 0 , bool multi = 0 ); - + virtual void killCursor( long long cursorID ); - + // ---- access raw connections ---- - + DBClientConnection& masterConn(); DBClientConnection& slaveConn(); - + // ---- callback pieces ------- virtual void checkResponse( const char *data, int nReturned ) { checkMaster()->checkResponse( data , nReturned ); } @@ -217,41 +217,41 @@ namespace mongo { string toString() { return getServerAddress(); } string getServerAddress() const { return _monitor->getServerAddress(); } - - virtual ConnectionString::ConnectionType type() const { return ConnectionString::SET; } - + + virtual ConnectionString::ConnectionType type() const { return ConnectionString::SET; } + // ---- low level ------ virtual bool call( Message &toSend, Message &response, bool assertOk=true ); virtual void say( Message &toSend ) { checkMaster()->say( toSend ); } - virtual bool callRead( Message& toSend , Message& response ){ return checkMaster()->callRead( toSend , response ); } + virtual bool callRead( Message& toSend , Message& response ) { return checkMaster()->callRead( toSend , response ); } - protected: + protected: virtual void sayPiggyBack( Message &toSend ) { checkMaster()->say( toSend ); } private: DBClientConnection * checkMaster(); DBClientConnection * checkSlave(); - + void _auth( DBClientConnection * conn ); ReplicaSetMonitorPtr _monitor; - HostAndPort _masterHost; + HostAndPort _masterHost; scoped_ptr<DBClientConnection> _master; HostAndPort _slaveHost; scoped_ptr<DBClientConnection> _slave; - + /** * for storing authentication info * fields are exactly for DBClientConnection::auth */ struct AuthInfo { - AuthInfo( string d , string u , string p , bool di ) - : dbname( d ) , username( u ) , pwd( p ) , digestPassword( di ){} + AuthInfo( string d , string u , string p , bool di ) + : dbname( d ) , username( u ) , pwd( p ) , digestPassword( di ) {} string dbname; string username; string pwd; @@ -262,8 +262,8 @@ namespace mongo { // we can re-auth // this could be a security issue, as the password is stored in memory // not sure if/how we should handle - list<AuthInfo> _auths; + list<AuthInfo> _auths; }; - + } diff --git a/client/dbclientcursor.cpp b/client/dbclientcursor.cpp index bb4aa5320d3..d3928dcdcf9 100644 --- a/client/dbclientcursor.cpp +++ b/client/dbclientcursor.cpp @@ -26,14 +26,14 @@ namespace mongo { void assembleRequest( const string &ns, BSONObj query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, Message &toSend ); - int DBClientCursor::nextBatchSize(){ + int DBClientCursor::nextBatchSize() { if ( nToReturn == 0 ) return batchSize; if ( batchSize == 0 ) return nToReturn; - + return batchSize < nToReturn ? batchSize : nToReturn; } @@ -41,7 +41,8 @@ namespace mongo { Message toSend; if ( !cursorId ) { assembleRequest( ns, query, nextBatchSize() , nToSkip, fieldsToReturn, opts, toSend ); - } else { + } + else { BufBuilder b; b.appendNum( opts ); b.appendStr( ns ); @@ -60,7 +61,7 @@ namespace mongo { void DBClientCursor::requestMore() { assert( cursorId && pos == nReturned ); - if (haveLimit){ + if (haveLimit) { nToReturn -= nReturned; assert(nToReturn > 0); } @@ -69,12 +70,12 @@ namespace mongo { b.appendStr(ns); b.appendNum(nextBatchSize()); b.appendNum(cursorId); - + Message toSend; toSend.setData(dbGetMore, b.buf(), b.len()); auto_ptr<Message> response(new Message()); - - if ( connector ){ + + if ( connector ) { connector->call( toSend, *response ); m = response; dataReceived(); @@ -105,7 +106,7 @@ namespace mongo { void DBClientCursor::dataReceived() { QueryResult *qr = (QueryResult *) m->singleData(); resultFlags = qr->resultFlags(); - + if ( qr->resultFlags() & ResultFlag_CursorNotFound ) { // cursor id no longer valid at the server. assert( qr->cursorId == 0 ); @@ -113,7 +114,7 @@ namespace mongo { if ( ! ( opts & QueryOption_CursorTailable ) ) throw UserException( 13127 , "getMore: cursor didn't exist on server, possible restart or timeout?" ); } - + if ( cursorId == 0 || ! ( opts & QueryOption_CursorTailable ) ) { // only set initially: we don't want to kill it on end of data // if it's a tailable cursor @@ -136,7 +137,7 @@ namespace mongo { if ( !_putBack.empty() ) return true; - + if (haveLimit && pos >= nToReturn) return false; @@ -171,7 +172,7 @@ namespace mongo { int m = atMost; /* - for( stack<BSONObj>::iterator i = _putBack.begin(); i != _putBack.end(); i++ ) { + for( stack<BSONObj>::iterator i = _putBack.begin(); i != _putBack.end(); i++ ) { if( m == 0 ) return; v.push_back(*i); @@ -191,7 +192,7 @@ namespace mongo { } } - void DBClientCursor::attach( AScopedConnection * conn ){ + void DBClientCursor::attach( AScopedConnection * conn ) { assert( _scopedHost.size() == 0 ); _scopedHost = conn->getHost(); conn->done(); @@ -204,28 +205,28 @@ namespace mongo { DESTRUCTOR_GUARD ( - if ( cursorId && _ownCursor ) { - BufBuilder b; - b.appendNum( (int)0 ); // reserved - b.appendNum( (int)1 ); // number - b.appendNum( cursorId ); - - Message m; - m.setData( dbKillCursors , b.buf() , b.len() ); - - if ( connector ){ - connector->sayPiggyBack( m ); - } - else { - assert( _scopedHost.size() ); - ScopedDbConnection conn( _scopedHost ); - conn->sayPiggyBack( m ); - conn.done(); - } + if ( cursorId && _ownCursor ) { + BufBuilder b; + b.appendNum( (int)0 ); // reserved + b.appendNum( (int)1 ); // number + b.appendNum( cursorId ); + + Message m; + m.setData( dbKillCursors , b.buf() , b.len() ); + + if ( connector ) { + connector->sayPiggyBack( m ); } + else { + assert( _scopedHost.size() ); + ScopedDbConnection conn( _scopedHost ); + conn->sayPiggyBack( m ); + conn.done(); + } + } ); } - + } // namespace mongo diff --git a/client/dbclientcursor.h b/client/dbclientcursor.h index 2cdf3683db5..c86814d7e6d 100644 --- a/client/dbclientcursor.h +++ b/client/dbclientcursor.h @@ -1,4 +1,4 @@ -// file dbclientcursor.h +// file dbclientcursor.h /* Copyright 2009 10gen Inc. * @@ -24,9 +24,9 @@ #include <stack> namespace mongo { - + class AScopedConnection; - + /** for mock purposes only -- do not create variants of DBClientCursor, nor hang code here */ class DBClientCursorInterface { public: @@ -41,38 +41,38 @@ namespace mongo { DBClientCursorInterface() {} }; - /** Queries return a cursor object */ + /** Queries return a cursor object */ class DBClientCursor : public DBClientCursorInterface { public: - /** If true, safe to call next(). Requests more from server if necessary. */ + /** If true, safe to call next(). Requests more from server if necessary. */ bool more(); - /** If true, there is more in our local buffers to be fetched via next(). Returns - false when a getMore request back to server would be required. You can use this - if you want to exhaust whatever data has been fetched to the client already but + /** If true, there is more in our local buffers to be fetched via next(). Returns + false when a getMore request back to server would be required. You can use this + if you want to exhaust whatever data has been fetched to the client already but then perhaps stop. */ int objsLeftInBatch() const { _assertIfNull(); return _putBack.size() + nReturned - pos; } bool moreInCurrentBatch() { return objsLeftInBatch() > 0; } /** next - @return next object in the result cursor. + @return next object in the result cursor. on an error at the remote server, you will get back: { $err: <string> } if you do not want to handle that yourself, call nextSafe(). */ BSONObj next(); - - /** + + /** restore an object previously returned by next() to the cursor */ void putBack( const BSONObj &o ) { _putBack.push( o.getOwned() ); } - /** throws AssertionException if get back { $err : ... } */ + /** throws AssertionException if get back { $err : ... } */ BSONObj nextSafe() { BSONObj o = next(); BSONElement e = o.firstElement(); - if( strcmp(e.fieldName(), "$err") == 0 ) { + if( strcmp(e.fieldName(), "$err") == 0 ) { if( logLevel >= 5 ) log() << "nextSafe() error " << o.toString() << endl; uassert(13106, "nextSafe(): " + o.toString(), false); @@ -81,7 +81,7 @@ namespace mongo { } /** peek ahead at items buffered for future next() calls. - never requests new data from the server. so peek only effective + never requests new data from the server. so peek only effective with what is already buffered. WARNING: no support for _putBack yet! */ @@ -90,9 +90,9 @@ namespace mongo { /** iterate the rest of the cursor and return the number if items */ - int itcount(){ + int itcount() { int c = 0; - while ( more() ){ + while ( more() ) { next(); c++; } @@ -111,48 +111,48 @@ namespace mongo { bool tailable() const { return (opts & QueryOption_CursorTailable) != 0; } - - /** see ResultFlagType (constants.h) for flag values - mostly these flags are for internal purposes - + + /** see ResultFlagType (constants.h) for flag values + mostly these flags are for internal purposes - ResultFlag_ErrSet is the possible exception to that */ - bool hasResultFlag( int flag ){ + bool hasResultFlag( int flag ) { _assertIfNull(); return (resultFlags & flag) != 0; } DBClientCursor( DBConnector *_connector, const string &_ns, BSONObj _query, int _nToReturn, int _nToSkip, const BSONObj *_fieldsToReturn, int queryOptions , int bs ) : - connector(_connector), - ns(_ns), - query(_query), - nToReturn(_nToReturn), - haveLimit( _nToReturn > 0 && !(queryOptions & QueryOption_CursorTailable)), - nToSkip(_nToSkip), - fieldsToReturn(_fieldsToReturn), - opts(queryOptions), - batchSize(bs==1?2:bs), - m(new Message()), - cursorId(), - nReturned(), - pos(), - data(), - _ownCursor( true ){ + connector(_connector), + ns(_ns), + query(_query), + nToReturn(_nToReturn), + haveLimit( _nToReturn > 0 && !(queryOptions & QueryOption_CursorTailable)), + nToSkip(_nToSkip), + fieldsToReturn(_fieldsToReturn), + opts(queryOptions), + batchSize(bs==1?2:bs), + m(new Message()), + cursorId(), + nReturned(), + pos(), + data(), + _ownCursor( true ) { } - + DBClientCursor( DBConnector *_connector, const string &_ns, long long _cursorId, int _nToReturn, int options ) : - connector(_connector), - ns(_ns), - nToReturn( _nToReturn ), - haveLimit( _nToReturn > 0 && !(options & QueryOption_CursorTailable)), - opts( options ), - m(new Message()), - cursorId( _cursorId ), - nReturned(), - pos(), - data(), - _ownCursor( true ){ - } + connector(_connector), + ns(_ns), + nToReturn( _nToReturn ), + haveLimit( _nToReturn > 0 && !(options & QueryOption_CursorTailable)), + opts( options ), + m(new Message()), + cursorId( _cursorId ), + nReturned(), + pos(), + data(), + _ownCursor( true ) { + } virtual ~DBClientCursor(); @@ -162,13 +162,13 @@ namespace mongo { message when ~DBClientCursor() is called. This function overrides that. */ void decouple() { _ownCursor = false; } - + void attach( AScopedConnection * conn ); - + private: friend class DBClientBase; friend class DBClientConnection; - bool init(); + bool init(); int nextBatchSize(); DBConnector *connector; string ns; @@ -199,7 +199,7 @@ namespace mongo { DBClientCursor( const DBClientCursor& ); DBClientCursor& operator=( const DBClientCursor& ); }; - + /** iterate over objects in current batch only - will not cause a network call */ class DBClientCursorBatchIterator { diff --git a/client/dbclientmockcursor.h b/client/dbclientmockcursor.h index 0f6dba96458..8d85ff5ad2e 100644 --- a/client/dbclientmockcursor.h +++ b/client/dbclientmockcursor.h @@ -20,7 +20,7 @@ #include "dbclientcursor.h" namespace mongo { - + class DBClientMockCursor : public DBClientCursorInterface { public: DBClientMockCursor( const BSONArray& mockCollection ) : _iter( mockCollection ) {} diff --git a/client/distlock.cpp b/client/distlock.cpp index b88c89fd5f0..0db55c96cb0 100644 --- a/client/distlock.cpp +++ b/client/distlock.cpp @@ -42,15 +42,15 @@ namespace mongo { /* =================== */ - string getDistLockProcess(){ + string getDistLockProcess() { boost::call_once( initModule, _init ); assert( _cachedProcessString ); return *_cachedProcessString; } - string getDistLockId(){ + string getDistLockId() { string s = distLockIds.get(); - if ( s.empty() ){ + if ( s.empty() ) { stringstream ss; ss << getDistLockProcess() << ":" << getThreadName() << ":" << rand(); s = ss.str(); @@ -58,27 +58,27 @@ namespace mongo { } return s; } - - void distLockPingThread( ConnectionString addr ){ + + void distLockPingThread( ConnectionString addr ) { setThreadName( "LockPinger" ); static int loops = 0; - while( ! inShutdown() ){ + while( ! inShutdown() ) { string process = getDistLockProcess(); log(4) << "dist_lock about to ping for: " << process << endl; try { ScopedDbConnection conn( addr ); - + // refresh the entry corresponding to this process in the lockpings collection - conn->update( lockPingNS , - BSON( "_id" << process ) , + conn->update( lockPingNS , + BSON( "_id" << process ) , BSON( "$set" << BSON( "ping" << DATENOW ) ) , true ); string err = conn->getLastError(); - if ( ! err.empty() ){ - log( LL_WARNING ) << "dist_lock process: " << process << " pinging: " << addr << " failed: " + if ( ! err.empty() ) { + log( LL_WARNING ) << "dist_lock process: " << process << " pinging: " << addr << " failed: " << err << endl; conn.done(); sleepsecs(30); @@ -91,32 +91,32 @@ namespace mongo { // if the lock is taken, the take-over mechanism should handle the situation auto_ptr<DBClientCursor> c = conn->query( locksNS , BSONObj() ); vector<string> pids; - while ( c->more() ){ + while ( c->more() ) { BSONObj lock = c->next(); if ( ! lock["process"].eoo() ) { pids.push_back( lock["process"].valuestrsafe() ); } - } + } Date_t fourDays = jsTime() - ( 4 * 86400 * 1000 ); // 4 days conn->remove( lockPingNS , BSON( "_id" << BSON( "$nin" << pids ) << "ping" << LT << fourDays ) ); err = conn->getLastError(); - if ( ! err.empty() ){ - log ( LL_WARNING ) << "dist_lock cleanup request from process: " << process << " to: " << addr + if ( ! err.empty() ) { + log ( LL_WARNING ) << "dist_lock cleanup request from process: " << process << " to: " << addr << " failed: " << err << endl; conn.done(); sleepsecs(30); continue; } - + // create index so remove is fast even with a lot of servers - if ( loops++ == 0 ){ + if ( loops++ == 0 ) { conn->ensureIndex( lockPingNS , BSON( "ping" << 1 ) ); } - + conn.done(); } - catch ( std::exception& e ){ + catch ( std::exception& e ) { log( LL_WARNING ) << "dist_lock exception during ping: " << e.what() << endl; } @@ -124,14 +124,14 @@ namespace mongo { sleepsecs(30); } } - + class DistributedLockPinger { public: DistributedLockPinger() - : _mutex( "DistributedLockPinger" ){ + : _mutex( "DistributedLockPinger" ) { } - - void got( const ConnectionString& conn ){ + + void got( const ConnectionString& conn ) { string s = conn.toString(); scoped_lock lk( _mutex ); if ( _seen.count( s ) > 0 ) @@ -139,52 +139,53 @@ namespace mongo { boost::thread t( boost::bind( &distLockPingThread , conn ) ); _seen.insert( s ); } - + set<string> _seen; mongo::mutex _mutex; - + } distLockPinger; - + DistributedLock::DistributedLock( const ConnectionString& conn , const string& name , unsigned takeoverMinutes ) - : _conn(conn),_name(name),_takeoverMinutes(takeoverMinutes){ + : _conn(conn),_name(name),_takeoverMinutes(takeoverMinutes) { _id = BSON( "_id" << name ); _ns = "config.locks"; distLockPinger.got( conn ); } - - bool DistributedLock::lock_try( string why , BSONObj * other ){ + + bool DistributedLock::lock_try( string why , BSONObj * other ) { // write to dummy if 'other' is null - BSONObj dummyOther; + BSONObj dummyOther; if ( other == NULL ) other = &dummyOther; ScopedDbConnection conn( _conn ); - + BSONObjBuilder queryBuilder; queryBuilder.appendElements( _id ); - queryBuilder.append( "state" , 0 ); + queryBuilder.append( "state" , 0 ); - { // make sure its there so we can use simple update logic below + { + // make sure its there so we can use simple update logic below BSONObj o = conn->findOne( _ns , _id ); - if ( o.isEmpty() ){ + if ( o.isEmpty() ) { try { log(4) << "dist_lock inserting initial doc in " << _ns << " for lock " << _name << endl; conn->insert( _ns , BSON( "_id" << _name << "state" << 0 << "who" << "" ) ); } - catch ( UserException& e ){ + catch ( UserException& e ) { log() << "dist_lock could not insert initial doc: " << e << endl; } } - else if ( o["state"].numberInt() > 0 ){ + else if ( o["state"].numberInt() > 0 ) { BSONObj lastPing = conn->findOne( lockPingNS , o["process"].wrap( "_id" ) ); - if ( lastPing.isEmpty() ){ + if ( lastPing.isEmpty() ) { // if a lock is taken but there's no ping for it, we're in an inconsistent situation // if the lock holder (mongos or d) does not exist anymore, the lock could safely be removed // but we'd require analysis of the situation before a manual intervention log(LL_ERROR) << "config.locks: " << _name << " lock is taken by old process? " - << "remove the following lock if the process is not active anymore: " << o << endl; + << "remove the following lock if the process is not active anymore: " << o << endl; *other = o; other->getOwned(); conn.done(); @@ -194,18 +195,18 @@ namespace mongo { unsigned long long elapsed = jsTime() - lastPing["ping"].Date(); // in ms elapsed = elapsed / ( 1000 * 60 ); // convert to minutes - if ( elapsed <= _takeoverMinutes ){ + if ( elapsed <= _takeoverMinutes ) { log(1) << "dist_lock lock failed because taken by: " << o << " elapsed minutes: " << elapsed << endl; *other = o; other->getOwned(); conn.done(); return false; } - + log() << "dist_lock forcefully taking over from: " << o << " elapsed minutes: " << elapsed << endl; conn->update( _ns , _id , BSON( "$set" << BSON( "state" << 0 ) ) ); string err = conn->getLastError(); - if ( ! err.empty() ){ + if ( ! err.empty() ) { log( LL_WARNING ) << "dist_lock take over from: " << o << " failed: " << err << endl; *other = o; other->getOwned(); @@ -214,11 +215,11 @@ namespace mongo { } } - else if ( o["ts"].type() ){ + else if ( o["ts"].type() ) { queryBuilder.append( o["ts"] ); } } - + OID ts; ts.init(); @@ -232,11 +233,11 @@ namespace mongo { log(4) << "dist_lock about to aquire lock: " << lockDetails << endl; conn->update( _ns , queryBuilder.obj() , whatIWant ); - + BSONObj o = conn->getLastErrorDetailed(); now = conn->findOne( _ns , _id ); - - if ( o["n"].numberInt() == 0 ){ + + if ( o["n"].numberInt() == 0 ) { *other = now; other->getOwned(); log() << "dist_lock error trying to aquire lock: " << lockDetails << " error: " << o << endl; @@ -247,22 +248,22 @@ namespace mongo { } } - catch ( UpdateNotTheSame& up ){ + catch ( UpdateNotTheSame& up ) { // this means our update got through on some, but not others log(4) << "dist_lock lock did not propagate properly" << endl; - for ( unsigned i=0; i<up.size(); i++ ){ + for ( unsigned i=0; i<up.size(); i++ ) { ScopedDbConnection temp( up[i].first ); BSONObj temp2 = temp->findOne( _ns , _id ); - if ( now.isEmpty() || now["ts"] < temp2["ts"] ){ + if ( now.isEmpty() || now["ts"] < temp2["ts"] ) { now = temp2.getOwned(); } temp.done(); } - if ( now["ts"].OID() == ts ){ + if ( now["ts"].OID() == ts ) { log(4) << "dist_lock completed lock propagation" << endl; gotLock = true; conn->update( _ns , _id , whatIWant ); @@ -272,15 +273,15 @@ namespace mongo { gotLock = false; } } - + conn.done(); - + log(2) << "dist_lock lock gotLock: " << gotLock << " now: " << now << endl; return gotLock; } - void DistributedLock::unlock(){ + void DistributedLock::unlock() { const int maxAttempts = 3; int attempted = 0; while ( ++attempted <= maxAttempts ) { @@ -293,17 +294,18 @@ namespace mongo { return; - - } catch ( std::exception& e) { - log( LL_WARNING ) << "dist_lock " << _name << " failed to contact config server in unlock attempt " + + } + catch ( std::exception& e) { + log( LL_WARNING ) << "dist_lock " << _name << " failed to contact config server in unlock attempt " << attempted << ": " << e.what() << endl; sleepsecs(1 << attempted); } } - log( LL_WARNING ) << "dist_lock couldn't consumate unlock request. " << "Lock " << _name - << " will be taken over after " << _takeoverMinutes << " minutes timeout" << endl; + log( LL_WARNING ) << "dist_lock couldn't consumate unlock request. " << "Lock " << _name + << " will be taken over after " << _takeoverMinutes << " minutes timeout" << endl; } } diff --git a/client/distlock.h b/client/distlock.h index 5a933a8c2a0..491de4812bf 100644 --- a/client/distlock.h +++ b/client/distlock.h @@ -28,16 +28,16 @@ namespace mongo { /** * The distributed lock is a configdb backed way of synchronizing system-wide tasks. A task must be identified by a * unique name across the system (e.g., "balancer"). A lock is taken by writing a document in the configdb's locks - * collection with that name. + * collection with that name. * - * To be maintained, each taken lock needs to be revalidaded ("pinged") within a pre-established amount of time. This + * To be maintained, each taken lock needs to be revalidaded ("pinged") within a pre-established amount of time. This * class does this maintenance automatically once a DistributedLock object was constructed. */ class DistributedLock { public: /** - * The constructor does not connect to the configdb yet and constructing does not mean the lock was acquired. + * The constructor does not connect to the configdb yet and constructing does not mean the lock was acquired. * Construction does trigger a lock "pinging" mechanism, though. * * @param conn address of config(s) server(s) @@ -49,7 +49,7 @@ namespace mongo { /** * Attempts to aquire 'this' lock, checking if it could or should be stolen from the previous holder. Please * consider using the dist_lock_try construct to acquire this lock in an exception safe way. - * + * * @param why human readable description of why the lock is being taken (used to log) * @param other configdb's lock document that is currently holding the lock, if lock is taken * @return true if it managed to grab the lock @@ -65,15 +65,15 @@ namespace mongo { ConnectionString _conn; string _name; unsigned _takeoverMinutes; - + string _ns; BSONObj _id; }; - + class dist_lock_try { public: dist_lock_try( DistributedLock * lock , string why ) - : _lock(lock){ + : _lock(lock) { _got = _lock->lock_try( why , &_other ); } @@ -85,11 +85,11 @@ namespace mongo { bool got() const { return _got; } BSONObj other() const { return _other; } - + private: DistributedLock * _lock; bool _got; - BSONObj _other; + BSONObj _other; }; } diff --git a/client/distlock_test.cpp b/client/distlock_test.cpp index 0879b6e2924..8cc28e37479 100644 --- a/client/distlock_test.cpp +++ b/client/distlock_test.cpp @@ -21,23 +21,23 @@ #include "../db/commands.h" namespace mongo { - + class TestDistLockWithSync : public Command { public: - TestDistLockWithSync() : Command( "_testDistLockWithSyncCluster" ){} + TestDistLockWithSync() : Command( "_testDistLockWithSyncCluster" ) {} virtual void help( stringstream& help ) const { help << "should not be calling this directly" << endl; } - + virtual bool slaveOk() const { return false; } virtual bool adminOnly() const { return true; } - virtual LockType locktype() const { return NONE; } + virtual LockType locktype() const { return NONE; } - static void runThread(){ - for ( int i=0; i<1000; i++ ){ - if ( current->lock_try( "test" ) ){ + static void runThread() { + for ( int i=0; i<1000; i++ ) { + if ( current->lock_try( "test" ) ) { gotit++; - for ( int j=0; j<2000; j++ ){ + for ( int j=0; j<2000; j++ ) { count++; } current->unlock(); @@ -45,17 +45,17 @@ namespace mongo { } } - bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { DistributedLock lk( ConnectionString( cmdObj["host"].String() , ConnectionString::SYNC ), "testdistlockwithsync" ); current = &lk; count = 0; gotit = 0; - + vector<shared_ptr<boost::thread> > l; - for ( int i=0; i<4; i++ ){ + for ( int i=0; i<4; i++ ) { l.push_back( shared_ptr<boost::thread>( new boost::thread( runThread ) ) ); } - + for ( unsigned i=0; i<l.size(); i++ ) l[i]->join(); @@ -64,7 +64,7 @@ namespace mongo { current = 0; return count == gotit * 2000; } - + static DistributedLock * current; static int count; static int gotit; diff --git a/client/examples/authTest.cpp b/client/examples/authTest.cpp index 9243bbf9ef8..71cdd390cff 100644 --- a/client/examples/authTest.cpp +++ b/client/examples/authTest.cpp @@ -22,7 +22,7 @@ using namespace mongo; int main( int argc, const char **argv ) { - + const char *port = "27017"; if ( argc != 1 ) { if ( argc != 3 ) @@ -37,12 +37,13 @@ int main( int argc, const char **argv ) { throw -11; } - { // clean up old data from any previous tests + { + // clean up old data from any previous tests conn.remove( "test.system.users" , BSONObj() ); } conn.insert( "test.system.users" , BSON( "user" << "eliot" << "pwd" << conn.createPasswordDigest( "eliot" , "bar" ) ) ); - + errmsg.clear(); bool ok = conn.auth( "test" , "eliot" , "bar" , errmsg ); if ( ! ok ) diff --git a/client/examples/clientTest.cpp b/client/examples/clientTest.cpp index 64450db3db6..bd4432eb0cf 100644 --- a/client/examples/clientTest.cpp +++ b/client/examples/clientTest.cpp @@ -130,12 +130,14 @@ int main( int argc, const char **argv ) { } - { // ensure index + { + // ensure index assert( conn.ensureIndex( ns , BSON( "name" << 1 ) ) ); assert( ! conn.ensureIndex( ns , BSON( "name" << 1 ) ) ); } - { // hint related tests + { + // hint related tests assert( conn.findOne(ns, "{}")["name"].str() == "sara" ); assert( conn.findOne(ns, "{ name : 'eliot' }")["name"].str() == "eliot" ); @@ -146,7 +148,7 @@ int main( int argc, const char **argv ) { try { conn.findOne(ns, Query("{name:\"eliot\"}").hint("{foo:1}")); } - catch ( ... ){ + catch ( ... ) { asserted = true; } assert( asserted ); @@ -158,7 +160,8 @@ int main( int argc, const char **argv ) { assert( conn.validate( ns ) ); } - { // timestamp test + { + // timestamp test const char * tsns = "test.tstest1"; conn.dropCollection( tsns ); @@ -190,32 +193,33 @@ int main( int argc, const char **argv ) { ( oldTime == found["ts"].timestampTime() && oldInc < found["ts"].timestampInc() ) ); } - - { // check that killcursors doesn't affect last error + + { + // check that killcursors doesn't affect last error assert( conn.getLastError().empty() ); - + BufBuilder b; b.appendNum( (int)0 ); // reserved b.appendNum( (int)-1 ); // invalid # of cursors triggers exception b.appendNum( (int)-1 ); // bogus cursor id - + Message m; m.setData( dbKillCursors, b.buf(), b.len() ); - + // say() is protected in DBClientConnection, so get superclass static_cast< DBConnector* >( &conn )->say( m ); - + assert( conn.getLastError().empty() ); } { list<string> l = conn.getDatabaseNames(); - for ( list<string>::iterator i = l.begin(); i != l.end(); i++ ){ + for ( list<string>::iterator i = l.begin(); i != l.end(); i++ ) { cout << "db name : " << *i << endl; } l = conn.getCollectionNames( "test" ); - for ( list<string>::iterator i = l.begin(); i != l.end(); i++ ){ + for ( list<string>::iterator i = l.begin(); i != l.end(); i++ ) { cout << "coll name : " << *i << endl; } } diff --git a/client/examples/first.cpp b/client/examples/first.cpp index f3b654fe686..ab5efb325f5 100644 --- a/client/examples/first.cpp +++ b/client/examples/first.cpp @@ -40,7 +40,7 @@ int main( int argc, const char **argv ) { throw -12; port = argv[ 2 ]; } - + mongo::DBClientConnection conn; string errmsg; if ( ! conn.connect( string( "127.0.0.1:" ) + port , errmsg ) ) { @@ -48,14 +48,15 @@ int main( int argc, const char **argv ) { throw -11; } - { // clean up old data from any previous tests + { + // clean up old data from any previous tests mongo::BSONObjBuilder query; conn.remove( "test.people" , query.obj() ); } insert( conn , "eliot" , 15 ); insert( conn , "sara" , 23 ); - + { mongo::BSONObjBuilder query; auto_ptr<mongo::DBClientCursor> cursor = conn.query( "test.people" , query.obj() ); @@ -66,14 +67,14 @@ int main( int argc, const char **argv ) { } } - + { mongo::BSONObjBuilder query; query.append( "name" , "eliot" ); mongo::BSONObj res = conn.findOne( "test.people" , query.obj() ); cout << res.isEmpty() << "\t" << res.jsonString() << endl; } - + { mongo::BSONObjBuilder query; query.append( "name" , "asd" ); diff --git a/client/examples/httpClientTest.cpp b/client/examples/httpClientTest.cpp index 89d5bec603a..4fa5fd8069f 100644 --- a/client/examples/httpClientTest.cpp +++ b/client/examples/httpClientTest.cpp @@ -23,7 +23,7 @@ using namespace mongo; int main( int argc, const char **argv ) { - + int port = 27017; if ( argc != 1 ) { if ( argc != 3 ) @@ -31,11 +31,11 @@ int main( int argc, const char **argv ) { port = atoi( argv[ 2 ] ); } port += 1000; - + stringstream ss; ss << "http://localhost:" << port << "/"; string url = ss.str(); - + cout << "[" << url << "]" << endl; HttpClient c; diff --git a/client/examples/rs.cpp b/client/examples/rs.cpp index 39cdac1cc5f..7813ec633ac 100644 --- a/client/examples/rs.cpp +++ b/client/examples/rs.cpp @@ -25,16 +25,16 @@ using namespace mongo; using namespace std; -int main( int argc , const char ** argv ){ +int main( int argc , const char ** argv ) { string errmsg; - ConnectionString cs = ConnectionString::parse( "foo/127.0.0.1" , errmsg ); - if ( ! cs.isValid() ){ + ConnectionString cs = ConnectionString::parse( "foo/127.0.0.1" , errmsg ); + if ( ! cs.isValid() ) { cout << "error parsing url: " << errmsg << endl; return 1; } - + DBClientReplicaSet * conn = (DBClientReplicaSet*)cs.connect( errmsg ); - if ( ! conn ){ + if ( ! conn ) { cout << "error connecting: " << errmsg << endl; return 2; } @@ -42,17 +42,17 @@ int main( int argc , const char ** argv ){ string collName = "test.rs1"; conn->dropCollection( collName ); - while ( true ){ + while ( true ) { try { conn->update( collName , BSONObj() , BSON( "$inc" << BSON( "x" << 1 ) ) , true ); cout << conn->findOne( collName , BSONObj() ) << endl; cout << "\t A" << conn->slaveConn().findOne( collName , BSONObj() , 0 , QueryOption_SlaveOk ) << endl; cout << "\t B " << conn->findOne( collName , BSONObj() , 0 , QueryOption_SlaveOk ) << endl; } - catch ( std::exception& e ){ + catch ( std::exception& e ) { cout << "ERROR: " << e.what() << endl; } sleepsecs( 1 ); } - + } diff --git a/client/examples/second.cpp b/client/examples/second.cpp index 68eafaa91eb..6cc2111580f 100644 --- a/client/examples/second.cpp +++ b/client/examples/second.cpp @@ -23,7 +23,7 @@ using namespace std; using namespace mongo; int main( int argc, const char **argv ) { - + const char *port = "27017"; if ( argc != 1 ) { if ( argc != 3 ) diff --git a/client/examples/tail.cpp b/client/examples/tail.cpp index 3738b4f1840..90e62d279c1 100644 --- a/client/examples/tail.cpp +++ b/client/examples/tail.cpp @@ -23,24 +23,24 @@ using namespace mongo; void tail(DBClientBase& conn, const char *ns) { - BSONElement lastId = minKey.firstElement(); - Query query = Query(); - - auto_ptr<DBClientCursor> c = - conn.query(ns, query, 0, 0, 0, QueryOption_CursorTailable); - - while( 1 ) { - if( !c->more() ) { - if( c->isDead() ) { - break; // we need to requery - } - - // all data (so far) exhausted, wait for more - sleepsecs(1); - continue; - } - BSONObj o = c->next(); - lastId = o["_id"]; - cout << o.toString() << endl; - } + BSONElement lastId = minKey.firstElement(); + Query query = Query(); + + auto_ptr<DBClientCursor> c = + conn.query(ns, query, 0, 0, 0, QueryOption_CursorTailable); + + while( 1 ) { + if( !c->more() ) { + if( c->isDead() ) { + break; // we need to requery + } + + // all data (so far) exhausted, wait for more + sleepsecs(1); + continue; + } + BSONObj o = c->next(); + lastId = o["_id"]; + cout << o.toString() << endl; + } } diff --git a/client/examples/tutorial.cpp b/client/examples/tutorial.cpp index 28e1b273370..3cdf3593cd8 100644 --- a/client/examples/tutorial.cpp +++ b/client/examples/tutorial.cpp @@ -23,45 +23,45 @@ using namespace mongo; void printIfAge(DBClientConnection& c, int age) { - auto_ptr<DBClientCursor> cursor = c.query("tutorial.persons", QUERY( "age" << age ).sort("name") ); - while( cursor->more() ) { - BSONObj p = cursor->next(); - cout << p.getStringField("name") << endl; - } + auto_ptr<DBClientCursor> cursor = c.query("tutorial.persons", QUERY( "age" << age ).sort("name") ); + while( cursor->more() ) { + BSONObj p = cursor->next(); + cout << p.getStringField("name") << endl; + } } void run() { - DBClientConnection c; - c.connect("localhost"); //"192.168.58.1"); - cout << "connected ok" << endl; - BSONObj p = BSON( "name" << "Joe" << "age" << 33 ); - c.insert("tutorial.persons", p); - p = BSON( "name" << "Jane" << "age" << 40 ); - c.insert("tutorial.persons", p); - p = BSON( "name" << "Abe" << "age" << 33 ); - c.insert("tutorial.persons", p); - p = BSON( "name" << "Samantha" << "age" << 21 << "city" << "Los Angeles" << "state" << "CA" ); - c.insert("tutorial.persons", p); + DBClientConnection c; + c.connect("localhost"); //"192.168.58.1"); + cout << "connected ok" << endl; + BSONObj p = BSON( "name" << "Joe" << "age" << 33 ); + c.insert("tutorial.persons", p); + p = BSON( "name" << "Jane" << "age" << 40 ); + c.insert("tutorial.persons", p); + p = BSON( "name" << "Abe" << "age" << 33 ); + c.insert("tutorial.persons", p); + p = BSON( "name" << "Samantha" << "age" << 21 << "city" << "Los Angeles" << "state" << "CA" ); + c.insert("tutorial.persons", p); - c.ensureIndex("tutorial.persons", fromjson("{age:1}")); + c.ensureIndex("tutorial.persons", fromjson("{age:1}")); - cout << "count:" << c.count("tutorial.persons") << endl; + cout << "count:" << c.count("tutorial.persons") << endl; - auto_ptr<DBClientCursor> cursor = c.query("tutorial.persons", BSONObj()); - while( cursor->more() ) { - cout << cursor->next().toString() << endl; - } + auto_ptr<DBClientCursor> cursor = c.query("tutorial.persons", BSONObj()); + while( cursor->more() ) { + cout << cursor->next().toString() << endl; + } - cout << "\nprintifage:\n"; - printIfAge(c, 33); + cout << "\nprintifage:\n"; + printIfAge(c, 33); } -int main() { - try { - run(); - } - catch( DBException &e ) { - cout << "caught " << e.what() << endl; - } - return 0; +int main() { + try { + run(); + } + catch( DBException &e ) { + cout << "caught " << e.what() << endl; + } + return 0; } diff --git a/client/examples/whereExample.cpp b/client/examples/whereExample.cpp index 69241b5f2d4..ce4174b1c71 100644 --- a/client/examples/whereExample.cpp +++ b/client/examples/whereExample.cpp @@ -23,7 +23,7 @@ using namespace std; using namespace mongo; int main( int argc, const char **argv ) { - + const char *port = "27017"; if ( argc != 1 ) { if ( argc != 3 ) @@ -36,7 +36,7 @@ int main( int argc, const char **argv ) { if ( ! conn.connect( string( "127.0.0.1:" ) + port , errmsg ) ) { cout << "couldn't connect : " << errmsg << endl; throw -11; - } + } const char * ns = "test.where"; @@ -44,9 +44,9 @@ int main( int argc, const char **argv ) { conn.insert( ns , BSON( "name" << "eliot" << "num" << 17 ) ); conn.insert( ns , BSON( "name" << "sara" << "num" << 24 ) ); - + auto_ptr<DBClientCursor> cursor = conn.query( ns , BSONObj() ); - + while ( cursor->more() ) { BSONObj obj = cursor->next(); cout << "\t" << obj.jsonString() << endl; diff --git a/client/gridfs.cpp b/client/gridfs.cpp index d740c765d7f..233724ae8e5 100644 --- a/client/gridfs.cpp +++ b/client/gridfs.cpp @@ -34,11 +34,11 @@ namespace mongo { const unsigned DEFAULT_CHUNK_SIZE = 256 * 1024; - GridFSChunk::GridFSChunk( BSONObj o ){ + GridFSChunk::GridFSChunk( BSONObj o ) { _data = o; } - GridFSChunk::GridFSChunk( BSONObj fileObject , int chunkNumber , const char * data , int len ){ + GridFSChunk::GridFSChunk( BSONObj fileObject , int chunkNumber , const char * data , int len ) { BSONObjBuilder b; b.appendAs( fileObject["_id"] , "files_id" ); b.append( "n" , chunkNumber ); @@ -47,7 +47,7 @@ namespace mongo { } - GridFS::GridFS( DBClientBase& client , const string& dbName , const string& prefix ) : _client( client ) , _dbName( dbName ) , _prefix( prefix ){ + GridFS::GridFS( DBClientBase& client , const string& dbName , const string& prefix ) : _client( client ) , _dbName( dbName ) , _prefix( prefix ) { _filesNS = dbName + "." + prefix + ".files"; _chunksNS = dbName + "." + prefix + ".chunks"; _chunkSize = DEFAULT_CHUNK_SIZE; @@ -56,7 +56,7 @@ namespace mongo { client.ensureIndex( _chunksNS , BSON( "files_id" << 1 << "n" << 1 ) ); } - GridFS::~GridFS(){ + GridFS::~GridFS() { } @@ -65,7 +65,7 @@ namespace mongo { _chunkSize = size; } - BSONObj GridFS::storeFile( const char* data , size_t length , const string& remoteName , const string& contentType){ + BSONObj GridFS::storeFile( const char* data , size_t length , const string& remoteName , const string& contentType) { char const * const end = data + length; OID id; @@ -73,7 +73,7 @@ namespace mongo { BSONObj idObj = BSON("_id" << id); int chunkNumber = 0; - while (data < end){ + while (data < end) { int chunkLen = MIN(_chunkSize, (unsigned)(end-data)); GridFSChunk c(idObj, chunkNumber, data, chunkLen); _client.insert( _chunksNS.c_str() , c._data ); @@ -86,7 +86,7 @@ namespace mongo { } - BSONObj GridFS::storeFile( const string& fileName , const string& remoteName , const string& contentType){ + BSONObj GridFS::storeFile( const string& fileName , const string& remoteName , const string& contentType) { uassert( 10012 , "file doesn't exist" , fileName == "-" || boost::filesystem::exists( fileName ) ); FILE* fd; @@ -102,12 +102,12 @@ namespace mongo { int chunkNumber = 0; gridfs_offset length = 0; - while (!feof(fd)){ + while (!feof(fd)) { //boost::scoped_array<char>buf (new char[_chunkSize+1]); char * buf = new char[_chunkSize+1]; char* bufPos = buf;//.get(); unsigned int chunkLen = 0; // how much in the chunk now - while(chunkLen != _chunkSize && !feof(fd)){ + while(chunkLen != _chunkSize && !feof(fd)) { int readLen = fread(bufPos, 1, _chunkSize - chunkLen, fd); chunkLen += readLen; bufPos += readLen; @@ -125,11 +125,11 @@ namespace mongo { if (fd != stdin) fclose( fd ); - + return insertFile((remoteName.empty() ? fileName : remoteName), id, length, contentType); } - BSONObj GridFS::insertFile(const string& name, const OID& id, gridfs_offset length, const string& contentType){ + BSONObj GridFS::insertFile(const string& name, const OID& id, gridfs_offset length, const string& contentType) { BSONObj res; if ( ! _client.runCommand( _dbName.c_str() , BSON( "filemd5" << id << "root" << _prefix ) , res ) ) @@ -143,9 +143,10 @@ namespace mongo { << "md5" << res["md5"] ; - if (length < 1024*1024*1024){ // 2^30 + if (length < 1024*1024*1024) { // 2^30 file << "length" << (int) length; - }else{ + } + else { file << "length" << (long long) length; } @@ -158,9 +159,9 @@ namespace mongo { return ret; } - void GridFS::removeFile( const string& fileName ){ + void GridFS::removeFile( const string& fileName ) { auto_ptr<DBClientCursor> files = _client.query( _filesNS , BSON( "filename" << fileName ) ); - while (files->more()){ + while (files->more()) { BSONObj file = files->next(); BSONElement id = file["_id"]; _client.remove( _filesNS.c_str() , BSON( "_id" << id ) ); @@ -168,38 +169,38 @@ namespace mongo { } } - GridFile::GridFile( GridFS * grid , BSONObj obj ){ + GridFile::GridFile( GridFS * grid , BSONObj obj ) { _grid = grid; _obj = obj; } - GridFile GridFS::findFile( const string& fileName ){ + GridFile GridFS::findFile( const string& fileName ) { return findFile( BSON( "filename" << fileName ) ); }; - GridFile GridFS::findFile( BSONObj query ){ + GridFile GridFS::findFile( BSONObj query ) { query = BSON("query" << query << "orderby" << BSON("uploadDate" << -1)); return GridFile( this , _client.findOne( _filesNS.c_str() , query ) ); } - auto_ptr<DBClientCursor> GridFS::list(){ + auto_ptr<DBClientCursor> GridFS::list() { return _client.query( _filesNS.c_str() , BSONObj() ); } - auto_ptr<DBClientCursor> GridFS::list( BSONObj o ){ + auto_ptr<DBClientCursor> GridFS::list( BSONObj o ) { return _client.query( _filesNS.c_str() , o ); } - BSONObj GridFile::getMetadata(){ + BSONObj GridFile::getMetadata() { BSONElement meta_element = _obj["metadata"]; - if( meta_element.eoo() ){ + if( meta_element.eoo() ) { return BSONObj(); } return meta_element.embeddedObject(); } - GridFSChunk GridFile::getChunk( int n ){ + GridFSChunk GridFile::getChunk( int n ) { _exists(); BSONObjBuilder b; b.appendAs( _obj["_id"] , "files_id" ); @@ -210,12 +211,12 @@ namespace mongo { return GridFSChunk(o); } - gridfs_offset GridFile::write( ostream & out ){ + gridfs_offset GridFile::write( ostream & out ) { _exists(); const int num = getNumChunks(); - for ( int i=0; i<num; i++ ){ + for ( int i=0; i<num; i++ ) { GridFSChunk c = getChunk( i ); int len; @@ -226,17 +227,18 @@ namespace mongo { return getContentLength(); } - gridfs_offset GridFile::write( const string& where ){ - if (where == "-"){ + gridfs_offset GridFile::write( const string& where ) { + if (where == "-") { return write( cout ); - } else { + } + else { ofstream out(where.c_str() , ios::out | ios::binary ); uassert(13325, "couldn't open file: " + where, out.is_open() ); return write( out ); } } - void GridFile::_exists(){ + void GridFile::_exists() { uassert( 10015 , "doesn't exists" , exists() ); } diff --git a/client/gridfs.h b/client/gridfs.h index b58cb76ab94..b52cf75117a 100644 --- a/client/gridfs.h +++ b/client/gridfs.h @@ -32,13 +32,13 @@ namespace mongo { GridFSChunk( BSONObj data ); GridFSChunk( BSONObj fileId , int chunkNumber , const char * data , int len ); - int len(){ + int len() { int len; _data["data"].binDataClean( len ); return len; } - const char * data( int & len ){ + const char * data( int & len ) { return _data["data"].binDataClean( len ); } @@ -140,41 +140,41 @@ namespace mongo { * @return whether or not this file exists * findFile will always return a GriFile, so need to check this */ - bool exists(){ + bool exists() { return ! _obj.isEmpty(); } - string getFilename(){ + string getFilename() { return _obj["filename"].str(); } - int getChunkSize(){ + int getChunkSize() { return (int)(_obj["chunkSize"].number()); } - gridfs_offset getContentLength(){ + gridfs_offset getContentLength() { return (gridfs_offset)(_obj["length"].number()); } - string getContentType(){ + string getContentType() { return _obj["contentType"].valuestr(); } - Date_t getUploadDate(){ + Date_t getUploadDate() { return _obj["uploadDate"].date(); } - string getMD5(){ + string getMD5() { return _obj["md5"].str(); } - BSONElement getFileField( const string& name ){ + BSONElement getFileField( const string& name ) { return _obj[name]; } BSONObj getMetadata(); - int getNumChunks(){ + int getNumChunks() { return (int) ceil( (double)getContentLength() / (double)getChunkSize() ); } diff --git a/client/model.cpp b/client/model.cpp index 7861b915696..bd10a3c5528 100644 --- a/client/model.cpp +++ b/client/model.cpp @@ -21,23 +21,23 @@ namespace mongo { - bool Model::load(BSONObj& query){ + bool Model::load(BSONObj& query) { ScopedDbConnection conn( modelServer() ); BSONObj b = conn->findOne(getNS(), query); conn.done(); - + if ( b.isEmpty() ) return false; - + unserialize(b); _id = b["_id"].wrap().getOwned(); return true; } - void Model::remove( bool safe ){ + void Model::remove( bool safe ) { uassert( 10016 , "_id isn't set - needed for remove()" , _id["_id"].type() ); - + ScopedDbConnection conn( modelServer() ); conn->remove( getNS() , _id ); @@ -46,34 +46,34 @@ namespace mongo { errmsg = conn->getLastError(); conn.done(); - + if ( safe && errmsg.size() ) throw UserException( 9002 , (string)"error on Model::remove: " + errmsg ); } - void Model::save( bool safe ){ + void Model::save( bool safe ) { ScopedDbConnection conn( modelServer() ); BSONObjBuilder b; serialize( b ); - + BSONElement myId; { BSONObjIterator i = b.iterator(); - while ( i.more() ){ + while ( i.more() ) { BSONElement e = i.next(); - if ( strcmp( e.fieldName() , "_id" ) == 0 ){ + if ( strcmp( e.fieldName() , "_id" ) == 0 ) { myId = e; break; } } } - if ( myId.type() ){ - if ( _id.isEmpty() ){ + if ( myId.type() ) { + if ( _id.isEmpty() ) { _id = myId.wrap(); } - else if ( myId.woCompare( _id.firstElement() ) ){ + else if ( myId.woCompare( _id.firstElement() ) ) { stringstream ss; ss << "_id from serialize and stored differ: "; ss << '[' << myId << "] != "; @@ -82,11 +82,11 @@ namespace mongo { } } - if ( _id.isEmpty() ){ + if ( _id.isEmpty() ) { OID oid; oid.init(); b.appendOID( "_id" , &oid ); - + BSONObj o = b.obj(); conn->insert( getNS() , o ); _id = o["_id"].wrap().getOwned(); @@ -94,25 +94,25 @@ namespace mongo { log(4) << "inserted new model " << getNS() << " " << o << endl; } else { - if ( myId.eoo() ){ + if ( myId.eoo() ) { myId = _id["_id"]; b.append( myId ); } - + assert( ! myId.eoo() ); BSONObjBuilder qb; qb.append( myId ); - + BSONObj q = qb.obj(); BSONObj o = b.obj(); log(4) << "updated model" << getNS() << " " << q << " " << o << endl; conn->update( getNS() , q , o , true ); - + } - + string errmsg = ""; if ( safe ) errmsg = conn->getLastError(); @@ -123,13 +123,13 @@ namespace mongo { throw UserException( 9003 , (string)"error on Model::save: " + errmsg ); } - BSONObj Model::toObject(){ + BSONObj Model::toObject() { BSONObjBuilder b; serialize( b ); return b.obj(); } - void Model::append( const char * name , BSONObjBuilder& b ){ + void Model::append( const char * name , BSONObjBuilder& b ) { BSONObjBuilder bb( b.subobjStart( name ) ); serialize( bb ); bb.done(); diff --git a/client/model.h b/client/model.h index 108efc06ba7..7dd31434f49 100644 --- a/client/model.h +++ b/client/model.h @@ -43,16 +43,16 @@ namespace mongo { virtual void unserialize(const BSONObj& from) = 0; virtual BSONObj toObject(); virtual void append( const char * name , BSONObjBuilder& b ); - + virtual string modelServer() = 0; - - /** Load a single object. + + /** Load a single object. @return true if successful. */ virtual bool load(BSONObj& query); virtual void save( bool safe=false ); virtual void remove( bool safe=false ); - + protected: BSONObj _id; }; diff --git a/client/mongo_client_lib.cpp b/client/mongo_client_lib.cpp index 626590dd1b6..69f801aac1d 100755..100644 --- a/client/mongo_client_lib.cpp +++ b/client/mongo_client_lib.cpp @@ -2,9 +2,9 @@ MongoDB C++ Driver - Normally one includes dbclient.h, and links against libmongoclient.a, when connecting to MongoDB - from C++. However, if you have a situation where the pre-built library does not work, you can use - this file instead to build all the necessary symbols. To do so, include client_lib.cpp in your + Normally one includes dbclient.h, and links against libmongoclient.a, when connecting to MongoDB + from C++. However, if you have a situation where the pre-built library does not work, you can use + this file instead to build all the necessary symbols. To do so, include client_lib.cpp in your project. For example, to build and run simple_client_demo.cpp with GCC and run it: @@ -30,7 +30,7 @@ #include "../util/md5main.cpp" -#define MONGO_EXPOSE_MACROS +#define MONGO_EXPOSE_MACROS #include "../pch.h" #include "../util/assert_util.cpp" @@ -60,7 +60,7 @@ #include "../db/nonce.cpp" #include "../db/commands.cpp" -extern "C" { +extern "C" { #include "../util/md5.c" } diff --git a/client/parallel.cpp b/client/parallel.cpp index 7649821ddc4..14f0fa450d2 100644 --- a/client/parallel.cpp +++ b/client/parallel.cpp @@ -25,10 +25,10 @@ #include "../s/shard.h" namespace mongo { - + // -------- ClusteredCursor ----------- - - ClusteredCursor::ClusteredCursor( QueryMessage& q ){ + + ClusteredCursor::ClusteredCursor( QueryMessage& q ) { _ns = q.ns; _query = q.query.copy(); _options = q.queryOptions; @@ -41,7 +41,7 @@ namespace mongo { _didInit = false; } - ClusteredCursor::ClusteredCursor( const string& ns , const BSONObj& q , int options , const BSONObj& fields ){ + ClusteredCursor::ClusteredCursor( const string& ns , const BSONObj& q , int options , const BSONObj& fields ) { _ns = ns; _query = q.getOwned(); _options = options; @@ -52,50 +52,50 @@ namespace mongo { _didInit = false; } - ClusteredCursor::~ClusteredCursor(){ + ClusteredCursor::~ClusteredCursor() { _done = true; // just in case } - void ClusteredCursor::init(){ + void ClusteredCursor::init() { if ( _didInit ) return; _didInit = true; _init(); } - - auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra , int skipLeft ){ + + auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra , int skipLeft ) { uassert( 10017 , "cursor already done" , ! _done ); assert( _didInit ); - + BSONObj q = _query; - if ( ! extra.isEmpty() ){ + if ( ! extra.isEmpty() ) { q = concatQuery( q , extra ); } ShardConnection conn( server , _ns ); - - if ( conn.setVersion() ){ + + if ( conn.setVersion() ) { conn.done(); throw StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true ); } - if ( logLevel >= 5 ){ - log(5) << "ClusteredCursor::query (" << type() << ") server:" << server - << " ns:" << _ns << " query:" << q << " num:" << num + if ( logLevel >= 5 ) { + log(5) << "ClusteredCursor::query (" << type() << ") server:" << server + << " ns:" << _ns << " query:" << q << " num:" << num << " _fields:" << _fields << " options: " << _options << endl; } - - auto_ptr<DBClientCursor> cursor = + + auto_ptr<DBClientCursor> cursor = conn->query( _ns , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options , _batchSize == 0 ? 0 : _batchSize + skipLeft ); assert( cursor.get() ); - - if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) ){ + + if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) ) { conn.done(); throw StaleConfigException( _ns , "ClusteredCursor::query" ); } - - if ( cursor->hasResultFlag( ResultFlag_ErrSet ) ){ + + if ( cursor->hasResultFlag( ResultFlag_ErrSet ) ) { conn.done(); BSONObj o = cursor->next(); throw UserException( o["code"].numberInt() , o["$err"].String() ); @@ -108,9 +108,9 @@ namespace mongo { return cursor; } - BSONObj ClusteredCursor::explain( const string& server , BSONObj extra ){ + BSONObj ClusteredCursor::explain( const string& server , BSONObj extra ) { BSONObj q = _query; - if ( ! extra.isEmpty() ){ + if ( ! extra.isEmpty() ) { q = concatQuery( q , extra ); } @@ -124,26 +124,26 @@ namespace mongo { return o; } - BSONObj ClusteredCursor::concatQuery( const BSONObj& query , const BSONObj& extraFilter ){ + BSONObj ClusteredCursor::concatQuery( const BSONObj& query , const BSONObj& extraFilter ) { if ( ! query.hasField( "query" ) ) return _concatFilter( query , extraFilter ); BSONObjBuilder b; BSONObjIterator i( query ); - while ( i.more() ){ + while ( i.more() ) { BSONElement e = i.next(); - if ( strcmp( e.fieldName() , "query" ) ){ + if ( strcmp( e.fieldName() , "query" ) ) { b.append( e ); continue; } - + b.append( "query" , _concatFilter( e.embeddedObjectUserCheck() , extraFilter ) ); } return b.obj(); } - - BSONObj ClusteredCursor::_concatFilter( const BSONObj& filter , const BSONObj& extra ){ + + BSONObj ClusteredCursor::_concatFilter( const BSONObj& filter , const BSONObj& extra ) { BSONObjBuilder b; b.appendElements( filter ); b.appendElements( extra ); @@ -151,7 +151,7 @@ namespace mongo { // TODO: should do some simplification here if possibl ideally } - BSONObj ClusteredCursor::explain(){ + BSONObj ClusteredCursor::explain() { // Note: by default we filter out allPlans and oldPlan in the shell's // explain() function. If you add any recursive structures, make sure to // edit the JS to make sure everything gets filtered. @@ -167,25 +167,25 @@ namespace mongo { map<string,list<BSONObj> > out; { _explain( out ); - + BSONObjBuilder x( b.subobjStart( "shards" ) ); - for ( map<string,list<BSONObj> >::iterator i=out.begin(); i!=out.end(); ++i ){ + for ( map<string,list<BSONObj> >::iterator i=out.begin(); i!=out.end(); ++i ) { string shard = i->first; list<BSONObj> l = i->second; BSONArrayBuilder y( x.subarrayStart( shard ) ); - for ( list<BSONObj>::iterator j=l.begin(); j!=l.end(); ++j ){ + for ( list<BSONObj>::iterator j=l.begin(); j!=l.end(); ++j ) { BSONObj temp = *j; y.append( temp ); - + BSONObjIterator k( temp ); - while ( k.more() ){ + while ( k.more() ) { BSONElement z = k.next(); if ( z.fieldName()[0] != 'n' ) continue; long long& c = counters[z.fieldName()]; c += z.numberLong(); } - + millis += temp["millis"].numberLong(); numExplains++; } @@ -204,37 +204,37 @@ namespace mongo { return b.obj(); } - + // -------- FilteringClientCursor ----------- FilteringClientCursor::FilteringClientCursor( const BSONObj filter ) - : _matcher( filter ) , _done( true ){ + : _matcher( filter ) , _done( true ) { } FilteringClientCursor::FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter ) - : _matcher( filter ) , _cursor( cursor ) , _done( cursor.get() == 0 ){ + : _matcher( filter ) , _cursor( cursor ) , _done( cursor.get() == 0 ) { } - - FilteringClientCursor::~FilteringClientCursor(){ + + FilteringClientCursor::~FilteringClientCursor() { } - - void FilteringClientCursor::reset( auto_ptr<DBClientCursor> cursor ){ + + void FilteringClientCursor::reset( auto_ptr<DBClientCursor> cursor ) { _cursor = cursor; _next = BSONObj(); _done = _cursor.get() == 0; } - bool FilteringClientCursor::more(){ + bool FilteringClientCursor::more() { if ( ! _next.isEmpty() ) return true; - + if ( _done ) return false; - + _advance(); return ! _next.isEmpty(); } - - BSONObj FilteringClientCursor::next(){ + + BSONObj FilteringClientCursor::next() { assert( ! _next.isEmpty() ); assert( ! _done ); @@ -244,20 +244,20 @@ namespace mongo { return ret; } - BSONObj FilteringClientCursor::peek(){ + BSONObj FilteringClientCursor::peek() { if ( _next.isEmpty() ) _advance(); return _next; } - - void FilteringClientCursor::_advance(){ + + void FilteringClientCursor::_advance() { assert( _next.isEmpty() ); if ( ! _cursor.get() || _done ) return; - - while ( _cursor->more() ){ + + while ( _cursor->more() ) { _next = _cursor->next(); - if ( _matcher.matches( _next ) ){ + if ( _matcher.matches( _next ) ) { if ( ! _cursor->moreInCurrentBatch() ) _next = _next.getOwned(); return; @@ -266,53 +266,53 @@ namespace mongo { } _done = true; } - + // -------- SerialServerClusteredCursor ----------- - - SerialServerClusteredCursor::SerialServerClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , int sortOrder) : ClusteredCursor( q ){ + + SerialServerClusteredCursor::SerialServerClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , int sortOrder) : ClusteredCursor( q ) { for ( set<ServerAndQuery>::const_iterator i = servers.begin(); i!=servers.end(); i++ ) _servers.push_back( *i ); - + if ( sortOrder > 0 ) sort( _servers.begin() , _servers.end() ); else if ( sortOrder < 0 ) sort( _servers.rbegin() , _servers.rend() ); - + _serverIndex = 0; _needToSkip = q.ntoskip; } - - bool SerialServerClusteredCursor::more(){ - + + bool SerialServerClusteredCursor::more() { + // TODO: optimize this by sending on first query and then back counting // tricky in case where 1st server doesn't have any after // need it to send n skipped - while ( _needToSkip > 0 && _current.more() ){ + while ( _needToSkip > 0 && _current.more() ) { _current.next(); _needToSkip--; } - + if ( _current.more() ) return true; - - if ( _serverIndex >= _servers.size() ){ + + if ( _serverIndex >= _servers.size() ) { return false; } - + ServerAndQuery& sq = _servers[_serverIndex++]; _current.reset( query( sq._server , 0 , sq._extra ) ); return more(); } - - BSONObj SerialServerClusteredCursor::next(){ + + BSONObj SerialServerClusteredCursor::next() { uassert( 10018 , "no more items" , more() ); return _current.next(); } - void SerialServerClusteredCursor::_explain( map< string,list<BSONObj> >& out ){ - for ( unsigned i=0; i<_servers.size(); i++ ){ + void SerialServerClusteredCursor::_explain( map< string,list<BSONObj> >& out ) { + for ( unsigned i=0; i<_servers.size(); i++ ) { ServerAndQuery& sq = _servers[i]; list<BSONObj> & l = out[sq._server]; l.push_back( explain( sq._server , sq._extra ) ); @@ -320,29 +320,29 @@ namespace mongo { } // -------- ParallelSortClusteredCursor ----------- - - ParallelSortClusteredCursor::ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , - const BSONObj& sortKey ) - : ClusteredCursor( q ) , _servers( servers ){ + + ParallelSortClusteredCursor::ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , + const BSONObj& sortKey ) + : ClusteredCursor( q ) , _servers( servers ) { _sortKey = sortKey.getOwned(); _needToSkip = q.ntoskip; _finishCons(); } - ParallelSortClusteredCursor::ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns , - const Query& q , - int options , const BSONObj& fields ) - : ClusteredCursor( ns , q.obj , options , fields ) , _servers( servers ){ + ParallelSortClusteredCursor::ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns , + const Query& q , + int options , const BSONObj& fields ) + : ClusteredCursor( ns , q.obj , options , fields ) , _servers( servers ) { _sortKey = q.getSort().copy(); _needToSkip = 0; _finishCons(); } - void ParallelSortClusteredCursor::_finishCons(){ + void ParallelSortClusteredCursor::_finishCons() { _numServers = _servers.size(); _cursors = 0; - if ( ! _sortKey.isEmpty() && ! _fields.isEmpty() ){ + if ( ! _sortKey.isEmpty() && ! _fields.isEmpty() ) { // we need to make sure the sort key is in the project set<string> sortKeyFields; @@ -352,7 +352,7 @@ namespace mongo { bool isNegative = false; { BSONObjIterator i( _fields ); - while ( i.more() ){ + while ( i.more() ) { BSONElement e = i.next(); b.append( e ); @@ -368,93 +368,94 @@ namespace mongo { if ( ! e.trueValue() ) { uassert( 13431 , "have to have sort key in projection and removing it" , !found && begin == end ); - } else if (!e.isABSONObj()) { + } + else if (!e.isABSONObj()) { isNegative = true; } } - } - - if (isNegative){ - for (set<string>::const_iterator it(sortKeyFields.begin()), end(sortKeyFields.end()); it != end; ++it){ + } + + if (isNegative) { + for (set<string>::const_iterator it(sortKeyFields.begin()), end(sortKeyFields.end()); it != end; ++it) { b.append(*it, 1); } } - + _fields = b.obj(); } } - - void ParallelSortClusteredCursor::_init(){ + + void ParallelSortClusteredCursor::_init() { assert( ! _cursors ); _cursors = new FilteringClientCursor[_numServers]; - + // TODO: parellize int num = 0; - for ( set<ServerAndQuery>::iterator i = _servers.begin(); i!=_servers.end(); ++i ){ + for ( set<ServerAndQuery>::iterator i = _servers.begin(); i!=_servers.end(); ++i ) { const ServerAndQuery& sq = *i; _cursors[num++].reset( query( sq._server , 0 , sq._extra , _needToSkip ) ); } - + } - - ParallelSortClusteredCursor::~ParallelSortClusteredCursor(){ + + ParallelSortClusteredCursor::~ParallelSortClusteredCursor() { delete [] _cursors; _cursors = 0; } - bool ParallelSortClusteredCursor::more(){ + bool ParallelSortClusteredCursor::more() { - if ( _needToSkip > 0 ){ + if ( _needToSkip > 0 ) { int n = _needToSkip; _needToSkip = 0; - while ( n > 0 && more() ){ + while ( n > 0 && more() ) { BSONObj x = next(); n--; } _needToSkip = n; } - - for ( int i=0; i<_numServers; i++ ){ + + for ( int i=0; i<_numServers; i++ ) { if ( _cursors[i].more() ) return true; } return false; } - - BSONObj ParallelSortClusteredCursor::next(){ + + BSONObj ParallelSortClusteredCursor::next() { BSONObj best = BSONObj(); int bestFrom = -1; - - for ( int i=0; i<_numServers; i++){ + + for ( int i=0; i<_numServers; i++) { if ( ! _cursors[i].more() ) continue; - + BSONObj me = _cursors[i].peek(); - if ( best.isEmpty() ){ + if ( best.isEmpty() ) { best = me; bestFrom = i; continue; } - + int comp = best.woSortOrder( me , _sortKey , true ); if ( comp < 0 ) continue; - + best = me; bestFrom = i; } - + uassert( 10019 , "no more elements" , ! best.isEmpty() ); _cursors[bestFrom].next(); - + return best; } - void ParallelSortClusteredCursor::_explain( map< string,list<BSONObj> >& out ){ - for ( set<ServerAndQuery>::iterator i=_servers.begin(); i!=_servers.end(); ++i ){ + void ParallelSortClusteredCursor::_explain( map< string,list<BSONObj> >& out ) { + for ( set<ServerAndQuery>::iterator i=_servers.begin(); i!=_servers.end(); ++i ) { const ServerAndQuery& sq = *i; list<BSONObj> & l = out[sq._server]; l.push_back( explain( sq._server , sq._extra ) ); @@ -466,39 +467,39 @@ namespace mongo { // ---- Future ----- // ----------------- - Future::CommandResult::CommandResult( const string& server , const string& db , const BSONObj& cmd ){ + Future::CommandResult::CommandResult( const string& server , const string& db , const BSONObj& cmd ) { _server = server; _db = db; _cmd = cmd; _done = false; } - bool Future::CommandResult::join(){ + bool Future::CommandResult::join() { _thr->join(); assert( _done ); return _ok; } - void Future::commandThread(shared_ptr<CommandResult> res){ + void Future::commandThread(shared_ptr<CommandResult> res) { setThreadName( "future" ); - + try { ScopedDbConnection conn( res->_server ); res->_ok = conn->runCommand( res->_db , res->_cmd , res->_res ); conn.done(); } - catch ( std::exception& e ){ + catch ( std::exception& e ) { error() << "Future::commandThread exception: " << e.what() << endl; res->_ok = false; } res->_done = true; } - shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd ){ + shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd ) { shared_ptr<Future::CommandResult> res (new Future::CommandResult( server , db , cmd )); res->_thr.reset( new boost::thread( boost::bind(Future::commandThread, res) ) ); - + return res; } - + } diff --git a/client/parallel.h b/client/parallel.h index b131ac57f3e..e993e7cffa2 100644 --- a/client/parallel.h +++ b/client/parallel.h @@ -33,14 +33,14 @@ namespace mongo { */ class ServerAndQuery { public: - ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) : - _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ){ + ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) : + _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ) { } - bool operator<( const ServerAndQuery& other ) const{ + bool operator<( const ServerAndQuery& other ) const { if ( ! _orderObject.isEmpty() ) return _orderObject.woCompare( other._orderObject ) < 0; - + if ( _server < other._server ) return true; if ( other._server > _server ) @@ -72,28 +72,28 @@ namespace mongo { ClusteredCursor( QueryMessage& q ); ClusteredCursor( const string& ns , const BSONObj& q , int options=0 , const BSONObj& fields=BSONObj() ); virtual ~ClusteredCursor(); - + /** call before using */ void init(); - + virtual bool more() = 0; virtual BSONObj next() = 0; - + static BSONObj concatQuery( const BSONObj& query , const BSONObj& extraFilter ); - + virtual string type() const = 0; virtual BSONObj explain(); protected: - + virtual void _init() = 0; auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() , int skipLeft = 0 ); BSONObj explain( const string& server , BSONObj extraFilter = BSONObj() ); - + static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extraFilter ); - + virtual void _explain( map< string,list<BSONObj> >& out ) = 0; string _ns; @@ -113,19 +113,19 @@ namespace mongo { FilteringClientCursor( const BSONObj filter = BSONObj() ); FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter = BSONObj() ); ~FilteringClientCursor(); - + void reset( auto_ptr<DBClientCursor> cursor ); - + bool more(); BSONObj next(); - + BSONObj peek(); private: void _advance(); - + Matcher _matcher; auto_ptr<DBClientCursor> _cursor; - + BSONObj _next; bool _done; }; @@ -133,22 +133,22 @@ namespace mongo { class Servers { public: - Servers(){ + Servers() { } - - void add( const ServerAndQuery& s ){ + + void add( const ServerAndQuery& s ) { add( s._server , s._extra ); } - - void add( const string& server , const BSONObj& filter ){ + + void add( const string& server , const BSONObj& filter ) { vector<BSONObj>& mine = _filters[server]; mine.push_back( filter.getOwned() ); } - + // TOOO: pick a less horrible name class View { - View( const Servers* s ){ - for ( map<string, vector<BSONObj> >::const_iterator i=s->_filters.begin(); i!=s->_filters.end(); ++i ){ + View( const Servers* s ) { + for ( map<string, vector<BSONObj> >::const_iterator i=s->_filters.begin(); i!=s->_filters.end(); ++i ) { _servers.push_back( i->first ); _filters.push_back( i->second ); } @@ -165,7 +165,7 @@ namespace mongo { vector<BSONObj> getFilter( int n ) const { return _filters[ n ]; } - + private: vector<string> _servers; vector< vector<BSONObj> > _filters; @@ -176,7 +176,7 @@ namespace mongo { View view() const { return View( this ); } - + private: map<string, vector<BSONObj> > _filters; @@ -199,13 +199,13 @@ namespace mongo { protected: virtual void _explain( map< string,list<BSONObj> >& out ); - void _init(){} + void _init() {} vector<ServerAndQuery> _servers; unsigned _serverIndex; - + FilteringClientCursor _current; - + int _needToSkip; }; @@ -213,11 +213,11 @@ namespace mongo { /** * runs a query in parellel across N servers * sots - */ + */ class ParallelSortClusteredCursor : public ClusteredCursor { public: ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , const BSONObj& sortKey ); - ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns , + ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns , const Query& q , int options=0, const BSONObj& fields=BSONObj() ); virtual ~ParallelSortClusteredCursor(); virtual bool more(); @@ -232,7 +232,7 @@ namespace mongo { int _numServers; set<ServerAndQuery> _servers; BSONObj _sortKey; - + FilteringClientCursor * _cursors; int _needToSkip; }; @@ -246,11 +246,11 @@ namespace mongo { public: class CommandResult { public: - + string getServer() const { return _server; } bool isDone() const { return _done; } - + bool ok() const { assert( _done ); return _ok; @@ -266,30 +266,30 @@ namespace mongo { returns ok() */ bool join(); - + private: - + CommandResult( const string& server , const string& db , const BSONObj& cmd ); - + string _server; string _db; BSONObj _cmd; scoped_ptr<boost::thread> _thr; - + BSONObj _res; bool _ok; bool _done; - + friend class Future; }; - + static void commandThread(shared_ptr<CommandResult> res); - + static shared_ptr<CommandResult> spawnCommand( const string& server , const string& db , const BSONObj& cmd ); }; - + } #include "undef_macros.h" diff --git a/client/simple_client_demo.cpp b/client/simple_client_demo.cpp index f0c62fba46a..fa2f4a8ae11 100755..100644 --- a/client/simple_client_demo.cpp +++ b/client/simple_client_demo.cpp @@ -1,5 +1,5 @@ /* simple_client_demo.cpp - + See also : http://www.mongodb.org/pages/viewpage.action?pageId=133415 How to build and run: @@ -20,17 +20,17 @@ using namespace std; using namespace mongo; using namespace bson; -int main() { - cout << "connecting to localhost..." << endl; - DBClientConnection c; - c.connect("localhost"); - cout << "connected ok" << endl; - unsigned long long count = c.count("test.foo"); - cout << "count of exiting documents in collection test.foo : " << count << endl; +int main() { + cout << "connecting to localhost..." << endl; + DBClientConnection c; + c.connect("localhost"); + cout << "connected ok" << endl; + unsigned long long count = c.count("test.foo"); + cout << "count of exiting documents in collection test.foo : " << count << endl; - bo o = BSON( "hello" << "world" ); - c.insert("test.foo", o); + bo o = BSON( "hello" << "world" ); + c.insert("test.foo", o); - return 0; + return 0; } diff --git a/client/syncclusterconnection.cpp b/client/syncclusterconnection.cpp index a16fa8e35eb..4277a1a0bef 100644 --- a/client/syncclusterconnection.cpp +++ b/client/syncclusterconnection.cpp @@ -37,11 +37,11 @@ namespace mongo { for( list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++ ) _connect( i->toString() ); } - + SyncClusterConnection::SyncClusterConnection( string commaSeperated ) : _mutex("SyncClusterConnection") { _address = commaSeperated; string::size_type idx; - while ( ( idx = commaSeperated.find( ',' ) ) != string::npos ){ + while ( ( idx = commaSeperated.find( ',' ) ) != string::npos ) { string h = commaSeperated.substr( 0 , idx ); commaSeperated = commaSeperated.substr( idx + 1 ); _connect( h ); @@ -50,7 +50,7 @@ namespace mongo { uassert( 8004 , "SyncClusterConnection needs 3 servers" , _conns.size() == 3 ); } - SyncClusterConnection::SyncClusterConnection( string a , string b , string c ) : _mutex("SyncClusterConnection") { + SyncClusterConnection::SyncClusterConnection( string a , string b , string c ) : _mutex("SyncClusterConnection") { _address = a + "," + b + "," + c; // connect to all even if not working _connect( a ); @@ -62,30 +62,30 @@ namespace mongo { assert(0); } - SyncClusterConnection::~SyncClusterConnection(){ + SyncClusterConnection::~SyncClusterConnection() { for ( size_t i=0; i<_conns.size(); i++ ) delete _conns[i]; _conns.clear(); } - bool SyncClusterConnection::prepare( string& errmsg ){ + bool SyncClusterConnection::prepare( string& errmsg ) { _lastErrors.clear(); return fsync( errmsg ); } - - bool SyncClusterConnection::fsync( string& errmsg ){ + + bool SyncClusterConnection::fsync( string& errmsg ) { bool ok = true; errmsg = ""; - for ( size_t i=0; i<_conns.size(); i++ ){ + for ( size_t i=0; i<_conns.size(); i++ ) { BSONObj res; try { if ( _conns[i]->simpleCommand( "admin" , 0 , "fsync" ) ) continue; } - catch ( std::exception& e ){ + catch ( std::exception& e ) { errmsg += e.what(); } - catch ( ... ){ + catch ( ... ) { } ok = false; errmsg += _conns[i]->toString() + ":" + res.toString(); @@ -93,21 +93,21 @@ namespace mongo { return ok; } - void SyncClusterConnection::_checkLast(){ + void SyncClusterConnection::_checkLast() { _lastErrors.clear(); vector<string> errors; - for ( size_t i=0; i<_conns.size(); i++ ){ + for ( size_t i=0; i<_conns.size(); i++ ) { BSONObj res; string err; try { if ( ! _conns[i]->runCommand( "admin" , BSON( "getlasterror" << 1 << "fsync" << 1 ) , res ) ) err = "cmd failed: "; } - catch ( std::exception& e ){ + catch ( std::exception& e ) { err += e.what(); } - catch ( ... ){ + catch ( ... ) { err += "unknown failure"; } _lastErrors.push_back( res.getOwned() ); @@ -115,11 +115,11 @@ namespace mongo { } assert( _lastErrors.size() == errors.size() && _lastErrors.size() == _conns.size() ); - + stringstream err; bool ok = true; - - for ( size_t i = 0; i<_conns.size(); i++ ){ + + for ( size_t i = 0; i<_conns.size(); i++ ) { BSONObj res = _lastErrors[i]; if ( res["ok"].trueValue() && res["fsyncFiles"].numberInt() > 0 ) continue; @@ -132,13 +132,13 @@ namespace mongo { throw UserException( 8001 , (string)"SyncClusterConnection write op failed: " + err.str() ); } - BSONObj SyncClusterConnection::getLastErrorDetailed(){ + BSONObj SyncClusterConnection::getLastErrorDetailed() { if ( _lastErrors.size() ) return _lastErrors[0]; return DBClientBase::getLastErrorDetailed(); } - void SyncClusterConnection::_connect( string host ){ + void SyncClusterConnection::_connect( string host ) { log() << "SyncClusterConnection connecting to [" << host << "]" << endl; DBClientConnection * c = new DBClientConnection( true ); string errmsg; @@ -148,31 +148,31 @@ namespace mongo { _conns.push_back( c ); } - bool SyncClusterConnection::callRead( Message& toSend , Message& response ){ + bool SyncClusterConnection::callRead( Message& toSend , Message& response ) { // TODO: need to save state of which one to go back to somehow... return _conns[0]->callRead( toSend , response ); } BSONObj SyncClusterConnection::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) { - - if ( ns.find( ".$cmd" ) != string::npos ){ + + if ( ns.find( ".$cmd" ) != string::npos ) { string cmdName = query.obj.firstElement().fieldName(); int lockType = _lockType( cmdName ); - if ( lockType > 0 ){ // write $cmd + if ( lockType > 0 ) { // write $cmd string errmsg; if ( ! prepare( errmsg ) ) throw UserException( 13104 , (string)"SyncClusterConnection::findOne prepare failed: " + errmsg ); - + vector<BSONObj> all; - for ( size_t i=0; i<_conns.size(); i++ ){ + for ( size_t i=0; i<_conns.size(); i++ ) { all.push_back( _conns[i]->findOne( ns , query , 0 , queryOptions ).getOwned() ); } - + _checkLast(); - - for ( size_t i=0; i<all.size(); i++ ){ + + for ( size_t i=0; i<all.size(); i++ ) { BSONObj temp = all[i]; if ( isOk( temp ) ) continue; @@ -181,7 +181,7 @@ namespace mongo { ss << " " << _conns[i]->toString(); throw UserException( 13105 , ss.str() ); } - + return all[0]; } } @@ -191,9 +191,9 @@ namespace mongo { auto_ptr<DBClientCursor> SyncClusterConnection::query(const string &ns, Query query, int nToReturn, int nToSkip, - const BSONObj *fieldsToReturn, int queryOptions, int batchSize ){ + const BSONObj *fieldsToReturn, int queryOptions, int batchSize ) { _lastErrors.clear(); - if ( ns.find( ".$cmd" ) != string::npos ){ + if ( ns.find( ".$cmd" ) != string::npos ) { string cmdName = query.obj.firstElement().fieldName(); int lockType = _lockType( cmdName ); uassert( 13054 , (string)"write $cmd not supported in SyncClusterConnection::query for:" + cmdName , lockType <= 0 ); @@ -202,7 +202,7 @@ namespace mongo { return _queryOnActive( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions , batchSize ); } - bool SyncClusterConnection::_commandOnActive(const string &dbname, const BSONObj& cmd, BSONObj &info, int options ){ + bool SyncClusterConnection::_commandOnActive(const string &dbname, const BSONObj& cmd, BSONObj &info, int options ) { auto_ptr<DBClientCursor> cursor = _queryOnActive( dbname + ".$cmd" , cmd , 1 , 0 , 0 , options , 0 ); if ( cursor->more() ) info = cursor->next().copy(); @@ -210,151 +210,151 @@ namespace mongo { info = BSONObj(); return isOk( info ); } - + auto_ptr<DBClientCursor> SyncClusterConnection::_queryOnActive(const string &ns, Query query, int nToReturn, int nToSkip, - const BSONObj *fieldsToReturn, int queryOptions, int batchSize ){ - - for ( size_t i=0; i<_conns.size(); i++ ){ + const BSONObj *fieldsToReturn, int queryOptions, int batchSize ) { + + for ( size_t i=0; i<_conns.size(); i++ ) { try { - auto_ptr<DBClientCursor> cursor = + auto_ptr<DBClientCursor> cursor = _conns[i]->query( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions , batchSize ); if ( cursor.get() ) return cursor; log() << "query failed to: " << _conns[i]->toString() << " no data" << endl; } - catch ( ... ){ + catch ( ... ) { log() << "query failed to: " << _conns[i]->toString() << " exception" << endl; } } throw UserException( 8002 , "all servers down!" ); } - - auto_ptr<DBClientCursor> SyncClusterConnection::getMore( const string &ns, long long cursorId, int nToReturn, int options ){ - uassert( 10022 , "SyncClusterConnection::getMore not supported yet" , 0); + + auto_ptr<DBClientCursor> SyncClusterConnection::getMore( const string &ns, long long cursorId, int nToReturn, int options ) { + uassert( 10022 , "SyncClusterConnection::getMore not supported yet" , 0); auto_ptr<DBClientCursor> c; return c; } - - void SyncClusterConnection::insert( const string &ns, BSONObj obj ){ - uassert( 13119 , (string)"SyncClusterConnection::insert obj has to have an _id: " + obj.jsonString() , + void SyncClusterConnection::insert( const string &ns, BSONObj obj ) { + + uassert( 13119 , (string)"SyncClusterConnection::insert obj has to have an _id: " + obj.jsonString() , ns.find( ".system.indexes" ) != string::npos || obj["_id"].type() ); - + string errmsg; if ( ! prepare( errmsg ) ) throw UserException( 8003 , (string)"SyncClusterConnection::insert prepare failed: " + errmsg ); - for ( size_t i=0; i<_conns.size(); i++ ){ + for ( size_t i=0; i<_conns.size(); i++ ) { _conns[i]->insert( ns , obj ); } - + _checkLast(); } - - void SyncClusterConnection::insert( const string &ns, const vector< BSONObj >& v ){ - uassert( 10023 , "SyncClusterConnection bulk insert not implemented" , 0); + + void SyncClusterConnection::insert( const string &ns, const vector< BSONObj >& v ) { + uassert( 10023 , "SyncClusterConnection bulk insert not implemented" , 0); } - void SyncClusterConnection::remove( const string &ns , Query query, bool justOne ){ + void SyncClusterConnection::remove( const string &ns , Query query, bool justOne ) { string errmsg; if ( ! prepare( errmsg ) ) throw UserException( 8020 , (string)"SyncClusterConnection::remove prepare failed: " + errmsg ); - - for ( size_t i=0; i<_conns.size(); i++ ){ + + for ( size_t i=0; i<_conns.size(); i++ ) { _conns[i]->remove( ns , query , justOne ); } - + _checkLast(); } - void SyncClusterConnection::update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ){ + void SyncClusterConnection::update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ) { - if ( upsert ){ + if ( upsert ) { uassert( 13120 , "SyncClusterConnection::update upsert query needs _id" , query.obj["_id"].type() ); } - if ( _writeConcern ){ + if ( _writeConcern ) { string errmsg; if ( ! prepare( errmsg ) ) throw UserException( 8005 , (string)"SyncClusterConnection::udpate prepare failed: " + errmsg ); } - for ( size_t i=0; i<_conns.size(); i++ ){ + for ( size_t i=0; i<_conns.size(); i++ ) { try { _conns[i]->update( ns , query , obj , upsert , multi ); } - catch ( std::exception& e ){ + catch ( std::exception& e ) { if ( _writeConcern ) throw e; } } - - if ( _writeConcern ){ + + if ( _writeConcern ) { _checkLast(); assert( _lastErrors.size() > 1 ); - + int a = _lastErrors[0]["n"].numberInt(); - for ( unsigned i=1; i<_lastErrors.size(); i++ ){ + for ( unsigned i=1; i<_lastErrors.size(); i++ ) { int b = _lastErrors[i]["n"].numberInt(); if ( a == b ) continue; - + throw UpdateNotTheSame( 8017 , "update not consistent" , _connAddresses , _lastErrors ); } } } - string SyncClusterConnection::_toString() const { + string SyncClusterConnection::_toString() const { stringstream ss; ss << "SyncClusterConnection [" << _address << "]"; return ss.str(); } - bool SyncClusterConnection::call( Message &toSend, Message &response, bool assertOk ){ - uassert( 8006 , "SyncClusterConnection::call can only be used directly for dbQuery" , + bool SyncClusterConnection::call( Message &toSend, Message &response, bool assertOk ) { + uassert( 8006 , "SyncClusterConnection::call can only be used directly for dbQuery" , toSend.operation() == dbQuery ); - + DbMessage d( toSend ); uassert( 8007 , "SyncClusterConnection::call can't handle $cmd" , strstr( d.getns(), "$cmd" ) == 0 ); - for ( size_t i=0; i<_conns.size(); i++ ){ + for ( size_t i=0; i<_conns.size(); i++ ) { try { bool ok = _conns[i]->call( toSend , response , assertOk ); if ( ok ) return ok; log() << "call failed to: " << _conns[i]->toString() << " no data" << endl; } - catch ( ... ){ + catch ( ... ) { log() << "call failed to: " << _conns[i]->toString() << " exception" << endl; } } throw UserException( 8008 , "all servers down!" ); } - - void SyncClusterConnection::say( Message &toSend ){ + + void SyncClusterConnection::say( Message &toSend ) { string errmsg; if ( ! prepare( errmsg ) ) throw UserException( 13397 , (string)"SyncClusterConnection::say prepare failed: " + errmsg ); - for ( size_t i=0; i<_conns.size(); i++ ){ + for ( size_t i=0; i<_conns.size(); i++ ) { _conns[i]->say( toSend ); } - + _checkLast(); } - - void SyncClusterConnection::sayPiggyBack( Message &toSend ){ + + void SyncClusterConnection::sayPiggyBack( Message &toSend ) { assert(0); } - int SyncClusterConnection::_lockType( const string& name ){ + int SyncClusterConnection::_lockType( const string& name ) { { scoped_lock lk(_mutex); map<string,int>::iterator i = _lockTypes.find( name ); if ( i != _lockTypes.end() ) return i->second; } - + BSONObj info; uassert( 13053 , "help failed" , _commandOnActive( "admin" , BSON( name << "1" << "help" << 1 ) , info ) ); @@ -365,7 +365,7 @@ namespace mongo { return lockType; } - void SyncClusterConnection::killCursor( long long cursorID ){ + void SyncClusterConnection::killCursor( long long cursorID ) { // should never need to do this assert(0); } diff --git a/client/syncclusterconnection.h b/client/syncclusterconnection.h index 1a8171716df..7d2c2abfe36 100644 --- a/client/syncclusterconnection.h +++ b/client/syncclusterconnection.h @@ -27,15 +27,15 @@ namespace mongo { /** * This is a connection to a cluster of servers that operate as one * for super high durability. - * + * * Write operations are two-phase. First, all nodes are asked to fsync. If successful - * everywhere, the write is sent everywhere and then followed by an fsync. There is no - * rollback if a problem occurs during the second phase. Naturally, with all these fsyncs, + * everywhere, the write is sent everywhere and then followed by an fsync. There is no + * rollback if a problem occurs during the second phase. Naturally, with all these fsyncs, * these operations will be quite slow -- use sparingly. - * + * * Read operations are sent to a single random node. - * - * The class checks if a command is read or write style, and sends to a single + * + * The class checks if a command is read or write style, and sends to a single * node if a read lock command and to all in two phases with a write style command. */ class SyncClusterConnection : public DBClientBase { @@ -47,7 +47,7 @@ namespace mongo { SyncClusterConnection( string commaSeparated ); SyncClusterConnection( string a , string b , string c ); ~SyncClusterConnection(); - + /** * @return true if all servers are up and ready for writes */ @@ -66,9 +66,9 @@ namespace mongo { const BSONObj *fieldsToReturn, int queryOptions, int batchSize ); virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn, int options ); - + virtual void insert( const string &ns, BSONObj obj ); - + virtual void insert( const string &ns, const vector< BSONObj >& v ); virtual void remove( const string &ns , Query query, bool justOne ); @@ -80,20 +80,20 @@ namespace mongo { virtual void sayPiggyBack( Message &toSend ); virtual void killCursor( long long cursorID ); - + virtual string getServerAddress() const { return _address; } virtual bool isFailed() const { return false; } virtual string toString() { return _toString(); } - virtual BSONObj getLastErrorDetailed(); + virtual BSONObj getLastErrorDetailed(); virtual bool callRead( Message& toSend , Message& response ); - virtual ConnectionString::ConnectionType type() const { return ConnectionString::SYNC; } + virtual ConnectionString::ConnectionType type() const { return ConnectionString::SYNC; } private: SyncClusterConnection( SyncClusterConnection& prev ); - string _toString() const; + string _toString() const; bool _commandOnActive(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0); auto_ptr<DBClientCursor> _queryOnActive(const string &ns, Query query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, int batchSize ); @@ -106,17 +106,17 @@ namespace mongo { vector<DBClientConnection*> _conns; map<string,int> _lockTypes; mongo::mutex _mutex; - + vector<BSONObj> _lastErrors; }; - + class UpdateNotTheSame : public UserException { public: UpdateNotTheSame( int code , const string& msg , const vector<string>& addrs , const vector<BSONObj>& lastErrors ) - : UserException( code , msg ) , _addrs( addrs ) , _lastErrors( lastErrors ){ + : UserException( code , msg ) , _addrs( addrs ) , _lastErrors( lastErrors ) { assert( _addrs.size() == _lastErrors.size() ); } - + virtual ~UpdateNotTheSame() throw() { } @@ -133,7 +133,7 @@ namespace mongo { vector<string> _addrs; vector<BSONObj> _lastErrors; }; - + }; #include "undef_macros.h" |