// instance.cpp : Global state variables and functions.
//
/**
* Copyright (C) 2008 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "stdafx.h"
#include "db.h"
#include "query.h"
#include "introspect.h"
#include "repl.h"
#include "dbmessage.h"
#include "instance.h"
#include "lasterror.h"
#include "security.h"
#include "json.h"
#include "reccache.h"
#include "replset.h"
#include "../s/d_logic.h"
#include "../util/file_allocator.h"
#include "cmdline.h"
#if !defined(_WIN32)
#include
#endif
#include "stats/counters.h"
#include "background.h"
namespace mongo {
// Forward declarations of the per-opcode handlers defined later in this file.
void receivedKillCursors(Message& m);
void receivedUpdate(Message& m, CurOp& op);
void receivedDelete(Message& m, CurOp& op);
void receivedInsert(Message& m, CurOp& op);
bool receivedGetMore(DbResponse& dbresponse, Message& m, CurOp& curop );
// Global command line settings; assembleResponse() reads cmdLine.slowMS as its slow-operation log threshold.
CmdLine cmdLine;
// Counter behind the LOGSOME throttle below.
int nloggedsome = 0;
// Statement prefix that throttles output: the guarded statement runs for the first 999 uses and
// afterwards only when the counter is a multiple of 100. Each use increments nloggedsome.
#define LOGSOME if( ++nloggedsome < 1000 || nloggedsome % 100 == 0 )
// The following settings are defined here but not read anywhere in the visible part of this file.
string dbExecCommand;
string bind_ip = "";
char *appsrvPath = null;
// Diagnostic log; flushOpLog() flushes it when its file stream is open.
DiagLog _diaglog;
bool useCursors = true;
bool useHints = true;
// Defined elsewhere; shutdown() runs it on its own thread.
void closeAllSockets();
/* Flushes the diagnostic log, but only when its file stream exists and is open.
   A one-line notice is appended to ss before the flush; otherwise ss is untouched.
*/
void flushOpLog( stringstream &ss ) {
    const bool haveOpenLog = _diaglog.f && _diaglog.f->is_open();
    if ( !haveOpenLog )
        return;

    ss << "flushing op log and files\n";
    _diaglog.flush();
}
// Operation counter used by assembleResponse() to log every 512th operation when logLevel >= 2.
int ctr = 0;
// Receives kill requests from killOp().
KillCurrentOp killCurrentOp;
// File descriptor of the dbpath lock file; 0 means "none" (see shutdown() and acquirePathLock()).
int lockFile = 0;
// see FSyncCommand:
// Non-zero is reported by inProgCmd() as "fsyncLock" and lets unlockFsync() request an unlock.
unsigned lockedForWriting;
// Not used in the visible part of this file; presumably guards lockedForWriting - confirm in FSyncCommand.
boost::mutex lockedForWritingMutex;
// Set to true by unlockFsync(); nothing in the visible part of this file reads it.
bool unlockRequested = false;
/* Answers a query on $cmd.sys.inprog with the operations of the other clients.

   m          - the incoming query message; its "$all" field selects inactive operations too
   dbresponse - receives the reply

   Reply shape:
     not admin -> { err: "unauthorized" }
     admin     -> { inprog: [ ... ] } plus "fsyncLock" and "info" while lockedForWriting is non-zero
*/
void inProgCmd( Message &m, DbResponse &dbresponse ) {
    BSONObjBuilder b;

    if( ! cc().isAdmin() ){
        // Append to the builder that is actually sent. A second, shadowing builder here
        // used to swallow the error and the client received an empty object instead.
        b.append("err", "unauthorized");
    }
    else {
        DbMessage d(m);
        QueryMessage q(d);
        bool all = q.query["$all"].trueValue();
        vector<BSONObj> vals;
        {
            Client& me = cc();
            // hold the clients mutex only while walking the set
            boostlock bl(Client::clientsMutex);
            for( set<Client*>::iterator i = Client::clients.begin(); i != Client::clients.end(); ++i ) {
                Client *c = *i;
                if ( c == &me )
                    continue; // the client issuing this command is not listed
                CurOp& co = *(c->curop());
                if( all || co.active() )
                    vals.push_back( co.infoNoauth() );
            }
        }
        b.append("inprog", vals);
        unsigned x = lockedForWriting;
        if( x ) {
            b.append("fsyncLock", x);
            b.append("info", "use command {unlock:0} to terminate the fsync write/snapshot lock");
        }
    }

    replyToQuery(0, m, dbresponse, b.obj());
}
/* Answers a query on $cmd.sys.killop.

   Admin callers must pass a numeric "op" field; that number is handed to killCurrentOp.
   Everyone else, or a request without a numeric "op", receives an { err: ... } reply.
*/
void killOp( Message &m, DbResponse &dbresponse ) {
    BSONObj reply;

    if ( cc().isAdmin() ) {
        /* formerly also considered here:
           else if( !dbMutexInfo.isLocked() )
               obj = fromjson("{\"info\":\"no op in progress/not locked\"}");
        */
        DbMessage dbm(m);
        QueryMessage qm(dbm);
        BSONElement opField = qm.query.getField("op");
        if ( opField.isNumber() ) {
            reply = fromjson("{\"info\":\"attempting to kill op\"}");
            killCurrentOp.kill( (unsigned) opField.number() );
        }
        else {
            reply = fromjson("{\"err\":\"no op number field specified?\"}");
        }
    }
    else {
        reply = fromjson("{\"err\":\"unauthorized\"}");
    }

    replyToQuery(0, m, dbresponse, reply);
}
void unlockFsync(const char *ns, Message& m, DbResponse &dbresponse) {
BSONObj obj;
if( ! cc().isAdmin() || strncmp(ns, "admin.", 6) != 0 ) {
obj = fromjson("{\"err\":\"unauthorized\"}");
}
else {
if( lockedForWriting ) {
log() << "command: unlock requested" << endl;
obj = fromjson("{ok:1,\"info\":\"unlock requested\"}");
unlockRequested = true;
}
else {
obj = fromjson("{ok:0,\"errmsg\":\"not locked\"}");
}
}
replyToQuery(0, m, dbresponse, obj);
}
// Runs a dbQuery message through runQuery() and attaches the reply to dbresponse.
// Returns true when runQuery() completed, false when it raised an AssertionException
// (in which case a one-document reply carrying "$err" is assembled by hand below).
static bool receivedQuery(Client& c, DbResponse& dbresponse, Message& m ){
bool ok = true;
MSGID responseTo = m.data->id;
DbMessage d(m);
QueryMessage q(d);
// assigned on the success path and on the AssertionException path; other exceptions propagate
QueryResult* msgdata;
CurOp& op = *(c.curop());
try {
msgdata = runQuery(m, q, op ).release();
}
catch ( AssertionException& e ) {
ok = false;
op.debug().str << " exception ";
// LOGSOME throttles only the problem() line; the log() lines below are always written
LOGSOME problem() << " Caught Assertion in runQuery ns:" << q.ns << ' ' << e.toString() << '\n';
log() << " ntoskip:" << q.ntoskip << " ntoreturn:" << q.ntoreturn << '\n';
if ( q.query.valid() )
log() << " query:" << q.query.toString() << endl;
else
log() << " query object is not valid!" << endl;
// build { $err: <message> }, with a generic text when the exception has no message
BSONObjBuilder err;
err.append("$err", e.msg.empty() ? "assertion during query" : e.msg);
BSONObj errObj = err.done();
BufBuilder b;
// leave room for the QueryResult header in front of the error document
b.skip(sizeof(QueryResult));
b.append((void*) errObj.objdata(), errObj.objsize());
// todo: call replyToQuery() from here instead of this!!! see dbmessage.h
msgdata = (QueryResult *) b.buf();
// NOTE(review): decouple() presumably makes the builder give up ownership of the buffer so it
// outlives b - confirm in BufBuilder
b.decouple();
QueryResult *qr = msgdata;
qr->_resultFlags() = QueryResult::ResultFlag_ErrSet;
qr->len = b.len();
qr->setOperation(opReply);
qr->cursorId = 0;
qr->startingFrom = 0;
qr->nReturned = 1;
}
Message *resp = new Message();
resp->setData(msgdata, true); // transport will free
dbresponse.response = resp;
dbresponse.responseTo = responseTo;
// shouldDBProfile(0) is asked with a duration of 0 ms to decide whether to record the reply size
if ( op.shouldDBProfile( 0 ) ){
op.debug().str << " bytes:" << resp->data->dataLen();
}
return ok;
}
// Handles one incoming wire message: classifies it, dispatches to the handler for its
// opcode, then logs and (when enabled) profiles the operation.
//   m          - the request
//   dbresponse - receives the reply for opcodes that produce one
//   client     - address of the sender, stored on the CurOp
// NOTE: every return statement below returns true. A dbMsg of "end" is only answered with
// "dbMsg end no longer supported", so the older remark that false is returned for 'end'
// no longer describes this code.
bool assembleResponse( Message &m, DbResponse &dbresponse, const sockaddr_in &client ) {
// before we lock...
int op = m.data->operation();
bool isCommand = false;
// the namespace string is read starting 4 bytes into the message body
const char *ns = m.data->_data + 4;
// OPREAD / OPWRITE are macros defined elsewhere; presumably diagnostic-log hooks - TODO confirm
if ( op == dbQuery ) {
if( strstr(ns, ".$cmd") ) {
isCommand = true;
OPWRITE;
if( strstr(ns, ".$cmd.sys.") ) {
// these three pseudo-commands are answered right here and return before the
// op counters, the sharding check and the CurOp bookkeeping further down
if( strstr(ns, "$cmd.sys.inprog") ) {
inProgCmd(m, dbresponse);
return true;
}
if( strstr(ns, "$cmd.sys.killop") ) {
killOp(m, dbresponse);
return true;
}
if( strstr(ns, "$cmd.sys.unlock") ) {
unlockFsync(ns, m, dbresponse);
return true;
}
}
}
else {
OPREAD;
}
}
else if( op == dbGetMore ) {
OPREAD;
}
else {
OPWRITE;
}
globalOpCounters.gotOp( op , isCommand );
if ( handlePossibleShardedMessage( m , dbresponse ) ){
/* important to do this before we lock
so if a message has to be forwarded, doesn't block for that
*/
return true;
}
Client& c = cc();
// if this client already has an active operation, track this one as a nested CurOp
// that is destroyed when the function returns
auto_ptr nestedOp;
CurOp* currentOpP = c.curop();
if ( currentOpP->active() ){
nestedOp.reset( new CurOp( &c , currentOpP ) );
currentOpP = nestedOp.get();
}
CurOp& currentOp = *currentOpP;
currentOp.reset(client,op);
OpDebug& debug = currentOp.debug();
StringBuilder& ss = debug.str;
ss << opToString( op ) << " ";
// operations slower than this many milliseconds are always logged
int logThreshold = cmdLine.slowMS;
// local flag named log shadows the log() function, hence the mongo::log() calls below
bool log = logLevel >= 1;
if ( op == dbQuery ) {
if ( ! receivedQuery(c , dbresponse, m ) )
log = true;
}
else if ( op == dbGetMore ) {
// DEV is a macro defined elsewhere; presumably active only in development builds - TODO confirm
DEV log = true;
if ( ! receivedGetMore(dbresponse, m, currentOp) )
log = true;
}
else if ( op == dbMsg ) {
// deprecated - replaced by commands
char *p = m.data->_data;
int len = strlen(p);
if ( len > 400 )
out() << curTimeMillis() % 10000 <<
" long msg received, len:" << len <<
" ends with: " << p + len - 10 << endl;
Message *resp = new Message();
if ( strcmp( "end" , p ) == 0 )
resp->setData( opReply , "dbMsg end no longer supported" );
else
resp->setData( opReply , "i am fine - dbMsg deprecated");
dbresponse.response = resp;
dbresponse.responseTo = m.data->id;
}
else {
// remaining opcodes: the caller must be authorized for the database named in the namespace
// (this ns repeats the computation of the outer ns and shadows it)
const char *ns = m.data->_data + 4;
char cl[256];
nsToDatabase(ns, cl);
if( ! c.getAuthenticationInfo()->isAuthorized(cl) ) {
// NOTE(review): judging by its name this records the error without throwing - confirm
uassert_nothrow("unauthorized");
}
else {
try {
if ( op == dbInsert ) {
receivedInsert(m, currentOp);
}
else if ( op == dbUpdate ) {
receivedUpdate(m, currentOp);
}
else if ( op == dbDelete ) {
receivedDelete(m, currentOp);
}
else if ( op == dbKillCursors ) {
currentOp.ensureStarted();
// kill-cursor requests use a fixed 10 ms slow-log threshold instead of cmdLine.slowMS
logThreshold = 10;
ss << "killcursors ";
receivedKillCursors(m);
}
else {
out() << " operation isn't supported: " << op << endl;
// done() is called here and again after the dispatch below
currentOp.done();
log = true;
}
}
catch ( AssertionException& e ) {
// the assertion is logged and swallowed; no reply is built on this path
problem() << " Caught Assertion in " << opToString(op) << " , continuing" << endl;
ss << " exception " + e.toString();
log = true;
}
}
}
currentOp.ensureStarted();
currentOp.done();
int ms = currentOp.totalTimeMillis();
// at logLevel >= 2 additionally log every 512th operation (ctr is a file-level counter)
log = log || (logLevel >= 2 && ++ctr % 512 == 0);
DEV log = true;
if ( log || ms > logThreshold ) {
ss << ' ' << ms << "ms";
mongo::log() << ss.str() << endl;
}
if ( currentOp.shouldDBProfile( ms ) ){
// performance profiling is on
// a negative lock state is treated as a held read lock (per the log text below),
// in which case the lock taken in the else branch is not attempted
if ( dbMutex.getState() < 0 ){
mongo::log(1) << "note: not profiling because recursive read lock" << endl;
}
else {
// NOTE(review): mongolock(true) presumably takes the write lock - confirm
mongolock lk(true);
if ( dbHolder.isLoaded( nsToDatabase( currentOp.getNS() ) , dbpath ) ){
// this Context named c shadows the Client reference c declared above
Client::Context c( currentOp.getNS() );
profile(ss.str().c_str(), ms);
}
else {
mongo::log() << "note: not profiling because db went away - probably a close on: " << currentOp.getNS() << endl;
}
}
}
return true;
} /* assembleResponse() */
void killCursors(int n, long long *ids);

/* Handles a kill-cursors message.

   The message body is read as: one reserved int, an int count n, then the cursor ids.
   n must be at least 1 (uassert 13004); counts above 2000 are reported and counts of
   30000 or more fail an assert.
*/
void receivedKillCursors(Message& m) {
    int *p = (int *) m.data->_data;
    ++p; // reserved
    int n = *p;
    ++p; // p now points at the first cursor id

    uassert( 13004 , "sent 0 cursors to kill" , n >= 1 );
    if ( n > 2000 ) {
        problem() << "Assertion failure, receivedKillCursors, n=" << n << endl;
        assert( n < 30000 );
    }

    killCursors(n, (long long *) p);
}
/* db - database name
path - db directory
*/
void closeDatabase( const char *db, const string& path ) {
assertInWriteLock();
Client::Context * ctx = cc().getContext();
assert( ctx );
assert( ctx->inDB( db , path ) );
Database *database = ctx->db();
assert( database->name == db );
if( BackgroundOperation::inProgForDb(db) ) {
log() << "warning: bg op in prog during close db? " << db << endl;
}
/* important: kill all open cursors on the database */
string prefix(db);
prefix += '.';
ClientCursor::invalidate(prefix.c_str());
NamespaceDetailsTransient::clearForPrefix( prefix.c_str() );
dbHolder.erase( db, path );
delete database; // closes files
ctx->clear();
}
/* Handles an update message: namespace, int flags, the query object, then the update object.

   Raises uassert 10054 when this node is not master for the namespace and uassert 10055
   when the update object exceeds MaxBSONObjectSize. The namespace and the query text are
   appended to the operation's debug string, and the outcome is recorded for getlasterror.
*/
void receivedUpdate(Message& m, CurOp& op) {
    DbMessage d(m);
    const char *ns = d.getns();
    assert(*ns);
    uassert( 10054 , "not master", isMasterNs( ns ) );
    op.debug().str << ns << ' ';

    const int flags = d.pullInt();

    BSONObj query = d.nextJsObj();
    assert( d.moreJSObjs() );
    assert( query.objsize() < m.data->dataLen() );

    BSONObj toupdate = d.nextJsObj();
    uassert( 10055 , "update object too large", toupdate.objsize() <= MaxBSONObjectSize);
    assert( toupdate.objsize() < m.data->dataLen() );
    assert( query.objsize() + toupdate.objsize() < m.data->dataLen() );

    const bool upsert = ( flags & UpdateOption_Upsert ) != 0;
    const bool multi = ( flags & UpdateOption_Multi ) != 0;

    /* todo: we shouldn't do all this ss stuff when we don't need it, it will slow us down.
       instead, let's just store the query BSON in the debug object, and it can toString()
       lazily
    */
    op.debug().str << " query: " << query.toString();
    op.setQuery(query);

    mongolock lk(1);
    Client::Context ctx( ns );

    UpdateResult res = updateObjects(ns, toupdate, query, upsert, multi, true, op.debug() );
    recordUpdate( res.existing , (int) res.num ); // for getlasterror
}
/* Handles a delete message: namespace, int flags, then the pattern object.

   Bit 0 of flags limits the delete to a single document. Raises uassert 10056 when this
   node is not master for the namespace. The number of deleted documents is recorded via
   recordDelete().
*/
void receivedDelete(Message& m, CurOp& op) {
    DbMessage d(m);
    const char *ns = d.getns();
    assert(*ns);
    uassert( 10056 , "not master", isMasterNs( ns ) );

    const int flags = d.pullInt();
    const bool justOne = ( flags & 1 ) != 0;

    assert( d.moreJSObjs() );
    BSONObj pattern = d.nextJsObj();

    op.debug().str << " query: " << pattern.toString();
    op.setQuery(pattern);

    writelock lk(ns);
    Client::Context ctx(ns);

    const long long n = deleteObjects(ns, pattern, justOne, true);
    recordDelete( (int) n );
}
QueryResult* emptyMoreResult(long long);

/* Handles a get-more message: namespace, int ntoreturn, 64-bit cursor id.

   The next batch is fetched with getMore() under mongolock(false). If that raises an
   AssertionException the reply comes from emptyMoreResult() instead and false is
   returned; otherwise the result is true. The reply is attached to dbresponse either way.
*/
bool receivedGetMore(DbResponse& dbresponse, Message& m, CurOp& curop ) {
    StringBuilder& dbg = curop.debug().str;

    DbMessage d(m);
    const char *ns = d.getns();
    int ntoreturn = d.pullInt();
    long long cursorid = d.pullInt64();
    dbg << ns << " cid:" << cursorid << " ntoreturn:" << ntoreturn;

    bool succeeded = true;
    QueryResult* result;
    try {
        mongolock lk(false);
        Client::Context ctx(ns);
        result = getMore(ns, ntoreturn, cursorid, curop);
    }
    catch ( AssertionException& e ) {
        dbg << " exception " << e.toString();
        result = emptyMoreResult(cursorid);
        succeeded = false;
    }

    Message *resp = new Message();
    resp->setData(result, true);
    dbg << " bytes:" << resp->data->dataLen();
    dbg << " nreturned:" << result->nReturned;

    dbresponse.response = resp;
    dbresponse.responseTo = m.data->id;
    return succeeded;
}
/* Handles an insert message: namespace followed by one or more objects.

   Raises uassert 10058 when this node is not master for the namespace and uassert 10059
   for an object larger than MaxBSONObjectSize. Every object is inserted under a single
   write lock and then passed to logOp() with opcode "i".
*/
void receivedInsert(Message& m, CurOp& op) {
    DbMessage d(m);
    const char *ns = d.getns();
    assert(*ns);
    uassert( 10058 , "not master", isMasterNs( ns ) );
    op.debug().str << ns;

    writelock lk(ns);
    Client::Context ctx(ns);

    for ( ; d.moreJSObjs(); ) {
        BSONObj js = d.nextJsObj();
        uassert( 10059 , "object to insert too large", js.objsize() <= MaxBSONObjectSize);
        theDataFileMgr.insert(ns, js, false);
        logOp("i", ns, js);
    }
}
/* Messaging port that does no network I/O: a reply is stored by assignment into the
   Message supplied at construction. remotePort() always reports 1.
*/
class JniMessagingPort : public AbstractMessagingPort {
public:
    JniMessagingPort(Message& target)
        : container(target) {
    }

    // the received message and the message id are ignored; only the response is kept
    void reply(Message& /*received*/, Message& response, MSGID) {
        container = response;
    }

    // the received message is ignored; only the response is kept
    void reply(Message& /*received*/, Message& response) {
        container = response;
    }

    unsigned remotePort(){
        return 1;
    }

    Message & container; // destination for replies, owned by the caller
};
/* Appends the names of the databases found under dbpath to names.

   With directoryperdb set, an entry <name> counts when <name>/<name>.ns exists.
   Otherwise every entry called <name>.ns (with a non-empty <name>) contributes <name>.
*/
void getDatabaseNames( vector< string > &names ) {
    namespace fs = boost::filesystem;

    fs::path root( dbpath );
    const fs::directory_iterator end;
    for ( fs::directory_iterator it( root ); it != end; ++it ) {
        if ( directoryperdb ) {
            fs::path candidate = *it;
            const string dbName = candidate.leaf();
            candidate /= ( dbName + ".ns" );
            if ( fs::exists( candidate ) )
                names.push_back( dbName );
            continue;
        }

        const string fileName = fs::path(*it).leaf();
        const size_t len = fileName.length();
        // need at least one character in front of the ".ns" suffix
        if ( len > 3 && fileName.compare( len - 3, 3, ".ns" ) == 0 )
            names.push_back( fileName.substr( 0, len - 3 ) );
    }
}
// Sends toSend straight through assembleResponse() (no network hop) and copies the reply into response.
// Asserts that a reply was produced. Always returns true; assertOk is not consulted.
bool DBDirectClient::call( Message &toSend, Message &response, bool assertOk ) {
// begin a new last-error request only when lastError._get() yields an object
if ( lastError._get() )
lastError.startRequest( toSend, lastError._get() );
DbResponse dbResponse;
assembleResponse( toSend, dbResponse );
assert( dbResponse.response );
response = *dbResponse.response;
return true;
}
// Sends toSend straight through assembleResponse() and ignores whatever reply is produced.
void DBDirectClient::say( Message &toSend ) {
// begin a new last-error request only when lastError._get() yields an object
if ( lastError._get() )
lastError.startRequest( toSend, lastError._get() );
DbResponse dbResponse;
assembleResponse( toSend, dbResponse );
}
// Forwards unchanged to DBClientBase::query(); the commented-out lines are remnants of an
// alternative path that is not active.
auto_ptr DBDirectClient::query(const string &ns, Query query, int nToReturn , int nToSkip ,
const BSONObj *fieldsToReturn , int queryOptions ){
//if ( ! query.obj.isEmpty() || nToReturn != 0 || nToSkip != 0 || fieldsToReturn || queryOptions )
return DBClientBase::query( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions );
//
//assert( query.obj.isEmpty() );
//throw UserException( (string)"yay:" + ns );
}
// Returns a newly allocated DBDirectClient; the object is created with new and handed to the caller.
DBClientBase * createDirectClient(){
return new DBDirectClient();
}
// Defined elsewhere; called by shutdown() after the memory mapped files are closed.
void recCacheCloseAll();
// Guards numExitCalls in dbexit(). The mutex is allocated with new and never deleted in this file.
boost::mutex &exitMutex( *( new boost::mutex ) );
// Number of times dbexit() has been entered.
int numExitCalls = 0;
void shutdown();
// True once dbexit() has been entered at least once. numExitCalls is read here without taking exitMutex.
bool inShutdown(){
return numExitCalls > 0;
}
/* Best-effort output of s: rawOut() first, cerr only if rawOut() threw.
   Exceptions from either attempt are swallowed, so this never throws.
*/
void tryToOutputFatal( const string& s ){
    bool written = false;
    try {
        rawOut( s );
        written = true;
    }
    catch ( ... ){}

    if ( written )
        return;

    try {
        cerr << s << endl;
    }
    catch ( ... ){}
    // uh - oh, not sure there is anything else we can do...
}
/* not using log() herein in case we are already locked */
// Terminates the process with exit code rc; why is a reason text included in the output.
// The first call runs shutdown() before exiting. A call made while an earlier one is still in
// progress exits immediately, and once numExitCalls exceeds 5 the process leaves through ::_exit().
// This function does not return.
void dbexit( ExitCode rc, const char *why) {
Client * c = currentClient.get();
{
boostlock lk( exitMutex );
// post-increment: the first caller sees 0 and falls through to the graceful path below
if ( numExitCalls++ > 0 ) {
if ( numExitCalls > 5 ){
// this means something horrible has happened
::_exit( rc );
}
stringstream ss;
ss << "dbexit: " << why << "; exiting immediately" << endl;
tryToOutputFatal( ss.str() );
if ( c ) c->shutdown();
// note: exitMutex is still held when ::exit() is called here
::exit( rc );
}
}
stringstream ss;
ss << "dbexit: " << why << endl;
tryToOutputFatal( ss.str() );
try {
shutdown(); // gracefully shutdown instance
}
catch ( ... ){
// a failed shutdown is reported but does not stop the exit
tryToOutputFatal( "shutdown failed with exception" );
}
tryToOutputFatal( "dbexit: really exiting now\n" );
if ( c ) c->shutdown();
::exit(rc);
}
// Shutdown sequence run by dbexit(): close listening sockets, flush the op log, start closing
// client sockets, wait for the file preallocator, close all memory mapped files and the record
// cache, and finally empty and unlock the dbpath lock file (not on Windows / SunOS builds).
void shutdown() {
log() << "\t shutdown: going to close listening sockets..." << endl;
ListeningSockets::get()->closeAll();
log() << "\t shutdown: going to flush oplog..." << endl;
stringstream ss2;
flushOpLog( ss2 );
rawOut( ss2.str() );
/* must do this before unmapping mem or you may get a seg fault */
log() << "\t shutdown: going to close sockets..." << endl;
// closeAllSockets runs on its own thread; this function never joins it
boost::thread close_socket_thread(closeAllSockets);
// wait until file preallocation finishes
// we would only hang here if the file_allocator code generates a
// synchronous signal, which we don't expect
log() << "\t shutdown: waiting for fs preallocator..." << endl;
theFileAllocator().waitUntilFinished();
log() << "\t shutdown: closing all files..." << endl;
stringstream ss3;
MemoryMappedFile::closeAllFiles( ss3 );
rawOut( ss3.str() );
// should we be locked here? we aren't. might be ok as-is.
recCacheCloseAll();
#if !defined(_WIN32) && !defined(__sunos__)
// lockFile is non-zero only after acquirePathLock() opened the lock file
if ( lockFile ){
log() << "\t shutdown: removing fs lock..." << endl;
// emptying the file is what marks the shutdown as clean: acquirePathLock() treats a
// non-empty lock file as left over from an earlier run
if( ftruncate( lockFile , 0 ) )
log() << "\t couldn't remove fs lock " << OUTPUT_ERRNO << endl;
// the result of the unlock is not checked
flock( lockFile, LOCK_UN );
}
#endif
}
/* Creates (if needed) and exclusively locks <dbpath>/mongod.lock, then writes this
   process id into it. Does nothing on Windows and SunOS builds.

   Failure modes:
     massert 10309 - the lock file could not be created / opened
     massert 10310 - the lock is held by someone else (probably another mongod)
     uassert 12596 - the lock could be taken but the file was non-empty, i.e. it was left
                     behind by an earlier run that did not shut down cleanly

   The global lockFile is assigned only once this process really owns the lock. On every
   failure path it keeps its previous value, so shutdown() never empties a lock file that
   belongs to another process or that the operator still has to inspect.
*/
void acquirePathLock() {
#if !defined(_WIN32) && !defined(__sunos__)
    string name = ( boost::filesystem::path( dbpath ) / "mongod.lock" ).native_file_string();

    bool oldFile = false;
    if ( boost::filesystem::exists( name ) && boost::filesystem::file_size( name ) > 0 ){
        oldFile = true;
    }

    // Deliberately no O_TRUNC: at this point the file may belong to a running mongod, or
    // be the evidence of an unclean shutdown. Truncating on open would wipe the other
    // process's pid and would make the old-lock-file check pass on the very next start.
    const int fd = open( name.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO );
    // descriptor 0 is rejected as well because lockFile == 0 means "no lock file" elsewhere
    massert( 10309 , "Unable to create / open lock file for dbpath: " + name, fd > 0 );

    if ( flock( fd, LOCK_EX | LOCK_NB ) != 0 ) {
        close( fd );
        massert( 10310 , "Unable to acquire lock for dbpath: " + name, false );
    }

    if ( oldFile ){
        // we check this here because we want to see if we can get the lock
        // if we can't, then its probably just another mongod running
        cout << "************** \n"
             << "old lock file: " << name << ". probably means unclean shutdown\n"
             << "reccomend removing file and running --repair\n"
             << "see: http://dochub.mongodb.org/core/repair for more information\n"
             << "*************" << endl;
        close( fd ); // drops the lock; the file contents are left as found
        uassert( 12596 , "old lock file" , 0 );
    }

    lockFile = fd;

    // we own the lock now, so it is safe to start from an empty file
    const int truncateResult = ftruncate( fd , 0 );
    assert( truncateResult == 0 );

    stringstream ss;
    ss << getpid() << endl;
    string s = ss.str();

    // write() reports failure as -1, which is "true" when used directly as a condition,
    // so compare the byte count explicitly
    const ssize_t written = write( fd , s.c_str() , s.size() );
    assert( written == (ssize_t) s.size() );
    fsync( fd );
#endif
}
} // namespace mongo