diff options
Diffstat (limited to 'src/mongo/db/instance.cpp')
-rw-r--r-- | src/mongo/db/instance.cpp | 1148 |
1 files changed, 1148 insertions, 0 deletions
diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp new file mode 100644 index 00000000000..c8f8c6ea85b --- /dev/null +++ b/src/mongo/db/instance.cpp @@ -0,0 +1,1148 @@ +// instance.cpp + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "db.h" +#include "../bson/util/atomic_int.h" +#include "introspect.h" +#include "repl.h" +#include "dbmessage.h" +#include "instance.h" +#include "lasterror.h" +#include "security.h" +#include "json.h" +#include "replutil.h" +#include "../s/d_logic.h" +#include "../util/file_allocator.h" +#include "../util/goodies.h" +#include "cmdline.h" +#if !defined(_WIN32) +#include <sys/file.h> +#endif +#include "stats/counters.h" +#include "background.h" +#include "dur_journal.h" +#include "dur_recover.h" +#include "d_concurrency.h" +#include "ops/count.h" +#include "ops/delete.h" +#include "ops/query.h" +#include "ops/update.h" +#include "pagefault.h" + +namespace mongo { + + // "diaglog" + inline void opread(Message& m) { if( _diaglog.getLevel() & 2 ) _diaglog.readop((char *) m.singleData(), m.header()->len); } + inline void opwrite(Message& m) { if( _diaglog.getLevel() & 1 ) _diaglog.write((char *) m.singleData(), m.header()->len); } + + void receivedKillCursors(Message& m); + void receivedUpdate(Message& m, CurOp& op); + void receivedDelete(Message& m, CurOp& op); + void receivedInsert(Message& m, CurOp& op); 
+ bool receivedGetMore(DbResponse& dbresponse, Message& m, CurOp& curop ); + + int nloggedsome = 0; +#define LOGWITHRATELIMIT if( ++nloggedsome < 1000 || nloggedsome % 100 == 0 ) + + string dbExecCommand; + + DiagLog _diaglog; + + bool useCursors = true; + bool useHints = true; + + KillCurrentOp killCurrentOp; + + int lockFile = 0; +#ifdef _WIN32 + HANDLE lockFileHandle; +#endif + + // see FSyncCommand: + extern bool lockedForWriting; + + OpTime OpTime::now() { + DEV d.dbMutex.assertWriteLocked(); + return now_inlock(); + } + OpTime OpTime::last_inlock(){ + DEV d.dbMutex.assertAtLeastReadLocked(); + return last; + } + + // OpTime::now() uses dbMutex, thus it is in this file not in the cpp files used by drivers and such + void BSONElementManipulator::initTimestamp() { + massert( 10332 , "Expected CurrentTime type", _element.type() == Timestamp ); + unsigned long long &timestamp = *( reinterpret_cast< unsigned long long* >( value() ) ); + if ( timestamp == 0 ) + timestamp = OpTime::now().asDate(); + } + void BSONElementManipulator::SetNumber(double d) { + if ( _element.type() == NumberDouble ) + *getDur().writing( reinterpret_cast< double * >( value() ) ) = d; + else if ( _element.type() == NumberInt ) + *getDur().writing( reinterpret_cast< int * >( value() ) ) = (int) d; + else assert(0); + } + void BSONElementManipulator::SetLong(long long n) { + assert( _element.type() == NumberLong ); + *getDur().writing( reinterpret_cast< long long * >(value()) ) = n; + } + void BSONElementManipulator::SetInt(int n) { + assert( _element.type() == NumberInt ); + getDur().writingInt( *reinterpret_cast< int * >( value() ) ) = n; + } + /* dur:: version */ + void BSONElementManipulator::ReplaceTypeAndValue( const BSONElement &e ) { + char *d = data(); + char *v = value(); + int valsize = e.valuesize(); + int ofs = (int) (v-d); + dassert( ofs > 0 ); + char *p = (char *) getDur().writingPtr(d, valsize + ofs); + *p = e.type(); + memcpy( p + ofs, e.value(), valsize ); + } + + void inProgCmd( 
Message &m, DbResponse &dbresponse ) { + BSONObjBuilder b; + + if( ! cc().isAdmin() ) { + b.append("err", "unauthorized"); + } + else { + DbMessage d(m); + QueryMessage q(d); + bool all = q.query["$all"].trueValue(); + vector<BSONObj> vals; + { + Client& me = cc(); + scoped_lock bl(Client::clientsMutex); + auto_ptr<Matcher> m(new Matcher(q.query)); + for( set<Client*>::iterator i = Client::clients.begin(); i != Client::clients.end(); i++ ) { + Client *c = *i; + assert( c ); + CurOp* co = c->curop(); + if ( c == &me && !co ) { + continue; + } + assert( co ); + if( all || co->active() ) { + BSONObj info = co->infoNoauth(); + if ( all || m->matches( info )) { + vals.push_back( info ); + } + } + } + } + b.append("inprog", vals); + unsigned x = lockedForWriting; + if( x ) { + b.append("fsyncLock", x); + b.append("info", "use db.fsyncUnlock() to terminate the fsync write/snapshot lock"); + } + } + + replyToQuery(0, m, dbresponse, b.obj()); + } + + void killOp( Message &m, DbResponse &dbresponse ) { + BSONObj obj; + if( ! cc().isAdmin() ) { + obj = fromjson("{\"err\":\"unauthorized\"}"); + } + /*else if( !dbMutexInfo.isLocked() ) + obj = fromjson("{\"info\":\"no op in progress/not locked\"}"); + */ + else { + DbMessage d(m); + QueryMessage q(d); + BSONElement e = q.query.getField("op"); + if( !e.isNumber() ) { + obj = fromjson("{\"err\":\"no op number field specified?\"}"); + } + else { + log() << "going to kill op: " << e << endl; + obj = fromjson("{\"info\":\"attempting to kill op\"}"); + killCurrentOp.kill( (unsigned) e.number() ); + } + } + replyToQuery(0, m, dbresponse, obj); + } + + void unlockFsyncAndWait(); + void unlockFsync(const char *ns, Message& m, DbResponse &dbresponse) { + BSONObj obj; + if ( ! 
cc().isAdmin() ) { // checks auth + obj = fromjson("{\"err\":\"unauthorized\"}"); + } + else if (strncmp(ns, "admin.", 6) != 0 ) { + obj = fromjson("{\"err\":\"unauthorized - this command must be run against the admin DB\"}"); + } + else { + if( lockedForWriting ) { + log() << "command: unlock requested" << endl; + obj = fromjson("{ok:1,\"info\":\"unlock completed\"}"); + unlockFsyncAndWait(); + } + else { + obj = fromjson("{ok:0,\"errmsg\":\"not locked\"}"); + } + } + replyToQuery(0, m, dbresponse, obj); + } + + static bool receivedQuery(Client& c, DbResponse& dbresponse, Message& m ) { + bool ok = true; + MSGID responseTo = m.header()->id; + + DbMessage d(m); + QueryMessage q(d); + auto_ptr< Message > resp( new Message() ); + + CurOp& op = *(c.curop()); + + shared_ptr<AssertionException> ex; + + try { + dbresponse.exhaust = runQuery(m, q, op, *resp); + assert( !resp->empty() ); + } + catch ( SendStaleConfigException& e ){ + ex.reset( new SendStaleConfigException( e.getns(), e.getInfo().msg ) ); + ok = false; + } + catch ( AssertionException& e ) { + ex.reset( new AssertionException( e.getInfo().msg, e.getCode() ) ); + ok = false; + } + + if( ex ){ + + op.debug().exceptionInfo = ex->getInfo(); + LOGWITHRATELIMIT { + log() << "assertion " << ex->toString() << " ns:" << q.ns << " query:" << + (q.query.valid() ? q.query.toString() : "query object is corrupt") << endl; + if( q.ntoskip || q.ntoreturn ) + log() << " ntoskip:" << q.ntoskip << " ntoreturn:" << q.ntoreturn << endl; + } + + SendStaleConfigException* scex = NULL; + if ( ex->getCode() == SendStaleConfigCode ) scex = static_cast<SendStaleConfigException*>( ex.get() ); + + BSONObjBuilder err; + ex->getInfo().append( err ); + if( scex ) err.append( "ns", scex->getns() ); + BSONObj errObj = err.done(); + + log() << errObj << endl; + + BufBuilder b; + b.skip(sizeof(QueryResult)); + b.appendBuf((void*) errObj.objdata(), errObj.objsize()); + + // todo: call replyToQuery() from here instead of this!!! 
see dbmessage.h + QueryResult * msgdata = (QueryResult *) b.buf(); + b.decouple(); + QueryResult *qr = msgdata; + qr->_resultFlags() = ResultFlag_ErrSet; + if( scex ) qr->_resultFlags() |= ResultFlag_ShardConfigStale; + qr->len = b.len(); + qr->setOperation(opReply); + qr->cursorId = 0; + qr->startingFrom = 0; + qr->nReturned = 1; + resp.reset( new Message() ); + resp->setData( msgdata, true ); + + } + + op.debug().responseLength = resp->header()->dataLen(); + + dbresponse.response = resp.release(); + dbresponse.responseTo = responseTo; + + return ok; + } + + void (*reportEventToSystem)(const char *msg) = 0; + + void mongoAbort(const char *msg) { + if( reportEventToSystem ) + reportEventToSystem(msg); + rawOut(msg); + ::abort(); + } + + // Returns false when request includes 'end' + void _assembleResponse( Message &m, DbResponse &dbresponse, const HostAndPort& remote ) { + + // before we lock... + int op = m.operation(); + bool isCommand = false; + const char *ns = m.singleData()->_data + 4; + if ( op == dbQuery ) { + if( strstr(ns, ".$cmd") ) { + isCommand = true; + opwrite(m); + if( strstr(ns, ".$cmd.sys.") ) { + if( strstr(ns, "$cmd.sys.inprog") ) { + inProgCmd(m, dbresponse); + return; + } + if( strstr(ns, "$cmd.sys.killop") ) { + killOp(m, dbresponse); + return; + } + if( strstr(ns, "$cmd.sys.unlock") ) { + unlockFsync(ns, m, dbresponse); + return; + } + } + } + else { + opread(m); + } + } + else if( op == dbGetMore ) { + opread(m); + } + else { + opwrite(m); + } + + globalOpCounters.gotOp( op , isCommand ); + + Client& c = cc(); + + auto_ptr<CurOp> nestedOp; + CurOp* currentOpP = c.curop(); + if ( currentOpP->active() ) { + nestedOp.reset( new CurOp( &c , currentOpP ) ); + currentOpP = nestedOp.get(); + } + CurOp& currentOp = *currentOpP; + currentOp.reset(remote,op); + + OpDebug& debug = currentOp.debug(); + debug.op = op; + + int logThreshold = cmdLine.slowMS; + bool log = logLevel >= 1; + + if ( op == dbQuery ) { + if ( handlePossibleShardedMessage( m , 
&dbresponse ) ) + return; + receivedQuery(c , dbresponse, m ); + } + else if ( op == dbGetMore ) { + if ( ! receivedGetMore(dbresponse, m, currentOp) ) + log = true; + } + else if ( op == dbMsg ) { + // deprecated - replaced by commands + char *p = m.singleData()->_data; + int len = strlen(p); + if ( len > 400 ) + out() << curTimeMillis64() % 10000 << + " long msg received, len:" << len << endl; + + Message *resp = new Message(); + if ( strcmp( "end" , p ) == 0 ) + resp->setData( opReply , "dbMsg end no longer supported" ); + else + resp->setData( opReply , "i am fine - dbMsg deprecated"); + + dbresponse.response = resp; + dbresponse.responseTo = m.header()->id; + } + else { + const char *ns = m.singleData()->_data + 4; + char cl[256]; + nsToDatabase(ns, cl); + if( ! c.getAuthenticationInfo()->isAuthorized(cl) ) { + uassert_nothrow("unauthorized"); + } + else { + try { + if ( op == dbInsert ) { + receivedInsert(m, currentOp); + } + else if ( op == dbUpdate ) { + receivedUpdate(m, currentOp); + } + else if ( op == dbDelete ) { + receivedDelete(m, currentOp); + } + else if ( op == dbKillCursors ) { + currentOp.ensureStarted(); + logThreshold = 10; + receivedKillCursors(m); + } + else { + mongo::log() << " operation isn't supported: " << op << endl; + currentOp.done(); + log = true; + } + } + catch ( UserException& ue ) { + tlog(3) << " Caught Assertion in " << opToString(op) << ", continuing " << ue.toString() << endl; + debug.exceptionInfo = ue.getInfo(); + } + catch ( AssertionException& e ) { + tlog(3) << " Caught Assertion in " << opToString(op) << ", continuing " << e.toString() << endl; + debug.exceptionInfo = e.getInfo(); + log = true; + } + } + } + currentOp.ensureStarted(); + currentOp.done(); + debug.executionTime = currentOp.totalTimeMillis(); + + //DEV log = true; + if ( log || debug.executionTime > logThreshold ) { + if( logLevel < 3 && op == dbGetMore && strstr(ns, ".oplog.") && debug.executionTime < 4300 && !log ) { + /* it's normal for getMore on the 
oplog to be slow because of use of awaitdata flag. */ + } + else { + mongo::tlog() << debug << endl; + } + } + + if ( currentOp.shouldDBProfile( debug.executionTime ) ) { + // performance profiling is on + if ( d.dbMutex.getState() < 0 ) { + mongo::log(1) << "note: not profiling because recursive read lock" << endl; + } + else { + writelock lk; + if ( dbHolder()._isLoaded( nsToDatabase( currentOp.getNS() ) , dbpath ) ) { + Client::Context cx( currentOp.getNS() ); + profile(c , currentOp ); + } + else { + mongo::log() << "note: not profiling because db went away - probably a close on: " << currentOp.getNS() << endl; + } + } + } + + debug.reset(); + } /* _assembleResponse() */ + + void assembleResponse( Message &m, DbResponse &dbresponse, const HostAndPort& remote ) { + PageFaultRetryableSection s; + while( 1 ) { + try { + _assembleResponse( m, dbresponse, remote ); + break; + } + catch( PageFaultException& e ) { + DEV log() << "TEMP PageFaultException touch and retry" << endl; + e.touch(); + } + } + } + + void receivedKillCursors(Message& m) { + int *x = (int *) m.singleData()->_data; + x++; // reserved + int n = *x++; + + uassert( 13659 , "sent 0 cursors to kill" , n != 0 ); + massert( 13658 , str::stream() << "bad kill cursors size: " << m.dataSize() , m.dataSize() == 8 + ( 8 * n ) ); + uassert( 13004 , str::stream() << "sent negative cursors to kill: " << n , n >= 1 ); + + if ( n > 2000 ) { + log( n < 30000 ? 
LL_WARNING : LL_ERROR ) << "receivedKillCursors, n=" << n << endl; + assert( n < 30000 ); + } + + int found = ClientCursor::erase(n, (long long *) x); + + if ( logLevel > 0 || found != n ) { + log( found == n ) << "killcursors: found " << found << " of " << n << endl; + } + + } + + /* db - database name + path - db directory + */ + /*static*/ void Database::closeDatabase( const char *db, const string& path ) { + assertInWriteLock(); + + Client::Context * ctx = cc().getContext(); + assert( ctx ); + assert( ctx->inDB( db , path ) ); + Database *database = ctx->db(); + assert( database->name == db ); + + oplogCheckCloseDatabase( database ); // oplog caches some things, dirty its caches + + if( BackgroundOperation::inProgForDb(db) ) { + log() << "warning: bg op in prog during close db? " << db << endl; + } + + /* important: kill all open cursors on the database */ + string prefix(db); + prefix += '.'; + ClientCursor::invalidate(prefix.c_str()); + + NamespaceDetailsTransient::clearForPrefix( prefix.c_str() ); + + dbHolderW().erase( db, path ); + ctx->_clear(); + delete database; // closes files + } + + void receivedUpdate(Message& m, CurOp& op) { + DbMessage d(m); + const char *ns = d.getns(); + op.debug().ns = ns; + int flags = d.pullInt(); + BSONObj query = d.nextJsObj(); + + assert( d.moreJSObjs() ); + assert( query.objsize() < m.header()->dataLen() ); + BSONObj toupdate = d.nextJsObj(); + uassert( 10055 , "update object too large", toupdate.objsize() <= BSONObjMaxUserSize); + assert( toupdate.objsize() < m.header()->dataLen() ); + assert( query.objsize() + toupdate.objsize() < m.header()->dataLen() ); + bool upsert = flags & UpdateOption_Upsert; + bool multi = flags & UpdateOption_Multi; + bool broadcast = flags & UpdateOption_Broadcast; + + op.debug().query = query; + op.setQuery(query); + + writelock lk; + + // void ReplSetImpl::relinquish() uses big write lock so + // this is thus synchronized given our lock above. 
+ uassert( 10054 , "not master", isMasterNs( ns ) ); + + // if this ever moves to outside of lock, need to adjust check Client::Context::_finishInit + if ( ! broadcast && handlePossibleShardedMessage( m , 0 ) ) + return; + + Client::Context ctx( ns ); + + UpdateResult res = updateObjects(ns, toupdate, query, upsert, multi, true, op.debug() ); + lastError.getSafe()->recordUpdate( res.existing , res.num , res.upserted ); // for getlasterror + } + + void receivedDelete(Message& m, CurOp& op) { + DbMessage d(m); + const char *ns = d.getns(); + op.debug().ns = ns; + int flags = d.pullInt(); + bool justOne = flags & RemoveOption_JustOne; + bool broadcast = flags & RemoveOption_Broadcast; + assert( d.moreJSObjs() ); + BSONObj pattern = d.nextJsObj(); + + op.debug().query = pattern; + op.setQuery(pattern); + + writelock lk(ns); + + // writelock is used to synchronize stepdowns w/ writes + uassert( 10056 , "not master", isMasterNs( ns ) ); + + // if this ever moves to outside of lock, need to adjust check Client::Context::_finishInit + if ( ! 
broadcast && handlePossibleShardedMessage( m , 0 ) ) + return; + + Client::Context ctx(ns); + + long long n = deleteObjects(ns, pattern, justOne, true); + lastError.getSafe()->recordDelete( n ); + } + + QueryResult* emptyMoreResult(long long); + + void OpTime::waitForDifferent(unsigned millis){ + DEV d.dbMutex.assertAtLeastReadLocked(); + + if (*this != last) return; // check early + + boost::xtime timeout; + boost::xtime_get(&timeout, boost::TIME_UTC); + + timeout.nsec += millis * 1000*1000; + if (timeout.nsec >= 1000*1000*1000){ + timeout.nsec -= 1000*1000*1000; + timeout.sec += 1; + } + + do { + dbtemprelease tmp; + boost::mutex::scoped_lock lk(notifyMutex()); + if (!notifier().timed_wait(lk, timeout)) + return; // timed out + } while (*this != last); + } + + bool receivedGetMore(DbResponse& dbresponse, Message& m, CurOp& curop ) { + bool ok = true; + + DbMessage d(m); + + const char *ns = d.getns(); + int ntoreturn = d.pullInt(); + long long cursorid = d.pullInt64(); + + curop.debug().ns = ns; + curop.debug().ntoreturn = ntoreturn; + curop.debug().cursorid = cursorid; + + time_t start = 0; + int pass = 0; + bool exhaust = false; + QueryResult* msgdata; + OpTime last; + while( 1 ) { + try { + Client::ReadContext ctx(ns); + if (str::startsWith(ns, "local.oplog.")){ + if (pass == 0) + last = OpTime::last_inlock(); + else + last.waitForDifferent(1000/*ms*/); + } + msgdata = processGetMore(ns, ntoreturn, cursorid, curop, pass, exhaust); + } + catch ( AssertionException& e ) { + exhaust = false; + curop.debug().exceptionInfo = e.getInfo(); + msgdata = emptyMoreResult(cursorid); + ok = false; + } + if (msgdata == 0) { + exhaust = false; + massert(13073, "shutting down", !inShutdown() ); + if( pass == 0 ) { + start = time(0); + } + else { + if( time(0) - start >= 4 ) { + // after about 4 seconds, return. pass stops at 1000 normally. + // we want to return occasionally so slave can checkpoint. 
+ pass = 10000; + } + } + pass++; + if (debug) + sleepmillis(20); + else + sleepmillis(2); + continue; + } + break; + }; + + Message *resp = new Message(); + resp->setData(msgdata, true); + curop.debug().responseLength = resp->header()->dataLen(); + curop.debug().nreturned = msgdata->nReturned; + + dbresponse.response = resp; + dbresponse.responseTo = m.header()->id; + + if( exhaust ) { + curop.debug().exhaust = true; + dbresponse.exhaust = ns; + } + + return ok; + } + + void checkAndInsert(const char *ns, /*modifies*/BSONObj& js) { + uassert( 10059 , "object to insert too large", js.objsize() <= BSONObjMaxUserSize); + { + // check no $ modifiers. note we only check top level. (scanning deep would be quite expensive) + BSONObjIterator i( js ); + while ( i.more() ) { + BSONElement e = i.next(); + uassert( 13511 , "document to insert can't have $ fields" , e.fieldName()[0] != '$' ); + } + } + theDataFileMgr.insertWithObjMod(ns, js, false); // js may be modified in the call to add an _id field. + logOp("i", ns, js); + } + + NOINLINE_DECL void insertMulti(bool keepGoing, const char *ns, vector<BSONObj>& objs) { + size_t i; + for (i=0; i<objs.size(); i++){ + try { + checkAndInsert(ns, objs[i]); + getDur().commitIfNeeded(); + } catch (const UserException&) { + if (!keepGoing || i == objs.size()-1){ + globalOpCounters.incInsertInWriteLock(i); + throw; + } + // otherwise ignore and keep going + } + } + + globalOpCounters.incInsertInWriteLock(i); + } + + void receivedInsert(Message& m, CurOp& op) { + DbMessage d(m); + const char *ns = d.getns(); + op.debug().ns = ns; + + if( !d.moreJSObjs() ) { + // strange. should we complain? + return; + } + BSONObj first = d.nextJsObj(); + + vector<BSONObj> multi; + while (d.moreJSObjs()){ + if (multi.empty()) // first pass + multi.push_back(first); + multi.push_back( d.nextJsObj() ); + } + + writelock lk(ns); + //LockCollectionExclusively lk(ns); + + // CONCURRENCY TODO: is being read locked in big log sufficient here? 
+ // writelock is used to synchronize stepdowns w/ writes + uassert( 10058 , "not master", isMasterNs(ns) ); + + if ( handlePossibleShardedMessage( m , 0 ) ) + return; + + Client::Context ctx(ns); + + if( !multi.empty() ) { + const bool keepGoing = d.reservedField() & InsertOption_ContinueOnError; + insertMulti(keepGoing, ns, multi); + return; + } + + checkAndInsert(ns, first); + globalOpCounters.incInsertInWriteLock(1); + } + + void getDatabaseNames( vector< string > &names , const string& usePath ) { + boost::filesystem::path path( usePath ); + for ( boost::filesystem::directory_iterator i( path ); + i != boost::filesystem::directory_iterator(); ++i ) { + if ( directoryperdb ) { + boost::filesystem::path p = *i; + string dbName = p.leaf(); + p /= ( dbName + ".ns" ); + if ( MMF::exists( p ) ) + names.push_back( dbName ); + } + else { + string fileName = boost::filesystem::path(*i).leaf(); + if ( fileName.length() > 3 && fileName.substr( fileName.length() - 3, 3 ) == ".ns" ) + names.push_back( fileName.substr( 0, fileName.length() - 3 ) ); + } + } + } + + /* returns true if there is data on this server. useful when starting replication. + local database does NOT count except for rsoplog collection. + used to set the hasData field on replset heartbeat command response + */ + bool replHasDatabases() { + vector<string> names; + getDatabaseNames(names); + if( names.size() >= 2 ) return true; + if( names.size() == 1 ) { + if( names[0] != "local" ) + return true; + // we have a local database. 
return true if oplog isn't empty + { + readlock lk(rsoplog); + BSONObj o; + if( Helpers::getFirst(rsoplog, o) ) + return true; + } + } + return false; + } + + bool DBDirectClient::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) { + if ( lastError._get() ) + lastError.startRequest( toSend, lastError._get() ); + DbResponse dbResponse; + assembleResponse( toSend, dbResponse , _clientHost ); + assert( dbResponse.response ); + dbResponse.response->concat(); // can get rid of this if we make response handling smarter + response = *dbResponse.response; + getDur().commitIfNeeded(); + return true; + } + + void DBDirectClient::say( Message &toSend, bool isRetry ) { + if ( lastError._get() ) + lastError.startRequest( toSend, lastError._get() ); + DbResponse dbResponse; + assembleResponse( toSend, dbResponse , _clientHost ); + getDur().commitIfNeeded(); + } + + auto_ptr<DBClientCursor> DBDirectClient::query(const string &ns, Query query, int nToReturn , int nToSkip , + const BSONObj *fieldsToReturn , int queryOptions ) { + + //if ( ! 
query.obj.isEmpty() || nToReturn != 0 || nToSkip != 0 || fieldsToReturn || queryOptions ) + return DBClientBase::query( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions ); + // + //assert( query.obj.isEmpty() ); + //throw UserException( (string)"yay:" + ns ); + } + + void DBDirectClient::killCursor( long long id ) { + ClientCursor::erase( id ); + } + + HostAndPort DBDirectClient::_clientHost = HostAndPort( "0.0.0.0" , 0 ); + + unsigned long long DBDirectClient::count(const string &ns, const BSONObj& query, int options, int limit, int skip ) { + LockCollectionForReading lk( ns ); + string errmsg; + long long res = runCount( ns.c_str() , _countCmd( ns , query , options , limit , skip ) , errmsg ); + if ( res == -1 ) + return 0; + uassert( 13637 , str::stream() << "count failed in DBDirectClient: " << errmsg , res >= 0 ); + return (unsigned long long )res; + } + + DBClientBase * createDirectClient() { + return new DBDirectClient(); + } + + mongo::mutex exitMutex("exit"); + AtomicUInt numExitCalls = 0; + + bool inShutdown() { + return numExitCalls > 0; + } + + void tryToOutputFatal( const string& s ) { + try { + rawOut( s ); + return; + } + catch ( ... ) {} + + try { + cerr << s << endl; + return; + } + catch ( ... ) {} + + // uh - oh, not sure there is anything else we can do... + } + + /** also called by ntservice.cpp */ + void shutdownServer() { + + log() << "shutdown: going to close listening sockets..." << endl; + ListeningSockets::get()->closeAll(); + + log() << "shutdown: going to flush diaglog..." << endl; + _diaglog.flush(); + + /* must do this before unmapping mem or you may get a seg fault */ + log() << "shutdown: going to close sockets..." 
<< endl; + boost::thread close_socket_thread( boost::bind(MessagingPort::closeAllSockets, 0) ); + + // wait until file preallocation finishes + // we would only hang here if the file_allocator code generates a + // synchronous signal, which we don't expect + log() << "shutdown: waiting for fs preallocator..." << endl; + FileAllocator::get()->waitUntilFinished(); + + if( cmdLine.dur ) { + log() << "shutdown: lock for final commit..." << endl; + { + int n = 10; + while( 1 ) { + // we may already be in a read lock from earlier in the call stack, so do read lock here + // to be consistent with that. + readlocktry w("", 20000); + if( w.got() ) { + log() << "shutdown: final commit..." << endl; + getDur().commitNow(); + break; + } + if( --n <= 0 ) { + log() << "shutdown: couldn't acquire write lock, aborting" << endl; + mongoAbort("couldn't acquire write lock"); + } + log() << "shutdown: waiting for write lock..." << endl; + } + } + MemoryMappedFile::flushAll(true); + } + + log() << "shutdown: closing all files..." << endl; + stringstream ss3; + MemoryMappedFile::closeAllFiles( ss3 ); + log() << ss3.str() << endl; + + if( cmdLine.dur ) { + dur::journalCleanup(true); + } + +#if !defined(__sunos__) + if ( lockFile ) { + log() << "shutdown: removing fs lock..." << endl; + /* This ought to be an unlink(), but Eliot says the last + time that was attempted, there was a race condition + with acquirePathLock(). 
*/ +#ifdef _WIN32 + if( _chsize( lockFile , 0 ) ) + log() << "couldn't remove fs lock " << WSAGetLastError() << endl; + CloseHandle(lockFileHandle); +#else + if( ftruncate( lockFile , 0 ) ) + log() << "couldn't remove fs lock " << errnoWithDescription() << endl; + flock( lockFile, LOCK_UN ); +#endif + } +#endif + } + + void exitCleanly( ExitCode code ) { + killCurrentOp.killAll(); + { + dblock lk; + log() << "now exiting" << endl; + dbexit( code ); + } + } + + + namespace dur { + extern mutex groupCommitMutex; + } + + /* not using log() herein in case we are already locked */ + NOINLINE_DECL void dbexit( ExitCode rc, const char *why, bool tryToGetLock ) { + + auto_ptr<writelocktry> wlt; + if ( tryToGetLock ) { + wlt.reset( new writelocktry( "" , 2 * 60 * 1000 ) ); + uassert( 13455 , "dbexit timed out getting lock" , wlt->got() ); + } + + Client * c = currentClient.get(); + { + scoped_lock lk( exitMutex ); + if ( numExitCalls++ > 0 ) { + if ( numExitCalls > 5 ) { + // this means something horrible has happened + ::_exit( rc ); + } + stringstream ss; + ss << "dbexit: " << why << "; exiting immediately"; + tryToOutputFatal( ss.str() ); + if ( c ) c->shutdown(); + ::exit( rc ); + } + } + + { + stringstream ss; + ss << "dbexit: " << why; + tryToOutputFatal( ss.str() ); + } + + try { + shutdownServer(); // gracefully shutdown instance + } + catch ( ... ) { + tryToOutputFatal( "shutdown failed with exception" ); + } + +#if defined(_DEBUG) + try { + mutexDebugger.programEnding(); + } + catch (...) 
{ } +#endif + + // block the dur thread from doing any work for the rest of the run + log(2) << "shutdown: groupCommitMutex" << endl; + scoped_lock lk(dur::groupCommitMutex); + +#ifdef _WIN32 + // Windows Service Controller wants to be told when we are down, + // so don't call ::exit() yet, or say "really exiting now" + // + if ( rc == EXIT_WINDOWS_SERVICE_STOP ) { + if ( c ) c->shutdown(); + return; + } +#endif + tryToOutputFatal( "dbexit: really exiting now" ); + if ( c ) c->shutdown(); + ::exit(rc); + } + +#if !defined(__sunos__) + void writePid(int fd) { + stringstream ss; + ss << getpid() << endl; + string s = ss.str(); + const char * data = s.c_str(); +#ifdef _WIN32 + assert ( _write( fd, data, strlen( data ) ) ); +#else + assert ( write( fd, data, strlen( data ) ) ); +#endif + } + + void acquirePathLock(bool doingRepair) { + string name = ( boost::filesystem::path( dbpath ) / "mongod.lock" ).native_file_string(); + + bool oldFile = false; + + if ( boost::filesystem::exists( name ) && boost::filesystem::file_size( name ) > 0 ) { + oldFile = true; + } + +#ifdef _WIN32 + lockFileHandle = CreateFileA( name.c_str(), GENERIC_READ | GENERIC_WRITE, + 0 /* do not allow anyone else access */, NULL, + OPEN_ALWAYS /* success if fh can open */, 0, NULL ); + + if (lockFileHandle == INVALID_HANDLE_VALUE) { + DWORD code = GetLastError(); + char *msg; + FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, + NULL, code, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPSTR)&msg, 0, NULL); + string m = msg; + str::stripTrailing(m, "\r\n"); + uasserted( 13627 , str::stream() << "Unable to create/open lock file: " << name << ' ' << m << ". Is a mongod instance already running?" 
); + } + lockFile = _open_osfhandle((intptr_t)lockFileHandle, 0); +#else + lockFile = open( name.c_str(), O_RDWR | O_CREAT , S_IRWXU | S_IRWXG | S_IRWXO ); + if( lockFile <= 0 ) { + uasserted( 10309 , str::stream() << "Unable to create/open lock file: " << name << ' ' << errnoWithDescription() << " Is a mongod instance already running?" ); + } + if (flock( lockFile, LOCK_EX | LOCK_NB ) != 0) { + close ( lockFile ); + lockFile = 0; + uassert( 10310 , "Unable to lock file: " + name + ". Is a mongod instance already running?", 0 ); + } +#endif + + if ( oldFile ) { + // we check this here because we want to see if we can get the lock + // if we can't, then its probably just another mongod running + + string errmsg; + if (cmdLine.dur) { + if (!dur::haveJournalFiles()) { + + vector<string> dbnames; + getDatabaseNames( dbnames ); + + if ( dbnames.size() == 0 ) { + // this means that mongod crashed + // between initial startup and when journaling was initialized + // it is safe to continue + } + else { + errmsg = str::stream() + << "************** \n" + << "old lock file: " << name << ". 
probably means unclean shutdown,\n" + << "but there are no journal files to recover.\n" + << "this is likely human error or filesystem corruption.\n" + << "found " << dbnames.size() << " dbs.\n" + << "see: http://dochub.mongodb.org/core/repair for more information\n" + << "*************"; + } + + + } + } + else { + if (!dur::haveJournalFiles() && !doingRepair) { + errmsg = str::stream() + << "************** \n" + << "Unclean shutdown detected.\n" + << "Please visit http://dochub.mongodb.org/core/repair for recovery instructions.\n" + << "*************"; + } + } + + if (!errmsg.empty()) { + cout << errmsg << endl; +#ifdef _WIN32 + CloseHandle( lockFileHandle ); +#else + close ( lockFile ); +#endif + lockFile = 0; + uassert( 12596 , "old lock file" , 0 ); + } + } + + // Not related to lock file, but this is where we handle unclean shutdown + if( !cmdLine.dur && dur::haveJournalFiles() ) { + cout << "**************" << endl; + cout << "Error: journal files are present in journal directory, yet starting without journaling enabled." << endl; + cout << "It is recommended that you start with journaling enabled so that recovery may occur." << endl; + cout << "**************" << endl; + uasserted(13597, "can't start without --journal enabled when journal/ files are present"); + } + +#ifdef _WIN32 + uassert( 13625, "Unable to truncate lock file", _chsize(lockFile, 0) == 0); + writePid( lockFile ); + _commit( lockFile ); +#else + uassert( 13342, "Unable to truncate lock file", ftruncate(lockFile, 0) == 0); + writePid( lockFile ); + fsync( lockFile ); + flushMyDirectory(name); +#endif + } +#else + void acquirePathLock(bool) { + // TODO - this is very bad that the code above not running here. + + // Not related to lock file, but this is where we handle unclean shutdown + if( !cmdLine.dur && dur::haveJournalFiles() ) { + cout << "**************" << endl; + cout << "Error: journal files are present in journal directory, yet starting without --journal enabled." 
<< endl; + cout << "It is recommended that you start with journaling enabled so that recovery may occur." << endl; + cout << "Alternatively (not recommended), you can backup everything, then delete the journal files, and run --repair" << endl; + cout << "**************" << endl; + uasserted(13618, "can't start without --journal enabled when journal/ files are present"); + } + } +#endif + +} // namespace mongo |