diff options
author | Dwight <dmerriman@gmail.com> | 2008-09-11 15:13:47 -0400 |
---|---|---|
committer | Dwight <dmerriman@gmail.com> | 2008-09-11 15:13:47 -0400 |
commit | f56113d7367535eaf7994d7954bdfb5e8fc93dcb (patch) | |
tree | 2efe10e1676f5b68cbdbf446a61d4778969ba08f | |
parent | 87ba097b356b8cde41a7f6b4c44505fe564209a4 (diff) | |
download | mongo-f56113d7367535eaf7994d7954bdfb5e8fc93dcb.tar.gz |
arbitrate
-rw-r--r-- | db/db.cpp | 37 | ||||
-rw-r--r-- | db/pdfile.cpp | 8 | ||||
-rw-r--r-- | db/repl.cpp | 76 | ||||
-rw-r--r-- | db/repl.h | 5 | ||||
-rw-r--r-- | db/replset.h | 27 | ||||
-rw-r--r-- | stdafx.cpp | 4 | ||||
-rw-r--r-- | stdafx.h | 9 |
7 files changed, 118 insertions, 48 deletions
diff --git a/db/db.cpp b/db/db.cpp index a327d8fbe45..c2b6b62fd61 100644 --- a/db/db.cpp +++ b/db/db.cpp @@ -45,7 +45,7 @@ int dbLocked = 0; void closeAllSockets(); void startReplication(); -void pairWith(const char *remoteEnd); +void pairWith(const char *remoteEnd, const char *arb); struct MyStartupTests { MyStartupTests() { @@ -253,7 +253,7 @@ void receivedQuery(DbResponse& dbresponse, /*AbstractMessagingPort& dbMsgPort, * try { msgdata = runQuery(m, ns, ntoskip, ntoreturn, query, fields, ss, m.data->dataAsInt()); } - catch( AssertionException e ) { + catch( AssertionException& e ) { ss << " exception "; problem() << " Caught Assertion in runQuery ns:" << ns << ' ' << e.toString() << '\n'; log() << " ntoskip:" << ntoskip << " ntoreturn:" << ntoreturn << '\n'; @@ -311,8 +311,8 @@ void receivedGetMore(DbResponse& dbresponse, /*AbstractMessagingPort& dbMsgPort, try { msgdata = getMore(ns, ntoreturn, cursorid); } - catch( AssertionException ) { - ss << " exception "; + catch( AssertionException& e ) { + ss << " exception " + e.toString(); msgdata = emptyMoreResult(cursorid); } Message *resp = new Message(); @@ -478,7 +478,7 @@ void jniCallback(Message& m, Message& out) ss << "killcursors "; receivedKillCursors(m); } - catch( AssertionException ) { + catch( AssertionException& ) { problem() << "Caught Assertion in kill cursors, continuing" << endl; ss << " exception "; } @@ -503,7 +503,7 @@ void jniCallback(Message& m, Message& out) } } - catch( AssertionException ) { + catch( AssertionException& ) { problem() << "Caught AssertionException in jniCall()" << endl; } @@ -603,9 +603,9 @@ void connThread() ss << "insert "; receivedInsert(m, ss); } - catch( AssertionException ) { - problem() << " Caught Assertion insert, continuing" << endl; - ss << " exception "; + catch( AssertionException& e ) { + problem() << " Caught Assertion insert, continuing\n"; + ss << " exception " + e.toString(); } } else if( m.data->operation() == dbUpdate ) { @@ -614,9 +614,9 @@ void connThread() ss << "update "; receivedUpdate(m, ss); } - catch( AssertionException ) { + catch( AssertionException& e ) { problem() << " Caught Assertion update, continuing" << endl; - ss << " exception "; + ss << " exception " + e.toString(); } } else if( m.data->operation() == dbDelete ) { @@ -625,9 +625,9 @@ void connThread() ss << "remove "; receivedDelete(m); } - catch( AssertionException ) { + catch( AssertionException& e ) { problem() << " Caught Assertion receivedDelete, continuing" << endl; - ss << " exception "; + ss << " exception " + e.toString(); } } else if( m.data->operation() == dbGetMore ) { @@ -643,9 +643,9 @@ void connThread() ss << "killcursors "; receivedKillCursors(m); } - catch( AssertionException ) { + catch( AssertionException& e ) { problem() << " Caught Assertion in kill cursors, continuing" << endl; - ss << " exception "; + ss << " exception " + e.toString(); } } else { @@ -674,7 +674,7 @@ void connThread() } } - catch( AssertionException ) { + catch( AssertionException& ) { problem() << "Uncaught AssertionException, terminating" << endl; exit(15); } @@ -911,7 +911,8 @@ int main(int argc, char* argv[], char *envp[] ) else if( s == "--slave" ) slave = true; else if( s == "--pairwith" ) { - pairWith( argv[++i] ); + pairWith( argv[i+1], argv[i+2] ); + i += 2; } else if( s == "--dbpath" ) dbpath = argv[++i]; @@ -947,7 +948,7 @@ int main(int argc, char* argv[], char *envp[] ) cout << " --port <portno> --dbpath <root> --appsrvpath <root of appsrv>" << endl; cout << " --nocursors --nojni" << endl; cout << " --oplog<n> 0=off 1=W 2=R 3=both 7=W+some reads" << endl; - cout << " --pairwith <server:port>" << endl; + cout << " --pairwith <server:port> <arbiter>" << endl; cout << endl; return 0; diff --git a/db/pdfile.cpp b/db/pdfile.cpp index 18340ab8853..9180e6fabff 100644 --- a/db/pdfile.cpp +++ b/db/pdfile.cpp @@ -442,7 +442,7 @@ void _unindexRecord(const char *ns, IndexDetails& id, JSObj& obj, const DiskLoc& try { ok = id.head.btree()->unindex(id.head, id, j, dl); } - catch(AssertionException) { + catch(AssertionException&) { problem() << "Assertion failure: _unindex failed " << id.indexNamespace() << endl; cout << "Assertion failure: _unindex failed" << '\n'; cout << " obj:" << obj.toString() << '\n'; @@ -590,7 +590,7 @@ void DataFileMgr::update( try { idx.head.btree()->unindex(idx.head, idx, *removed[i], dl); } - catch(AssertionException) { + catch(AssertionException&) { ss << " exception update unindex "; problem() << " caught assertion update unindex " << idxns.c_str() << endl; } @@ -604,7 +604,7 @@ void DataFileMgr::update( idx.head, dl, *added[i], false, idx, true); } - catch(AssertionException) { + catch(AssertionException&) { ss << " exception update index "; cout << " caught assertion update index " << idxns.c_str() << '\n'; problem() << " caught assertion update index " << idxns.c_str() << endl; @@ -644,7 +644,7 @@ void _indexRecord(IndexDetails& idx, JSObj& obj, DiskLoc newRecordLoc) { idx.head.btree()->insert(idx.head, newRecordLoc, (JSObj&) *i, false, idx, true); } - catch(AssertionException) { + catch(AssertionException&) { problem() << " caught assertion _indexRecord " << idx.indexNamespace() << endl; } } diff --git a/db/repl.cpp b/db/repl.cpp index 79a594e0b7b..dd0b411e659 100644 --- a/db/repl.cpp +++ b/db/repl.cpp @@ -45,21 +45,52 @@ void ensureHaveIdIndex(const char *ns); #include "replset.h" +/* --- ReplPair -------------------------------- */ + ReplPair *replPair = 0; +JSObj ismasterobj = fromjson("{ismaster:1}"); + +/* peer unreachable, try our arbitrator */ +void ReplPair::arbitrate() { + if( arbHost == "-" ) { + // no arbiter. we are up, let's assume he is down and network is not partitioned. + setMaster(State_Master, "remote unreachable"); + return; + } + + auto_ptr<DBClientConnection> conn( new DBClientConnection() ); + string errmsg; + if( !conn->connect(arbHost.c_str(), errmsg) ) { + setMaster(State_CantArb, "can't connect to arb"); + return; + } + + JSObj res = conn->findOne("admin.$cmd", ismasterobj); + if( res.isEmpty() ) { + setMaster(State_CantArb, "can't arb 2"); + return; + } + + setMaster(State_Master, "remote down, arbitrator reached"); +} + +/* --------------------------------------------- */ + class CmdIsMaster : public Command { public: CmdIsMaster() : Command("ismaster") { } virtual bool run(const char *ns, JSObj& cmdObj, string& errmsg, JSObjBuilder& result) { - int x = -2; if( replPair ) { - x = replPair->state; - result.append("ismaster", x); + int x = replPair->state; + result.append("ismaster", replPair->state); result.append("remote", replPair->remote); + if( !replPair->info.empty() ) + result.append("info", replPair->info); } else { - result.append("ismaster", x); + result.append("ismaster", 1); result.append("msg", "not paired"); } @@ -137,6 +168,7 @@ void ReplPair::negotiate(DBClientConnection *conn) { JSObj res = conn->findOne("admin.$cmd", cmd); if( res.getIntField("ok") != 1 ) { problem() << "negotiate fails: " << res.toString() << '\n'; + setMaster(State_Confused); return; } int x = res.getIntField("you_are"); @@ -422,7 +454,7 @@ void ReplSource::sync_pullOpLog_applyOperation(JSObj& op) { _runCommands(ns, o, ss, bb, ob); } } - catch( UserAssertionException e ) { + catch( UserAssertionException& e ) { log() << "sync: caught user assertion " << e.msg << '\n'; } client = 0; @@ -555,12 +587,12 @@ bool ReplSource::sync() { conn = auto_ptr<DBClientConnection>(new DBClientConnection()); string errmsg; if( !conn->connect(hostName.c_str(), errmsg) ) { + resetConnection(); + log() << "pull: cantconn " << errmsg << endl; if( replPair && paired ) { assert( startsWith(hostName.c_str(), replPair->remoteHost.c_str()) ); - replPair->setMaster(1); + replPair->arbitrate(); } - resetConnection(); - log() << "pull: cantconn " << errmsg << endl; sleepsecs(1); return false; } @@ -658,6 +690,7 @@ _ reuse that cursor when we can void replMain() { vector<ReplSource*> sources; + bool first = true; while( 1 ) { { @@ -665,8 +698,10 @@ void replMain() { ReplSource::loadAll(sources); } - if( sources.empty() ) + if( !first && sources.empty() ) sleepsecs(20); + + first=false; for( vector<ReplSource*>::iterator i = sources.begin(); i != sources.end(); i++ ) { ReplSource *s = *i; @@ -674,13 +709,18 @@ void replMain() { try { ok = s->sync(); } - catch( SyncException ) { - log() << "caught SyncException, sleeping 1 minutes" << endl; - sleepsecs(60); + catch( SyncException& ) { + log() << "caught SyncException, sleeping 10 secs" << endl; + sleepsecs(10); } - catch( AssertionException ) { - log() << "replMain caught AssertionException, sleeping 1 minutes" << endl; - sleepsecs(60); + catch( AssertionException& e ) { + if( e.severe() ) { + log() << "replMain caught AssertionException, sleeping 1 minutes" << endl; + sleepsecs(60); + } + else { + log() << e.toString() << '\n'; + } } if( !ok ) s->resetConnection(); @@ -703,7 +743,7 @@ void replSlaveThread() { break; sleepsecs(5); } - catch( AssertionException ) { + catch( AssertionException& ) { problem() << "Assertion in replSlaveThread(): sleeping 5 minutes before retry" << endl; sleepsecs(300); } @@ -779,6 +819,6 @@ void startReplication() { } /* called from main at server startup */ -void pairWith(const char *remoteEnd) { - replPair = new ReplPair(remoteEnd); +void pairWith(const char *remoteEnd, const char *arb) { + replPair = new ReplPair(remoteEnd, arb); } diff --git a/db/repl.h b/db/repl.h index 43c52c3232d..221c7c57b9e 100644 --- a/db/repl.h +++ b/db/repl.h @@ -118,7 +118,10 @@ public: ReplSource(JSObj); bool sync(); void save(); // write ourself to local.sources - void resetConnection() { conn = auto_ptr<DBClientConnection>(0); } + void resetConnection() { + conn = auto_ptr<DBClientConnection>(0); + cursor = auto_ptr<DBClientCursor>(0); + } // make a jsobj from our member fields of the form // { host: ..., source: ..., syncedTo: ... } diff --git a/db/replset.h b/db/replset.h index 98216d7530b..c82655a9825 100644 --- a/db/replset.h +++ b/db/replset.h @@ -30,15 +30,24 @@ extern int port; */ class ReplPair { - public: + enum { + State_CantArb = -3, + State_Confused = -2, + State_Negotiating = -1, + State_Slave = 0, + State_Master = 1 + }; + int state; + string info; // commentary about our current state + string arbHost; // "-" for no arbiter. "host[:port]" int remotePort; string remoteHost; string remote; // host:port if port specified. int date; // -1 not yet set; 0=slave; 1=master - ReplPair(const char *remoteEnd); + ReplPair(const char *remoteEnd, const char *arbiter); bool dominant(const string& myname) { if( myname == remoteHost ) @@ -46,25 +55,30 @@ public: return myname > remoteHost; } - void setMaster(int n) { + void setMaster(int n, const char *_comment = "") { + info = _comment; if( n == state ) return; log() << "pair: setting master=" << n << " was " << state << '\n'; state = n; } + /* negotiate with our peer who is master */ void negotiate(DBClientConnection *conn); + + /* peer unreachable, try our arbitrator */ + void arbitrate(); }; extern ReplPair *replPair; /* we should not allow most operations when not the master */ inline bool isMaster() { - return replPair == 0 || replPair->state == 1 || + return replPair == 0 || replPair->state == ReplPair::State_Master || client->name == "local"; // local is always allowed } -inline ReplPair::ReplPair(const char *remoteEnd) { +inline ReplPair::ReplPair(const char *remoteEnd, const char *arb) { state = -1; remote = remoteEnd; remotePort = DBPort; @@ -77,4 +91,7 @@ inline ReplPair::ReplPair(const char *remoteEnd) { if( remotePort == DBPort ) remote = remoteHost; // don't include ":27017" as it is default; in case ran in diff ways over time to normalizke the hostname format in sources collection } + + arbHost = arb; + uassert("arbiter parm is empty", !arbHost.empty()); } diff --git a/stdafx.cpp b/stdafx.cpp index 5fb9dcded68..f37f5a0ea7a 100644 --- a/stdafx.cpp +++ b/stdafx.cpp @@ -54,6 +54,6 @@ void uasserted(const char *msg) { } void msgasserted(const char *msg) { - cout << "Assertion: " << msg << '\n'; - throw AssertionException(); + log() << "Assertion: " << msg << '\n'; + throw MsgAssertionException(msg); } @@ -55,6 +55,7 @@ class AssertionException { public: string msg; AssertionException() { } + virtual bool severe() { return true; } virtual bool isUserAssertion() { return false; } virtual string toString() { return msg; } }; @@ -63,10 +64,18 @@ public: class UserAssertionException : public AssertionException { public: UserAssertionException(const char *_msg) { msg = _msg; } + virtual bool severe() { return false; } virtual bool isUserAssertion() { return true; } virtual string toString() { return "userassert:" + msg; } }; +class MsgAssertionException : public AssertionException { +public: + MsgAssertionException(const char *_msg) { msg = _msg; } + virtual bool severe() { return false; } + virtual string toString() { return "massert:" + msg; } +}; + void asserted(const char *msg, const char *file, unsigned line); void wasserted(const char *msg, const char *file, unsigned line); void uasserted(const char *msg); |