| | | |
|---|---|---|
| author | Eliot Horowitz <eliot@10gen.com> | 2011-12-24 15:33:26 -0500 |
| committer | Eliot Horowitz <eliot@10gen.com> | 2011-12-24 15:33:45 -0500 |
| commit | ae1ecd9c786911f9f1f0242f0f7d702b3e5dfeba (patch) | |
| tree | 92f8e1649e6f080b251ff5f1763679a72eb59b34 /src/mongo/db/client.cpp | |
| parent | dfa4cd7e2cf109b072440155fabc08a93c8045a0 (diff) | |
| download | mongo-ae1ecd9c786911f9f1f0242f0f7d702b3e5dfeba.tar.gz | |
bulk move of code to src/ SERVER-4551
Diffstat (limited to 'src/mongo/db/client.cpp')
-rw-r--r-- | src/mongo/db/client.cpp | 697 |
1 file changed, 697 insertions, 0 deletions
diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp
new file mode 100644
index 00000000000..92b78d87ee5
--- /dev/null
+++ b/src/mongo/db/client.cpp
@@ -0,0 +1,697 @@
+// client.cpp
+
+/**
+* Copyright (C) 2009 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+/* Client represents a connection to the database (the server-side) and corresponds
+   to an open socket (or logical connection if pooling on sockets) from a client.
+*/
+
+#include "pch.h"
+#include "db.h"
+#include "client.h"
+#include "curop-inl.h"
+#include "json.h"
+#include "security.h"
+#include "commands.h"
+#include "instance.h"
+#include "../s/d_logic.h"
+#include "dbwebserver.h"
+#include "../util/mongoutils/html.h"
+#include "../util/mongoutils/checksum.h"
+#include "../util/file_allocator.h"
+#include "repl/rs.h"
+#include "../scripting/engine.h"
+
+namespace mongo {
+
+    Client* Client::syncThread;
+    mongo::mutex Client::clientsMutex("clientsMutex");
+    set<Client*> Client::clients; // always be in clientsMutex when manipulating this
+
+    TSP_DEFINE(Client, currentClient)
+
+#if defined(_DEBUG)
+    struct StackChecker;
+    ThreadLocalValue<StackChecker *> checker;
+
+    struct StackChecker {
+        enum { SZ = 256 * 1024 };
+        char buf[SZ];
+        StackChecker() {
+            checker.set(this);
+        }
+        void init() {
+            memset(buf, 42, sizeof(buf));
+        }
+        static void check(const char *tname) {
+            static int max;
+            StackChecker *sc = checker.get();
+            const char *p = sc->buf;
+            int i = 0;
+            for( ; i < SZ; i++ ) {
+                if( p[i] != 42 )
+                    break;
+            }
+            int z = SZ-i;
+            if( z > max ) {
+                max = z;
+                log() << "thread " << tname << " stack usage was " << z << " bytes" << endl;
+            }
+            wassert( i > 16000 );
+        }
+    };
+#endif
+
+    /* each thread which does db operations has a Client object in TLS.
+       call this when your thread starts.
+    */
+#if defined _DEBUG
+    static unsigned long long nThreads = 0;
+    void assertStartingUp() {
+        assert( nThreads <= 1 );
+    }
+#else
+    void assertStartingUp() { }
+#endif
+
+    Client& Client::initThread(const char *desc, AbstractMessagingPort *mp) {
+#if defined(_DEBUG)
+        {
+            nThreads++; // never decremented. this is for casi class asserts
+            if( sizeof(void*) == 8 ) {
+                StackChecker sc;
+                sc.init();
+            }
+        }
+#endif
+        assert( currentClient.get() == 0 );
+        Client *c = new Client(desc, mp);
+        currentClient.reset(c);
+        mongo::lastError.initThread();
+        return *c;
+    }
+
+    Client::Client(const char *desc, AbstractMessagingPort *p) :
+        _context(0),
+        _shutdown(false),
+        _desc(desc),
+        _god(0),
+        _lastOp(0),
+        _mp(p),
+        _sometimes(0)
+    {
+        _hasWrittenThisPass = false;
+        _pageFaultRetryableSection = 0;
+        _connectionId = setThreadName(desc);
+        _curOp = new CurOp( this );
+#ifndef _WIN32
+        stringstream temp;
+        temp << hex << showbase << pthread_self();
+        _threadId = temp.str();
+#endif
+        scoped_lock bl(clientsMutex);
+        clients.insert(this);
+    }
+
+    Client::~Client() {
+        _god = 0;
+
+        if ( _context )
+            error() << "Client::~Client _context should be null but is not; client:" << _desc << endl;
+
+        if ( ! _shutdown ) {
+            error() << "Client::shutdown not called: " << _desc << endl;
+        }
+
+        if ( ! inShutdown() ) {
+            // we can't clean up safely once we're in shutdown
+            scoped_lock bl(clientsMutex);
+            if ( ! _shutdown )
+                clients.erase(this);
+            delete _curOp;
+        }
+    }
+
+    bool Client::shutdown() {
+#if defined(_DEBUG)
+        {
+            if( sizeof(void*) == 8 ) {
+                StackChecker::check( desc() );
+            }
+        }
+#endif
+        _shutdown = true;
+        if ( inShutdown() )
+            return false;
+        {
+            scoped_lock bl(clientsMutex);
+            clients.erase(this);
+            if ( isSyncThread() ) {
+                syncThread = 0;
+            }
+        }
+
+        return false;
+    }
+
+    BSONObj CachedBSONObj::_tooBig = fromjson("{\"$msg\":\"query not recording (too large)\"}");
+
+    Client::Context::Context( string ns , Database * db, bool doauth ) :
+        _client( currentClient.get() ),
+        _oldContext( _client->_context ),
+        _path( mongo::dbpath ), // is this right? could be a different db? may need a dassert for this
+        _justCreated(false),
+        _ns( ns ),
+        _db(db)
+    {
+        assert( db == 0 || db->isOk() );
+        _client->_context = this;
+        checkNsAccess( doauth );
+        _client->checkLocks();
+    }
+
+    Client::Context::Context(const string& ns, string path , bool doauth ) :
+        _client( currentClient.get() ),
+        _oldContext( _client->_context ),
+        _path( path ),
+        _justCreated(false), // set for real in finishInit
+        _ns( ns ),
+        _db(0)
+    {
+        _finishInit( doauth );
+        _client->checkLocks();
+    }
+
+    /** "read lock, and set my context, all in one operation"
+     *  This handles (if not recursively locked) opening an unopened database.
+     */
+    Client::ReadContext::ReadContext(const string& ns, string path, bool doauth ) {
+        {
+            lk.reset( new _LockCollectionForReading(ns) );
+            Database *db = dbHolder().get(ns, path);
+            if( db ) {
+                c.reset( new Context(path, ns, db, doauth) );
+                return;
+            }
+        }
+
+        // we usually don't get here, so doesn't matter how fast this part is
+        {
+            int x = d.dbMutex.getState();
+            if( x > 0 ) {
+                // write locked already
+                DEV RARELY log() << "write locked on ReadContext construction " << ns << endl;
+                c.reset( new Context(ns, path, doauth) );
+            }
+            else if( x == -1 ) {
+                lk.reset(0);
+                {
+                    writelock w;
+                    Context c(ns, path, doauth);
+                }
+                // db could be closed at this interim point -- that is ok, we will throw, and don't mind throwing.
+                lk.reset( new _LockCollectionForReading(ns) );
+                c.reset( new Context(ns, path, doauth) );
+            }
+            else {
+                assert( x < -1 );
+                uasserted(15928, str::stream() << "can't open a database from a nested read lock " << ns);
+            }
+        }
+
+        // todo: are receipts of thousands of queries for a nonexisting database a potential
+        //       cause of bad performance due to the write lock acquisition above? let's fix that.
+        //       it would be easy to first check that there is at least a .ns file, or something similar.
+    }
+
+    void Client::Context::checkNotStale() const {
+        switch ( _client->_curOp->getOp() ) {
+        case dbGetMore: // getMore's are special and should be handled else where
+        case dbUpdate:  // update & delete check shard version in instance.cpp, so don't check here as well
+        case dbDelete:
+            break;
+        default: {
+            string errmsg;
+            if ( ! shardVersionOk( _ns , errmsg ) ) {
+                ostringstream os;
+                os << "[" << _ns << "] shard version not ok in Client::Context: " << errmsg;
+                throw SendStaleConfigException( _ns, os.str() );
+            }
+        }
+        }
+    }
+
+    // invoked from ReadContext
+    Client::Context::Context(const string& path, const string& ns, Database *db , bool doauth) :
+        _client( currentClient.get() ),
+        _oldContext( _client->_context ),
+        _path( path ),
+        _justCreated(false),
+        _ns( ns ),
+        _db(db)
+    {
+        assert(_db);
+        checkNotStale();
+        _client->_context = this;
+        _client->_curOp->enter( this );
+        checkNsAccess( doauth, d.dbMutex.getState() );
+        _client->checkLocks();
+    }
+
+    void Client::Context::_finishInit( bool doauth ) {
+        int lockState = d.dbMutex.getState();
+        assert( lockState );
+        if ( lockState > 0 && FileAllocator::get()->hasFailed() ) {
+            uassert(14031, "Can't take a write lock while out of disk space", false);
+        }
+
+        _db = dbHolderUnchecked().getOrCreate( _ns , _path , _justCreated );
+        assert(_db);
+        checkNotStale();
+        _client->_context = this;
+        _client->_curOp->enter( this );
+        checkNsAccess( doauth, lockState );
+    }
+
+    void Client::Context::_auth( int lockState ) {
+        if ( _client->_ai.isAuthorizedForLock( _db->name , lockState ) )
+            return;
+
+        // before we assert, do a little cleanup
+        _client->_context = _oldContext; // note: _oldContext may be null
+
+        stringstream ss;
+        ss << "unauthorized db:" << _db->name << " lock type:" << lockState << " client:" << _client->clientAddress();
+        uasserted( 10057 , ss.str() );
+    }
+
+    Client::Context::~Context() {
+        DEV assert( _client == currentClient.get() );
+        _client->_curOp->leave( this );
+        _client->_context = _oldContext; // note: _oldContext may be null
+    }
+
+    bool Client::Context::inDB( const string& db , const string& path ) const {
+        if ( _path != path )
+            return false;
+
+        if ( db == _ns )
+            return true;
+
+        string::size_type idx = _ns.find( db );
+        if ( idx != 0 )
+            return false;
+
+        return _ns[db.size()] == '.';
+    }
+
+    void Client::Context::checkNsAccess( bool doauth, int lockState ) {
+        if ( 0 ) { // SERVER-4276
+            uassert( 15929, "client access to index backing namespace prohibited", NamespaceString::normal( _ns.c_str() ) );
+        }
+        if ( doauth ) {
+            _auth( lockState );
+        }
+    }
+
+    void Client::appendLastOp( BSONObjBuilder& b ) const {
+        // _lastOp is never set if replication is off
+        if( theReplSet || ! _lastOp.isNull() ) {
+            b.appendTimestamp( "lastOp" , _lastOp.asDate() );
+        }
+    }
+
+    string Client::clientAddress(bool includePort) const {
+        if( _curOp )
+            return _curOp->getRemoteString(includePort);
+        return "";
+    }
+
+    string Client::toString() const {
+        stringstream ss;
+        if ( _curOp )
+            ss << _curOp->infoNoauth().jsonString();
+        return ss.str();
+    }
+
+    string sayClientState() {
+        Client* c = currentClient.get();
+        if ( !c )
+            return "no client";
+        return c->toString();
+    }
+
+    Client* curopWaitingForLock( int type ) {
+        Client * c = currentClient.get();
+        assert( c );
+        CurOp * co = c->curop();
+        if ( co ) {
+            co->waitingForLock( type );
+        }
+        return c;
+    }
+
+    void curopGotLock(Client *c) {
+        assert(c);
+        CurOp * co = c->curop();
+        if ( co )
+            co->gotLock();
+    }
+
+    void KillCurrentOp::interruptJs( AtomicUInt *op ) {
+        if ( !globalScriptEngine )
+            return;
+        if ( !op ) {
+            globalScriptEngine->interruptAll();
+        }
+        else {
+            globalScriptEngine->interrupt( *op );
+        }
+    }
+
+    void KillCurrentOp::killAll() {
+        _globalKill = true;
+        interruptJs( 0 );
+    }
+
+    void KillCurrentOp::kill(AtomicUInt i) {
+        bool found = false;
+        {
+            scoped_lock l( Client::clientsMutex );
+            for( set< Client* >::const_iterator j = Client::clients.begin(); !found && j != Client::clients.end(); ++j ) {
+                for( CurOp *k = ( *j )->curop(); !found && k; k = k->parent() ) {
+                    if ( k->opNum() == i ) {
+                        k->kill();
+                        for( CurOp *l = ( *j )->curop(); l != k; l = l->parent() ) {
+                            l->kill();
+                        }
+                        found = true;
+                    }
+                }
+            }
+        }
+        if ( found ) {
+            interruptJs( &i );
+        }
+    }
+
+    void Client::gotHandshake( const BSONObj& o ) {
+        BSONObjIterator i(o);
+
+        {
+            BSONElement id = i.next();
+            assert( id.type() );
+            _remoteId = id.wrap( "_id" );
+        }
+
+        BSONObjBuilder b;
+        while ( i.more() )
+            b.append( i.next() );
+
+        b.appendElementsUnique( _handshake );
+
+        _handshake = b.obj();
+
+        if (theReplSet && o.hasField("member")) {
+            theReplSet->ghost->associateSlave(_remoteId, o["member"].Int());
+        }
+    }
+
+    ClientBasic* ClientBasic::getCurrent() {
+        return currentClient.get();
+    }
+
+    class HandshakeCmd : public Command {
+    public:
+        void help(stringstream& h) const { h << "internal"; }
+        HandshakeCmd() : Command( "handshake" ) {}
+        virtual LockType locktype() const { return NONE; }
+        virtual bool slaveOk() const { return true; }
+        virtual bool adminOnly() const { return false; }
+        virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+            Client& c = cc();
+            c.gotHandshake( cmdObj );
+            return 1;
+        }
+
+    } handshakeCmd;
+
+    class ClientListPlugin : public WebStatusPlugin {
+    public:
+        ClientListPlugin() : WebStatusPlugin( "clients" , 20 ) {}
+        virtual void init() {}
+
+        virtual void run( stringstream& ss ) {
+            using namespace mongoutils::html;
+
+            ss << "\n<table border=1 cellpadding=2 cellspacing=0>";
+            ss << "<tr align='left'>"
+               << th( a("", "Connections to the database, both internal and external.", "Client") )
+               << th( a("http://www.mongodb.org/display/DOCS/Viewing+and+Terminating+Current+Operation", "", "OpId") )
+               << "<th>Active</th>"
+               << "<th>LockType</th>"
+               << "<th>Waiting</th>"
+               << "<th>SecsRunning</th>"
+               << "<th>Op</th>"
+               << th( a("http://www.mongodb.org/display/DOCS/Developer+FAQ#DeveloperFAQ-What%27sa%22namespace%22%3F", "", "Namespace") )
+               << "<th>Query</th>"
+               << "<th>client</th>"
+               << "<th>msg</th>"
+               << "<th>progress</th>"
+
+               << "</tr>\n";
+            {
+                scoped_lock bl(Client::clientsMutex);
+                for( set<Client*>::iterator i = Client::clients.begin(); i != Client::clients.end(); i++ ) {
+                    Client *c = *i;
+                    CurOp& co = *(c->curop());
+                    ss << "<tr><td>" << c->desc() << "</td>";
+
+                    tablecell( ss , co.opNum() );
+                    tablecell( ss , co.active() );
+                    {
+                        int lt = co.getLockType();
+                        if( lt == -1 ) tablecell(ss, "R");
+                        else if( lt == 1 ) tablecell(ss, "W");
+                        else
+                            tablecell( ss , lt);
+                    }
+                    tablecell( ss , co.isWaitingForLock() );
+                    if ( co.active() )
+                        tablecell( ss , co.elapsedSeconds() );
+                    else
+                        tablecell( ss , "" );
+                    tablecell( ss , co.getOp() );
+                    tablecell( ss , co.getNS() );
+                    if ( co.haveQuery() ) {
+                        tablecell( ss , co.query() );
+                    }
+                    else
+                        tablecell( ss , "" );
+                    tablecell( ss , co.getRemoteString() );
+
+                    tablecell( ss , co.getMessage() );
+                    tablecell( ss , co.getProgressMeter().toString() );
+
+
+                    ss << "</tr>\n";
+                }
+            }
+            ss << "</table>\n";
+
+        }
+
+    } clientListPlugin;
+
+    int Client::recommendedYieldMicros( int * writers , int * readers ) {
+        int num = 0;
+        int w = 0;
+        int r = 0;
+        {
+            scoped_lock bl(clientsMutex);
+            for ( set<Client*>::iterator i=clients.begin(); i!=clients.end(); ++i ) {
+                Client* c = *i;
+                if ( c->curop()->isWaitingForLock() ) {
+                    num++;
+                    if ( c->curop()->getLockType() > 0 )
+                        w++;
+                    else
+                        r++;
+                }
+            }
+        }
+
+        if ( writers )
+            *writers = w;
+        if ( readers )
+            *readers = r;
+
+        int time = r * 100;
+        time += w * 500;
+
+        time = min( time , 1000000 );
+
+        // if there has been a kill request for this op - we should yield to allow the op to stop
+        // This function returns empty string if we aren't interrupted
+        if ( *killCurrentOp.checkForInterruptNoAssert() ) {
+            return 100;
+        }
+
+        return time;
+    }
+
+    int Client::getActiveClientCount( int& writers, int& readers ) {
+        writers = 0;
+        readers = 0;
+
+        scoped_lock bl(clientsMutex);
+        for ( set<Client*>::iterator i=clients.begin(); i!=clients.end(); ++i ) {
+            Client* c = *i;
+            if ( ! c->curop()->active() )
+                continue;
+
+            int l = c->curop()->getLockType();
+            if ( l > 0 )
+                writers++;
+            else if ( l < 0 )
+                readers++;
+
+        }
+
+        return writers + readers;
+    }
+
+    void OpDebug::reset() {
+        extra.reset();
+
+        op = 0;
+        iscommand = false;
+        ns = "";
+        query = BSONObj();
+        updateobj = BSONObj();
+
+        cursorid = -1;
+        ntoreturn = -1;
+        ntoskip = -1;
+        exhaust = false;
+
+        nscanned = -1;
+        idhack = false;
+        scanAndOrder = false;
+        moved = false;
+        fastmod = false;
+        fastmodinsert = false;
+        upsert = false;
+        keyUpdates = 0; // unsigned, so -1 not possible
+
+        exceptionInfo.reset();
+
+        executionTime = 0;
+        nreturned = -1;
+        responseLength = -1;
+    }
+
+
+#define OPDEBUG_TOSTRING_HELP(x) if( x >= 0 ) s << " " #x ":" << (x)
+#define OPDEBUG_TOSTRING_HELP_BOOL(x) if( x ) s << " " #x ":" << (x)
+    string OpDebug::toString() const {
+        StringBuilder s( ns.size() + 64 );
+        if ( iscommand )
+            s << "command ";
+        else
+            s << opToString( op ) << ' ';
+        s << ns.toString();
+
+        if ( ! query.isEmpty() ) {
+            if ( iscommand )
+                s << " command: ";
+            else
+                s << " query: ";
+            s << query.toString();
+        }
+
+        if ( ! updateobj.isEmpty() ) {
+            s << " update: ";
+            updateobj.toString( s );
+        }
+
+        OPDEBUG_TOSTRING_HELP( cursorid );
+        OPDEBUG_TOSTRING_HELP( ntoreturn );
+        OPDEBUG_TOSTRING_HELP( ntoskip );
+        OPDEBUG_TOSTRING_HELP_BOOL( exhaust );
+
+        OPDEBUG_TOSTRING_HELP( nscanned );
+        OPDEBUG_TOSTRING_HELP_BOOL( idhack );
+        OPDEBUG_TOSTRING_HELP_BOOL( scanAndOrder );
+        OPDEBUG_TOSTRING_HELP_BOOL( moved );
+        OPDEBUG_TOSTRING_HELP_BOOL( fastmod );
+        OPDEBUG_TOSTRING_HELP_BOOL( fastmodinsert );
+        OPDEBUG_TOSTRING_HELP_BOOL( upsert );
+        OPDEBUG_TOSTRING_HELP( keyUpdates );
+
+        if ( extra.len() )
+            s << " " << extra.str();
+
+        if ( ! exceptionInfo.empty() ) {
+            s << " exception: " << exceptionInfo.msg;
+            if ( exceptionInfo.code )
+                s << " code:" << exceptionInfo.code;
+        }
+
+        OPDEBUG_TOSTRING_HELP( nreturned );
+        if ( responseLength )
+            s << " reslen:" << responseLength;
+        s << " " << executionTime << "ms";
+
+        return s.str();
+    }
+
+#define OPDEBUG_APPEND_NUMBER(x) if( x != -1 ) b.append( #x , (x) )
+#define OPDEBUG_APPEND_BOOL(x) if( x ) b.appendBool( #x , (x) )
+    void OpDebug::append( const CurOp& curop, BSONObjBuilder& b ) const {
+        b.append( "op" , iscommand ? "command" : opToString( op ) );
+        b.append( "ns" , ns.toString() );
+        if ( ! query.isEmpty() )
+            b.append( iscommand ? "command" : "query" , query );
+        else if ( ! iscommand && curop.haveQuery() )
+            curop.appendQuery( b , "query" );
+
+        if ( ! updateobj.isEmpty() )
+            b.append( "updateobj" , updateobj );
+
+        OPDEBUG_APPEND_NUMBER( cursorid );
+        OPDEBUG_APPEND_NUMBER( ntoreturn );
+        OPDEBUG_APPEND_NUMBER( ntoskip );
+        OPDEBUG_APPEND_BOOL( exhaust );
+
+        OPDEBUG_APPEND_NUMBER( nscanned );
+        OPDEBUG_APPEND_BOOL( idhack );
+        OPDEBUG_APPEND_BOOL( scanAndOrder );
+        OPDEBUG_APPEND_BOOL( moved );
+        OPDEBUG_APPEND_BOOL( fastmod );
+        OPDEBUG_APPEND_BOOL( fastmodinsert );
+        OPDEBUG_APPEND_BOOL( upsert );
+        OPDEBUG_APPEND_NUMBER( keyUpdates );
+
+        if ( ! exceptionInfo.empty() )
+            exceptionInfo.append( b , "exception" , "exceptionCode" );
+
+        OPDEBUG_APPEND_NUMBER( nreturned );
+        OPDEBUG_APPEND_NUMBER( responseLength );
+        b.append( "millis" , executionTime );
+
+    }
+
+}
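For orientation, the sketch below illustrates how a server thread of this era typically drives the API defined in client.cpp: it registers a thread-local Client with Client::initThread(), opens a Client::ReadContext around a read-only operation (which takes the read lock and resolves the Database*), and calls shutdown() before the thread exits. This is a minimal, hypothetical sketch and not code from this commit; the worker function name is invented, and it assumes the declarations in client.h of this era, including default arguments for ReadContext's path/doauth parameters.

```cpp
// Minimal usage sketch (illustrative only; not part of this commit).
// Assumes the Client/Context declarations in src/mongo/db/client.h of this era.
#include "pch.h"
#include "client.h"

namespace mongo {

    // hypothetical worker: the name and structure are for illustration only
    void exampleWorkerThread( AbstractMessagingPort *mp ) {
        // one Client per thread, stored in thread-local storage by initThread()
        Client& c = Client::initThread( "exampleWorker", mp );

        {
            // takes the read lock and sets this thread's current Context for "test.foo";
            // assumes the default path/doauth arguments declared in client.h
            Client::ReadContext ctx( "test.foo" );
            // ... read-only work against the now-resolved database goes here ...
        }   // lock and context are released when ctx goes out of scope

        // every thread that called initThread() must call shutdown() before exiting
        c.shutdown();
    }

}
```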