// client.cpp

/**
 *    Copyright (C) 2009 10gen Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

/* Client represents a connection to the database (the server-side) and corresponds
   to an open socket (or logical connection if pooling on sockets) from a client.
*/

#include "mongo/pch.h"

#include "mongo/db/client.h"

#include <string>
#include <vector>

#include "mongo/base/status.h"
#include "mongo/bson/mutable/document.h"
#include "mongo/db/auth/action_set.h"
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_manager_global.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/auth/authz_session_external_state_d.h"
#include "mongo/db/auth/privilege.h"
#include "mongo/db/db.h"
#include "mongo/db/commands.h"
#include "mongo/db/curop.h"
#include "mongo/db/kill_current_op.h"
#include "mongo/db/dbwebserver.h"
#include "mongo/db/instance.h"
#include "mongo/db/json.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/rs.h"
#include "mongo/db/storage_options.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/d_logic.h"
#include "mongo/s/stale_exception.h" // for SendStaleConfigException
#include "mongo/scripting/engine.h"
#include "mongo/util/concurrency/thread_name.h"
#include "mongo/util/file_allocator.h"
#include "mongo/util/mongoutils/checksum.h"
#include "mongo/util/mongoutils/str.h"

namespace mongo {

    mongo::mutex& Client::clientsMutex = *(new mutex("clientsMutex"));
    set<Client*>& Client::clients = *(new set<Client*>); // always be in clientsMutex when manipulating this

    TSP_DEFINE(Client, currentClient)

    /* each thread which does db operations has a Client object in TLS.
       call this when your thread starts.
    */
    Client& Client::initThread(const char *desc, AbstractMessagingPort *mp) {
        verify( currentClient.get() == 0 );

        string fullDesc = desc;
        if ( str::equals( "conn" , desc ) && mp != NULL )
            fullDesc = str::stream() << desc << mp->connectionId();

        setThreadName( fullDesc.c_str() );

        // Create the client obj, attach to thread
        Client *c = new Client( fullDesc, mp );
        currentClient.reset(c);
        mongo::lastError.initThread();
        c->setAuthorizationSession(new AuthorizationSession(new AuthzSessionExternalStateMongod(
                getGlobalAuthorizationManager())));
        return *c;
    }

    Client::Client(const string& desc, AbstractMessagingPort *p) :
        ClientBasic(p),
        _context(0),
        _shutdown(false),
        _desc(desc),
        _god(0),
        _lastOp(0)
    {
        _hasWrittenSinceCheckpoint = false;
        _connectionId = p ? p->connectionId() : 0;
        _curOp = new CurOp( this );
#ifndef _WIN32
        stringstream temp;
        temp << hex << showbase << pthread_self();
        _threadId = temp.str();
#endif
        scoped_lock bl(clientsMutex);
        clients.insert(this);
    }

    Client::~Client() {
        _god = 0;

        // Because both Client object pointers and logging infrastructure are stored in Thread
        // Specific Pointers and because we do not explicitly control the order in which TSPs are
        // deleted, it is possible for the logging infrastructure to have been deleted before
        // this code runs.  This leads to segfaults (access violations) if this code attempts
        // to log anything.  Therefore, disable logging from this destructor until this is fixed.
        // TODO(tad) Force the logging infrastructure to be the last TSP to be deleted for each
        // thread and reenable this code once that is done.
#if 0
        if ( _context )
            error() << "Client::~Client _context should be null but is not; client:" << _desc << endl;

        if ( ! _shutdown ) {
            error() << "Client::shutdown not called: " << _desc << endl;
        }
#endif

        if ( ! inShutdown() ) {
            // we can't clean up safely once we're in shutdown
            {
                scoped_lock bl(clientsMutex);
                if ( ! _shutdown )
                    clients.erase(this);
            }

            CurOp* last;
            do {
                last = _curOp;
                delete _curOp;
                // _curOp may have been reset to _curOp->_wrapped
            } while (_curOp != last);
        }
    }

    bool Client::shutdown() {
        _shutdown = true;
        if ( inShutdown() )
            return false;
        {
            scoped_lock bl(clientsMutex);
            clients.erase(this);
        }

        return false;
    }

    BSONObj CachedBSONObjBase::_tooBig = fromjson("{\"$msg\":\"query not recording (too large)\"}");

    Client::Context::Context(const std::string& ns , Database * db) :
        _client( currentClient.get() ),
        _oldContext( _client->_context ),
        _path(storageGlobalParams.dbpath), // is this right? could be a different db?
                                           // may need a dassert for this
        _justCreated(false),
        _doVersion( true ),
        _ns( ns ),
        _db(db)
    {
        _client->_context = this;
    }

    Client::Context::Context(const string& ns, const std::string& path, bool doVersion) :
        _client( currentClient.get() ),
        _oldContext( _client->_context ),
        _path( path ),
        _justCreated(false), // set for real in finishInit
        _doVersion(doVersion),
        _ns( ns ),
        _db(0)
    {
        _finishInit();
    }
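    /* Illustrative sketch (not part of the original source): a db-operation thread is expected to
       register itself via initThread() before doing any work and to deregister via shutdown()
       before it exits.  The function name below is a hypothetical placeholder; "test.foo" and the
       explicit path/version arguments are illustrative only.

           void exampleWorkerThread(AbstractMessagingPort* mp) {
               Client& c = Client::initThread("conn", mp);  // attach a Client to this thread's TLS
               {
                   // WriteContext takes the DB lock and sets the thread's current Context
                   Client::WriteContext ctx("test.foo", storageGlobalParams.dbpath, true);
                   // ... perform the operation ...
               }                                            // ~Context restores the previous context
               c.shutdown();                                // remove this Client from the global set
           }
    */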
    /** "read lock, and set my context, all in one operation"
     *  This handles (if not recursively locked) opening an unopened database.
     */
    Client::ReadContext::ReadContext(const string& ns, const std::string& path, bool doVersion) {
        {
            lk.reset( new Lock::DBRead(ns) );
            Database *db = dbHolder().get(ns, path);
            if( db ) {
                c.reset( new Context(path, ns, db, doVersion) );
                return;
            }
        }

        // we usually don't get here, so doesn't matter how fast this part is
        {
            DEV log() << "_DEBUG ReadContext db wasn't open, will try to open " << ns << endl;
            if( Lock::isW() ) {
                // write locked already
                DEV RARELY log() << "write locked on ReadContext construction " << ns << endl;
                c.reset(new Context(ns, path, doVersion));
            }
            else if( !Lock::nested() ) {
                lk.reset(0);
                {
                    Lock::GlobalWrite w;
                    Context c(ns, path, doVersion);
                }
                // db could be closed at this interim point -- that is ok, we will throw, and don't mind throwing.
                lk.reset( new Lock::DBRead(ns) );
                c.reset(new Context(ns, path, doVersion));
            }
            else {
                uasserted(15928, str::stream() << "can't open a database from a nested read lock " << ns);
            }
        }

        // todo: are receipts of thousands of queries for a nonexisting database a potential
        // cause of bad performance due to the write lock acquisition above?  let's fix that.
        // it would be easy to first check that there is at least a .ns file, or something similar.
    }

    Client::WriteContext::WriteContext(const string& ns, const std::string& path, bool doVersion)
        : _lk( ns ) ,
          _c(ns, path, doVersion) {
    }
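    /* Illustrative sketch (not part of the original source): ReadContext is the read-path
       counterpart -- it takes the DB read lock and establishes the Context in one step, opening
       the database if it is not currently open.  As the uassert (code 15928) above shows, that
       open cannot happen from inside a nested read lock, so take the ReadContext first.
       "test.foo" and the explicit arguments are illustrative; the ctx()/db() accessors are
       assumed to be the usual ones declared in client.h.

           Client::ReadContext rctx("test.foo", storageGlobalParams.dbpath, true);
           Database* db = rctx.ctx().db();
           // ... read-only work against db ...
    */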
    void Client::Context::checkNotStale() const {
        switch ( _client->_curOp->getOp() ) {
        case dbGetMore: // getMore's are special and should be handled elsewhere
        case dbUpdate:  // update & delete check shard version in instance.cpp, so don't check here as well
        case dbDelete:
            break;
        default: {
            string errmsg;
            ChunkVersion received;
            ChunkVersion wanted;
            if ( ! shardVersionOk( _ns , errmsg, received, wanted ) ) {
                ostringstream os;
                os << "[" << _ns << "] shard version not ok in Client::Context: " << errmsg;
                throw SendStaleConfigException( _ns, os.str(), received, wanted );
            }
        }
        }
    }

    // invoked from ReadContext
    Client::Context::Context(const string& path, const string& ns, Database *db, bool doVersion) :
        _client( currentClient.get() ),
        _oldContext( _client->_context ),
        _path( path ),
        _justCreated(false),
        _doVersion( doVersion ),
        _ns( ns ),
        _db(db)
    {
        verify(_db);
        if( _doVersion ) checkNotStale();
        _client->_context = this;
        _client->_curOp->enter( this );
    }

    void Client::Context::_finishInit() {
        dassert( Lock::isLocked() );

        int writeLocked = Lock::somethingWriteLocked();
        if ( writeLocked && FileAllocator::get()->hasFailed() ) {
            uassert(14031, "Can't take a write lock while out of disk space", false);
        }

        _db = dbHolderUnchecked().getOrCreate( _ns , _path , _justCreated );
        verify(_db);

        if( _doVersion ) checkNotStale();

        massert( 16107 , str::stream() << "Don't have a lock on: " << _ns , Lock::atLeastReadLocked( _ns ) );

        _client->_context = this;
        _client->_curOp->enter( this );
    }

    Client::Context::~Context() {
        DEV verify( _client == currentClient.get() );
        _client->_curOp->recordGlobalTime( _timer.micros() );
        _client->_context = _oldContext; // note: _oldContext may be null
    }

    bool Client::Context::inDB( const string& db , const string& path ) const {
        if ( _path != path )
            return false;

        if ( db == _ns )
            return true;

        string::size_type idx = _ns.find( db );
        if ( idx != 0 )
            return false;

        return _ns[db.size()] == '.';
    }

    void Client::appendLastOp( BSONObjBuilder& b ) const {
        // _lastOp is never set if replication is off
        if (repl::theReplSet || !_lastOp.isNull()) {
            b.appendTimestamp( "lastOp" , _lastOp.asDate() );
        }
    }

    string Client::clientAddress(bool includePort) const {
        if( _curOp )
            return _curOp->getRemoteString(includePort);
        return "";
    }

    string Client::toString() const {
        stringstream ss;
        if ( _curOp )
            ss << _curOp->info().jsonString();
        return ss.str();
    }

    string sayClientState() {
        Client* c = currentClient.get();
        if ( !c )
            return "no client";
        return c->toString();
    }

    bool Client::gotHandshake( const BSONObj& o ) {
        BSONObjIterator i(o);

        {
            BSONElement id = i.next();
            verify( id.type() );
            _remoteId = id.wrap( "_id" );
        }

        BSONObjBuilder b;
        while ( i.more() )
            b.append( i.next() );

        b.appendElementsUnique( _handshake );

        _handshake = b.obj();

        if (!repl::theReplSet || !o.hasField("member")) {
            return false;
        }

        return repl::theReplSet->registerSlave(_remoteId, o["member"].Int());
    }

    bool ClientBasic::hasCurrent() {
        return currentClient.get();
    }

    ClientBasic* ClientBasic::getCurrent() {
        return currentClient.get();
    }

    class HandshakeCmd : public Command {
    public:
        void help(stringstream& h) const { h << "internal"; }
        HandshakeCmd() : Command( "handshake" ) {}
        virtual bool isWriteCommandForConfigServer() const { return false; }
        virtual bool slaveOk() const { return true; }
        virtual bool adminOnly() const { return false; }
        virtual void addRequiredPrivileges(const std::string& dbname,
                                           const BSONObj& cmdObj,
                                           std::vector<Privilege>* out) {
            ActionSet actions;
            actions.addAction(ActionType::internal);
            out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
        }

        virtual bool run(OperationContext* txn,
                         const string& ,
                         BSONObj& cmdObj,
                         int,
                         string& errmsg,
                         BSONObjBuilder& result,
                         bool fromRepl) {
            Client& c = cc();
            c.gotHandshake( cmdObj );
            return 1;
        }

    } handshakeCmd;

    int Client::recommendedYieldMicros( int * writers , int * readers, bool needExact ) {
        int num = 0;
        int w = 0;
        int r = 0;
        {
            scoped_lock bl(clientsMutex);
            for ( set<Client*>::iterator i=clients.begin(); i!=clients.end(); ++i ) {
                Client* c = *i;
                if ( c->lockState().hasLockPending() ) {
                    num++;
                    if ( c->lockState().hasAnyWriteLock() )
                        w++;
                    else
                        r++;
                }
                if (num > 100 && !needExact)
                    break;
            }
        }

        if ( writers )
            *writers = w;
        if ( readers )
            *readers = r;

        int time = r * 10; // we have to be nice to readers since they don't have priority
        time += w;         // writers are greedy, so we can be mean to them

        time = min( time , 1000000 );

        return time;
    }

    int Client::getActiveClientCount( int& writers, int& readers ) {
        writers = 0;
        readers = 0;

        scoped_lock bl(clientsMutex);
        for ( set<Client*>::iterator i=clients.begin(); i!=clients.end(); ++i ) {
            Client* c = *i;
            if ( ! c->curop()->active() )
                continue;

            if ( c->lockState().hasAnyWriteLock() )
                writers++;
            if ( c->lockState().hasAnyReadLock() )
                readers++;
        }

        return writers + readers;
    }

    void OpDebug::reset() {
        extra.reset();

        op = 0;
        iscommand = false;
        ns = "";
        query = BSONObj();
        updateobj = BSONObj();

        cursorid = -1;
        ntoreturn = -1;
        ntoskip = -1;
        exhaust = false;

        nscanned = -1;
        nscannedObjects = -1;
        idhack = false;
        scanAndOrder = false;
        nMatched = -1;
        nModified = -1;
        ninserted = -1;
        ndeleted = -1;
        nmoved = -1;
        fastmod = false;
        fastmodinsert = false;
        upsert = false;
        keyUpdates = 0; // unsigned, so -1 not possible
        planSummary = "";
        execStats.reset();

        exceptionInfo.reset();

        executionTime = 0;
        nreturned = -1;
        responseLength = -1;
    }

#define OPDEBUG_TOSTRING_HELP(x) if( x >= 0 ) s << " " #x ":" << (x)
#define OPDEBUG_TOSTRING_HELP_BOOL(x) if( x ) s << " " #x ":" << (x)
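    // For illustration (not in the original source): OPDEBUG_TOSTRING_HELP(nscanned) expands to
    //     if( nscanned >= 0 ) s << " " "nscanned" ":" << (nscanned)
    // so a counter is only included in the report string when it was actually set; the -1/false
    // defaults assigned in OpDebug::reset() mean "not recorded".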
    string OpDebug::report( const CurOp& curop ) const {
        StringBuilder s;
        if ( iscommand )
            s << "command ";
        else
            s << opToString( op ) << ' ';
        s << ns.toString();

        if ( ! query.isEmpty() ) {
            if ( iscommand ) {
                s << " command: ";

                Command* curCommand = curop.getCommand();
                if (curCommand) {
                    mutablebson::Document cmdToLog(curop.query(), mutablebson::Document::kInPlaceDisabled);
                    curCommand->redactForLogging(&cmdToLog);
                    s << curCommand->name << " ";
                    s << cmdToLog.toString();
                }
                else { // Should not happen but we need to handle curCommand == NULL gracefully
                    s << query.toString();
                }
            }
            else {
                s << " query: ";
                s << query.toString();
            }
        }

        if (!planSummary.empty()) {
            s << " planSummary: " << planSummary.toString();
        }

        if ( ! updateobj.isEmpty() ) {
            s << " update: ";
            updateobj.toString( s );
        }

        OPDEBUG_TOSTRING_HELP( cursorid );
        OPDEBUG_TOSTRING_HELP( ntoreturn );
        OPDEBUG_TOSTRING_HELP( ntoskip );
        OPDEBUG_TOSTRING_HELP_BOOL( exhaust );

        OPDEBUG_TOSTRING_HELP( nscanned );
        OPDEBUG_TOSTRING_HELP( nscannedObjects );
        OPDEBUG_TOSTRING_HELP_BOOL( idhack );
        OPDEBUG_TOSTRING_HELP_BOOL( scanAndOrder );
        OPDEBUG_TOSTRING_HELP( nmoved );
        OPDEBUG_TOSTRING_HELP( nMatched );
        OPDEBUG_TOSTRING_HELP( nModified );
        OPDEBUG_TOSTRING_HELP( ninserted );
        OPDEBUG_TOSTRING_HELP( ndeleted );
        OPDEBUG_TOSTRING_HELP_BOOL( fastmod );
        OPDEBUG_TOSTRING_HELP_BOOL( fastmodinsert );
        OPDEBUG_TOSTRING_HELP_BOOL( upsert );
        OPDEBUG_TOSTRING_HELP( keyUpdates );

        if ( extra.len() )
            s << " " << extra.str();

        if ( ! exceptionInfo.empty() ) {
            s << " exception: " << exceptionInfo.msg;
            if ( exceptionInfo.code )
                s << " code:" << exceptionInfo.code;
        }

        s << " numYields:" << curop.numYields();
        s << " ";
        curop.lockStat().report( s );

        OPDEBUG_TOSTRING_HELP( nreturned );
        if ( responseLength > 0 )
            s << " reslen:" << responseLength;
        s << " " << executionTime << "ms";

        return s.str();
    }

#define OPDEBUG_APPEND_NUMBER(x) if( x != -1 ) b.appendNumber( #x , (x) )
#define OPDEBUG_APPEND_BOOL(x) if( x ) b.appendBool( #x , (x) )
"command" : "query", query); } else if (!iscommand && curop.haveQuery()) { curop.appendQuery(b, "query"); } if (!updateobj.isEmpty()) { b.append("updateobj", updateobj); } const bool moved = (nmoved >= 1); OPDEBUG_APPEND_NUMBER( cursorid ); OPDEBUG_APPEND_NUMBER( ntoreturn ); OPDEBUG_APPEND_NUMBER( ntoskip ); OPDEBUG_APPEND_BOOL( exhaust ); OPDEBUG_APPEND_NUMBER( nscanned ); OPDEBUG_APPEND_NUMBER( nscannedObjects ); OPDEBUG_APPEND_BOOL( idhack ); OPDEBUG_APPEND_BOOL( scanAndOrder ); OPDEBUG_APPEND_BOOL( moved ); OPDEBUG_APPEND_NUMBER( nmoved ); OPDEBUG_APPEND_NUMBER( nMatched ); OPDEBUG_APPEND_NUMBER( nModified ); OPDEBUG_APPEND_NUMBER( ninserted ); OPDEBUG_APPEND_NUMBER( ndeleted ); OPDEBUG_APPEND_BOOL( fastmod ); OPDEBUG_APPEND_BOOL( fastmodinsert ); OPDEBUG_APPEND_BOOL( upsert ); OPDEBUG_APPEND_NUMBER( keyUpdates ); b.appendNumber( "numYield" , curop.numYields() ); b.append( "lockStats" , curop.lockStat().report() ); if ( ! exceptionInfo.empty() ) exceptionInfo.append( b , "exception" , "exceptionCode" ); OPDEBUG_APPEND_NUMBER( nreturned ); OPDEBUG_APPEND_NUMBER( responseLength ); b.append( "millis" , executionTime ); execStats.append(b, "execStats"); return true; } void saveGLEStats(const BSONObj& result, const std::string& conn) { // This can be called in mongod, which is unfortunate. To fix this, // we can redesign how connection pooling works on mongod for sharded operations. } }