summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Dwight <dmerriman@gmail.com> 2008-09-11 15:13:47 -0400
committer Dwight <dmerriman@gmail.com> 2008-09-11 15:13:47 -0400
commit f56113d7367535eaf7994d7954bdfb5e8fc93dcb (patch)
tree 2efe10e1676f5b68cbdbf446a61d4778969ba08f
parent 87ba097b356b8cde41a7f6b4c44505fe564209a4 (diff)
download mongo-f56113d7367535eaf7994d7954bdfb5e8fc93dcb.tar.gz
arbitrate
-rw-r--r-- db/db.cpp 37
-rw-r--r-- db/pdfile.cpp 8
-rw-r--r-- db/repl.cpp 76
-rw-r--r-- db/repl.h 5
-rw-r--r-- db/replset.h 27
-rw-r--r-- stdafx.cpp 4
-rw-r--r-- stdafx.h 9
7 files changed, 118 insertions, 48 deletions
diff --git a/db/db.cpp b/db/db.cpp
index a327d8fbe45..c2b6b62fd61 100644
--- a/db/db.cpp
+++ b/db/db.cpp
@@ -45,7 +45,7 @@ int dbLocked = 0;
void closeAllSockets();
void startReplication();
-void pairWith(const char *remoteEnd);
+void pairWith(const char *remoteEnd, const char *arb);
struct MyStartupTests {
MyStartupTests() {
@@ -253,7 +253,7 @@ void receivedQuery(DbResponse& dbresponse, /*AbstractMessagingPort& dbMsgPort, *
try {
msgdata = runQuery(m, ns, ntoskip, ntoreturn, query, fields, ss, m.data->dataAsInt());
}
- catch( AssertionException e ) {
+ catch( AssertionException& e ) {
ss << " exception ";
problem() << " Caught Assertion in runQuery ns:" << ns << ' ' << e.toString() << '\n';
log() << " ntoskip:" << ntoskip << " ntoreturn:" << ntoreturn << '\n';
@@ -311,8 +311,8 @@ void receivedGetMore(DbResponse& dbresponse, /*AbstractMessagingPort& dbMsgPort,
try {
msgdata = getMore(ns, ntoreturn, cursorid);
}
- catch( AssertionException ) {
- ss << " exception ";
+ catch( AssertionException& e ) {
+ ss << " exception " + e.toString();
msgdata = emptyMoreResult(cursorid);
}
Message *resp = new Message();
@@ -478,7 +478,7 @@ void jniCallback(Message& m, Message& out)
ss << "killcursors ";
receivedKillCursors(m);
}
- catch( AssertionException ) {
+ catch( AssertionException& ) {
problem() << "Caught Assertion in kill cursors, continuing" << endl;
ss << " exception ";
}
@@ -503,7 +503,7 @@ void jniCallback(Message& m, Message& out)
}
}
- catch( AssertionException ) {
+ catch( AssertionException& ) {
problem() << "Caught AssertionException in jniCall()" << endl;
}
@@ -603,9 +603,9 @@ void connThread()
ss << "insert ";
receivedInsert(m, ss);
}
- catch( AssertionException ) {
- problem() << " Caught Assertion insert, continuing" << endl;
- ss << " exception ";
+ catch( AssertionException& e ) {
+ problem() << " Caught Assertion insert, continuing\n";
+ ss << " exception " + e.toString();
}
}
else if( m.data->operation() == dbUpdate ) {
@@ -614,9 +614,9 @@ void connThread()
ss << "update ";
receivedUpdate(m, ss);
}
- catch( AssertionException ) {
+ catch( AssertionException& e ) {
problem() << " Caught Assertion update, continuing" << endl;
- ss << " exception ";
+ ss << " exception " + e.toString();
}
}
else if( m.data->operation() == dbDelete ) {
@@ -625,9 +625,9 @@ void connThread()
ss << "remove ";
receivedDelete(m);
}
- catch( AssertionException ) {
+ catch( AssertionException& e ) {
problem() << " Caught Assertion receivedDelete, continuing" << endl;
- ss << " exception ";
+ ss << " exception " + e.toString();
}
}
else if( m.data->operation() == dbGetMore ) {
@@ -643,9 +643,9 @@ void connThread()
ss << "killcursors ";
receivedKillCursors(m);
}
- catch( AssertionException ) {
+ catch( AssertionException& e ) {
problem() << " Caught Assertion in kill cursors, continuing" << endl;
- ss << " exception ";
+ ss << " exception " + e.toString();
}
}
else {
@@ -674,7 +674,7 @@ void connThread()
}
}
- catch( AssertionException ) {
+ catch( AssertionException& ) {
problem() << "Uncaught AssertionException, terminating" << endl;
exit(15);
}
@@ -911,7 +911,8 @@ int main(int argc, char* argv[], char *envp[] )
else if( s == "--slave" )
slave = true;
else if( s == "--pairwith" ) {
- pairWith( argv[++i] );
+ pairWith( argv[i+1], argv[i+2] );
+ i += 2;
}
else if( s == "--dbpath" )
dbpath = argv[++i];
@@ -947,7 +948,7 @@ int main(int argc, char* argv[], char *envp[] )
cout << " --port <portno> --dbpath <root> --appsrvpath <root of appsrv>" << endl;
cout << " --nocursors --nojni" << endl;
cout << " --oplog<n> 0=off 1=W 2=R 3=both 7=W+some reads" << endl;
- cout << " --pairwith <server:port>" << endl;
+ cout << " --pairwith <server:port> <arbiter>" << endl;
cout << endl;
return 0;
diff --git a/db/pdfile.cpp b/db/pdfile.cpp
index 18340ab8853..9180e6fabff 100644
--- a/db/pdfile.cpp
+++ b/db/pdfile.cpp
@@ -442,7 +442,7 @@ void _unindexRecord(const char *ns, IndexDetails& id, JSObj& obj, const DiskLoc&
try {
ok = id.head.btree()->unindex(id.head, id, j, dl);
}
- catch(AssertionException) {
+ catch(AssertionException&) {
problem() << "Assertion failure: _unindex failed " << id.indexNamespace() << endl;
cout << "Assertion failure: _unindex failed" << '\n';
cout << " obj:" << obj.toString() << '\n';
@@ -590,7 +590,7 @@ void DataFileMgr::update(
try {
idx.head.btree()->unindex(idx.head, idx, *removed[i], dl);
}
- catch(AssertionException) {
+ catch(AssertionException&) {
ss << " exception update unindex ";
problem() << " caught assertion update unindex " << idxns.c_str() << endl;
}
@@ -604,7 +604,7 @@ void DataFileMgr::update(
idx.head,
dl, *added[i], false, idx, true);
}
- catch(AssertionException) {
+ catch(AssertionException&) {
ss << " exception update index ";
cout << " caught assertion update index " << idxns.c_str() << '\n';
problem() << " caught assertion update index " << idxns.c_str() << endl;
@@ -644,7 +644,7 @@ void _indexRecord(IndexDetails& idx, JSObj& obj, DiskLoc newRecordLoc) {
idx.head.btree()->insert(idx.head, newRecordLoc,
(JSObj&) *i, false, idx, true);
}
- catch(AssertionException) {
+ catch(AssertionException&) {
problem() << " caught assertion _indexRecord " << idx.indexNamespace() << endl;
}
}
diff --git a/db/repl.cpp b/db/repl.cpp
index 79a594e0b7b..dd0b411e659 100644
--- a/db/repl.cpp
+++ b/db/repl.cpp
@@ -45,21 +45,52 @@ void ensureHaveIdIndex(const char *ns);
#include "replset.h"
+/* --- ReplPair -------------------------------- */
+
ReplPair *replPair = 0;
+JSObj ismasterobj = fromjson("{ismaster:1}");
+
+/* peer unreachable, try our arbitrator */
+void ReplPair::arbitrate() {
+ if( arbHost == "-" ) {
+ // no arbiter. we are up, let's assume he is down and network is not partitioned.
+ setMaster(State_Master, "remote unreachable");
+ return;
+ }
+
+ auto_ptr<DBClientConnection> conn( new DBClientConnection() );
+ string errmsg;
+ if( !conn->connect(arbHost.c_str(), errmsg) ) {
+ setMaster(State_CantArb, "can't connect to arb");
+ return;
+ }
+
+ JSObj res = conn->findOne("admin.$cmd", ismasterobj);
+ if( res.isEmpty() ) {
+ setMaster(State_CantArb, "can't arb 2");
+ return;
+ }
+
+ setMaster(State_Master, "remote down, arbitrator reached");
+}
+
+/* --------------------------------------------- */
+
class CmdIsMaster : public Command {
public:
CmdIsMaster() : Command("ismaster") { }
virtual bool run(const char *ns, JSObj& cmdObj, string& errmsg, JSObjBuilder& result) {
- int x = -2;
if( replPair ) {
- x = replPair->state;
- result.append("ismaster", x);
+ int x = replPair->state;
+ result.append("ismaster", replPair->state);
result.append("remote", replPair->remote);
+ if( !replPair->info.empty() )
+ result.append("info", replPair->info);
}
else {
- result.append("ismaster", x);
+ result.append("ismaster", 1);
result.append("msg", "not paired");
}
@@ -137,6 +168,7 @@ void ReplPair::negotiate(DBClientConnection *conn) {
JSObj res = conn->findOne("admin.$cmd", cmd);
if( res.getIntField("ok") != 1 ) {
problem() << "negotiate fails: " << res.toString() << '\n';
+ setMaster(State_Confused);
return;
}
int x = res.getIntField("you_are");
@@ -422,7 +454,7 @@ void ReplSource::sync_pullOpLog_applyOperation(JSObj& op) {
_runCommands(ns, o, ss, bb, ob);
}
}
- catch( UserAssertionException e ) {
+ catch( UserAssertionException& e ) {
log() << "sync: caught user assertion " << e.msg << '\n';
}
client = 0;
@@ -555,12 +587,12 @@ bool ReplSource::sync() {
conn = auto_ptr<DBClientConnection>(new DBClientConnection());
string errmsg;
if( !conn->connect(hostName.c_str(), errmsg) ) {
+ resetConnection();
+ log() << "pull: cantconn " << errmsg << endl;
if( replPair && paired ) {
assert( startsWith(hostName.c_str(), replPair->remoteHost.c_str()) );
- replPair->setMaster(1);
+ replPair->arbitrate();
}
- resetConnection();
- log() << "pull: cantconn " << errmsg << endl;
sleepsecs(1);
return false;
}
@@ -658,6 +690,7 @@ _ reuse that cursor when we can
void replMain() {
vector<ReplSource*> sources;
+ bool first = true;
while( 1 ) {
{
@@ -665,8 +698,10 @@ void replMain() {
ReplSource::loadAll(sources);
}
- if( sources.empty() )
+ if( !first && sources.empty() )
sleepsecs(20);
+
+ first=false;
for( vector<ReplSource*>::iterator i = sources.begin(); i != sources.end(); i++ ) {
ReplSource *s = *i;
@@ -674,13 +709,18 @@ void replMain() {
try {
ok = s->sync();
}
- catch( SyncException ) {
- log() << "caught SyncException, sleeping 1 minutes" << endl;
- sleepsecs(60);
+ catch( SyncException& ) {
+ log() << "caught SyncException, sleeping 10 secs" << endl;
+ sleepsecs(10);
}
- catch( AssertionException ) {
- log() << "replMain caught AssertionException, sleeping 1 minutes" << endl;
- sleepsecs(60);
+ catch( AssertionException& e ) {
+ if( e.severe() ) {
+ log() << "replMain caught AssertionException, sleeping 1 minutes" << endl;
+ sleepsecs(60);
+ }
+ else {
+ log() << e.toString() << '\n';
+ }
}
if( !ok )
s->resetConnection();
@@ -703,7 +743,7 @@ void replSlaveThread() {
break;
sleepsecs(5);
}
- catch( AssertionException ) {
+ catch( AssertionException& ) {
problem() << "Assertion in replSlaveThread(): sleeping 5 minutes before retry" << endl;
sleepsecs(300);
}
@@ -779,6 +819,6 @@ void startReplication() {
}
/* called from main at server startup */
-void pairWith(const char *remoteEnd) {
- replPair = new ReplPair(remoteEnd);
+void pairWith(const char *remoteEnd, const char *arb) {
+ replPair = new ReplPair(remoteEnd, arb);
}
diff --git a/db/repl.h b/db/repl.h
index 43c52c3232d..221c7c57b9e 100644
--- a/db/repl.h
+++ b/db/repl.h
@@ -118,7 +118,10 @@ public:
ReplSource(JSObj);
bool sync();
void save(); // write ourself to local.sources
- void resetConnection() { conn = auto_ptr<DBClientConnection>(0); }
+ void resetConnection() {
+ conn = auto_ptr<DBClientConnection>(0);
+ cursor = auto_ptr<DBClientCursor>(0);
+ }
// make a jsobj from our member fields of the form
// { host: ..., source: ..., syncedTo: ... }
diff --git a/db/replset.h b/db/replset.h
index 98216d7530b..c82655a9825 100644
--- a/db/replset.h
+++ b/db/replset.h
@@ -30,15 +30,24 @@ extern int port;
*/
class ReplPair {
-
public:
+ enum {
+ State_CantArb = -3,
+ State_Confused = -2,
+ State_Negotiating = -1,
+ State_Slave = 0,
+ State_Master = 1
+ };
+
int state;
+ string info; // commentary about our current state
+ string arbHost; // "-" for no arbiter. "host[:port]"
int remotePort;
string remoteHost;
string remote; // host:port if port specified.
int date; // -1 not yet set; 0=slave; 1=master
- ReplPair(const char *remoteEnd);
+ ReplPair(const char *remoteEnd, const char *arbiter);
bool dominant(const string& myname) {
if( myname == remoteHost )
@@ -46,25 +55,30 @@ public:
return myname > remoteHost;
}
- void setMaster(int n) {
+ void setMaster(int n, const char *_comment = "") {
+ info = _comment;
if( n == state )
return;
log() << "pair: setting master=" << n << " was " << state << '\n';
state = n;
}
+ /* negotiate with our peer who is master */
void negotiate(DBClientConnection *conn);
+
+ /* peer unreachable, try our arbitrator */
+ void arbitrate();
};
extern ReplPair *replPair;
/* we should not allow most operations when not the master */
inline bool isMaster() {
- return replPair == 0 || replPair->state == 1 ||
+ return replPair == 0 || replPair->state == ReplPair::State_Master ||
client->name == "local"; // local is always allowed
}
-inline ReplPair::ReplPair(const char *remoteEnd) {
+inline ReplPair::ReplPair(const char *remoteEnd, const char *arb) {
state = -1;
remote = remoteEnd;
remotePort = DBPort;
@@ -77,4 +91,7 @@ inline ReplPair::ReplPair(const char *remoteEnd) {
if( remotePort == DBPort )
remote = remoteHost; // don't include ":27017" as it is default; in case ran in diff ways over time to normalizke the hostname format in sources collection
}
+
+ arbHost = arb;
+ uassert("arbiter parm is empty", !arbHost.empty());
}
diff --git a/stdafx.cpp b/stdafx.cpp
index 5fb9dcded68..f37f5a0ea7a 100644
--- a/stdafx.cpp
+++ b/stdafx.cpp
@@ -54,6 +54,6 @@ void uasserted(const char *msg) {
}
void msgasserted(const char *msg) {
- cout << "Assertion: " << msg << '\n';
- throw AssertionException();
+ log() << "Assertion: " << msg << '\n';
+ throw MsgAssertionException(msg);
}
diff --git a/stdafx.h b/stdafx.h
index 51e5c8c003a..bb7d5a9e702 100644
--- a/stdafx.h
+++ b/stdafx.h
@@ -55,6 +55,7 @@ class AssertionException {
public:
string msg;
AssertionException() { }
+ virtual bool severe() { return true; }
virtual bool isUserAssertion() { return false; }
virtual string toString() { return msg; }
};
@@ -63,10 +64,18 @@ public:
class UserAssertionException : public AssertionException {
public:
UserAssertionException(const char *_msg) { msg = _msg; }
+ virtual bool severe() { return false; }
virtual bool isUserAssertion() { return true; }
virtual string toString() { return "userassert:" + msg; }
};
+class MsgAssertionException : public AssertionException {
+public:
+ MsgAssertionException(const char *_msg) { msg = _msg; }
+ virtual bool severe() { return false; }
+ virtual string toString() { return "massert:" + msg; }
+};
+
void asserted(const char *msg, const char *file, unsigned line);
void wasserted(const char *msg, const char *file, unsigned line);
void uasserted(const char *msg);