summaryrefslogtreecommitdiff
path: root/db/repl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'db/repl.cpp')
-rw-r--r--db/repl.cpp1172
1 files changed, 597 insertions, 575 deletions
diff --git a/db/repl.cpp b/db/repl.cpp
index d3c928b98d3..636a1db36bb 100644
--- a/db/repl.cpp
+++ b/db/repl.cpp
@@ -9,16 +9,16 @@
/**
* Copyright (C) 2008 10gen Inc.
-*
+*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
-*
+*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
-*
+*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
@@ -31,7 +31,7 @@
{ haveLogged : true }
local.pair.startup - can contain a special value indicating for a pair that we have the master copy.
used when replacing other half of the pair which has permanently failed.
- local.pair.sync - { initialsynccomplete: 1 }
+ local.pair.sync - { initialsynccomplete: 1 }
*/
#include "stdafx.h"
@@ -57,44 +57,46 @@ void ensureHaveIdIndex(const char *ns);
/* if 1 sync() is running */
int syncing = 0;
-/* if true replace our peer in a replication pair -- don't worry about if his
+/* if true replace our peer in a replication pair -- don't worry about if his
local.oplog.$main is empty.
*/
bool replacePeer = false;
-/* "dead" means something really bad happened like replication falling completely out of sync.
+/* "dead" means something really bad happened like replication falling completely out of sync.
when non-null, we are dead and the string is informational
*/
const char *allDead = 0;
-/* This is set to true if we have EVER been up to date -- this way a new pair member
+/* This is set to true if we have EVER been up to date -- this way a new pair member
which is a replacement won't go online as master until we have initially fully synced.
*/
-class PairSync {
+class PairSync {
int initialsynccomplete;
public:
- PairSync() { initialsynccomplete = -1; }
+ PairSync() {
+ initialsynccomplete = -1;
+ }
- /* call before using the class. from dbmutex */
+ /* call before using the class. from dbmutex */
void init() {
BSONObj o;
initialsynccomplete = 0;
- if( getSingleton("local.pair.sync", o) )
+ if ( getSingleton("local.pair.sync", o) )
initialsynccomplete = 1;
}
- bool initialSyncCompleted() {
- return initialsynccomplete != 0;
+ bool initialSyncCompleted() {
+ return initialsynccomplete != 0;
}
- void setInitialSyncCompleted() {
+ void setInitialSyncCompleted() {
BSONObj o = fromjson("{initialsynccomplete:1}");
putSingleton("local.pair.sync", o);
initialsynccomplete = 1;
}
- void setInitialSyncCompletedLocking() {
- if( initialsynccomplete == 1 )
+ void setInitialSyncCompletedLocking() {
+ if ( initialsynccomplete == 1 )
return;
dblock lk;
BSONObj o = fromjson("{initialsynccomplete:1}");
@@ -102,12 +104,14 @@ public:
initialsynccomplete = 1;
}
} pairSync;
-bool getInitialSyncCompleted() { return pairSync.initialSyncCompleted(); }
+bool getInitialSyncCompleted() {
+ return pairSync.initialSyncCompleted();
+}
#include "replset.h"
#define debugrepl(z) cout << "debugrepl " << z << '\n'
-//define debugrepl
+//define debugrepl
/* --- ReplPair -------------------------------- */
@@ -115,25 +119,29 @@ ReplPair *replPair = 0;
/* output by the web console */
const char *replInfo = "";
-struct ReplInfo {
- ReplInfo(const char *msg) { replInfo = msg; }
- ~ReplInfo() { replInfo = "?"; }
+struct ReplInfo {
+ ReplInfo(const char *msg) {
+ replInfo = msg;
+ }
+ ~ReplInfo() {
+ replInfo = "?";
+ }
};
-void ReplPair::setMaster(int n, const char *_comment ) {
- if ( n == State_Master && !pairSync.initialSyncCompleted() )
- return;
- info = _comment;
- if( n != state && !quiet )
- log() << "pair: setting master=" << n << " was " << state << '\n';
- state = n;
+void ReplPair::setMaster(int n, const char *_comment ) {
+ if ( n == State_Master && !pairSync.initialSyncCompleted() )
+ return;
+ info = _comment;
+ if ( n != state && !quiet )
+ log() << "pair: setting master=" << n << " was " << state << '\n';
+ state = n;
}
/* peer unreachable, try our arbiter */
void ReplPair::arbitrate() {
ReplInfo r("arbitrate");
- if( arbHost == "-" ) {
+ if ( arbHost == "-" ) {
// no arbiter. we are up, let's assume he is down and network is not partitioned.
setMasterLocked(State_Master, "remote unreachable");
return;
@@ -141,15 +149,15 @@ void ReplPair::arbitrate() {
auto_ptr<DBClientConnection> conn( newClientConnection() );
string errmsg;
- if( !conn->connect(arbHost.c_str(), errmsg) ) {
+ if ( !conn->connect(arbHost.c_str(), errmsg) ) {
setMasterLocked(State_CantArb, "can't connect to arb");
return;
}
bool is_master;
BSONObj res = conn->cmdIsMaster(is_master);
- /*findOne("admin.$cmd", ismasterobj);*/
- if( res.isEmpty() ) {
+ /*findOne("admin.$cmd", ismasterobj);*/
+ if ( res.isEmpty() ) {
setMasterLocked(State_CantArb, "can't arb 2");
return;
}
@@ -159,35 +167,41 @@ void ReplPair::arbitrate() {
/* --------------------------------------------- */
-class CmdReplacePeer : public Command {
+class CmdReplacePeer : public Command {
public:
- virtual bool slaveOk() { return true; }
- virtual bool adminOnly() { return true; }
- virtual bool logTheOp() { return false; }
+ virtual bool slaveOk() {
+ return true;
+ }
+ virtual bool adminOnly() {
+ return true;
+ }
+ virtual bool logTheOp() {
+ return false;
+ }
CmdReplacePeer() : Command("replacepeer") { }
virtual bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
- if( replPair == 0 ) {
+ if ( replPair == 0 ) {
errmsg = "not paired";
return false;
- }
- if( !pairSync.initialSyncCompleted() ) {
- errmsg = "not caught up cannot replace peer";
+ }
+ if ( !pairSync.initialSyncCompleted() ) {
+ errmsg = "not caught up cannot replace peer";
return false;
}
- if( syncing < 0 ) {
+ if ( syncing < 0 ) {
errmsg = "replacepeer already invoked";
return false;
}
Timer t;
- while( 1 ) {
- if( syncing == 0 || t.millis() > 20000 )
+ while ( 1 ) {
+ if ( syncing == 0 || t.millis() > 20000 )
break;
{
dbtemprelease t;
sleepmillis(10);
}
}
- if( syncing ) {
+ if ( syncing ) {
assert( syncing > 0 );
errmsg = "timeout waiting for sync() to finish";
return false;
@@ -195,7 +209,7 @@ public:
{
vector<ReplSource*> sources;
ReplSource::loadAll(sources);
- if( sources.size() != 1 ) {
+ if ( sources.size() != 1 ) {
errmsg = "local.sources.count() != 1, cannot replace peer";
return false;
}
@@ -212,24 +226,26 @@ public:
}
} cmdReplacePeer;
-class CmdIsMaster : public Command {
+class CmdIsMaster : public Command {
public:
- virtual bool slaveOk() { return true; }
+ virtual bool slaveOk() {
+ return true;
+ }
CmdIsMaster() : Command("ismaster") { }
virtual bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool /*fromRepl*/) {
- if( allDead ) {
+ if ( allDead ) {
result.append("ismaster", 0.0);
- if( replPair )
+ if ( replPair )
result.append("remote", replPair->remote);
result.append("info", allDead);
}
- else if( replPair ) {
+ else if ( replPair ) {
result.append("ismaster", replPair->state);
result.append("remote", replPair->remote);
- if( replPair->info.empty() )
+ if ( replPair->info.empty() )
result.append("info", replPair->info);
}
- else {
+ else {
result.append("ismaster", 1);
result.append("msg", "not paired");
}
@@ -249,20 +265,24 @@ public:
1,!1 -> 1,0
-1,-1 -> dominant->1, nondom->0
0,0 -> dominant->1, nondom->0
- 1,1 -> dominant->1, nondom->0
-
+ 1,1 -> dominant->1, nondom->0
+
{ negotiatemaster:1, i_was:<state>, your_name:<hostname> }
returns:
- { ok:1, you_are:..., i_am:... }
+ { ok:1, you_are:..., i_am:... }
*/
-class CmdNegotiateMaster : public Command {
+class CmdNegotiateMaster : public Command {
public:
CmdNegotiateMaster() : Command("negotiatemaster") { }
- virtual bool slaveOk() { return true; }
- virtual bool adminOnly() { return true; }
+ virtual bool slaveOk() {
+ return true;
+ }
+ virtual bool adminOnly() {
+ return true;
+ }
virtual bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
- if( replPair == 0 ) {
+ if ( replPair == 0 ) {
problem() << "got negotiatemaster cmd but we are not in paired mode." << endl;
errmsg = "not paired";
return false;
@@ -270,29 +290,31 @@ public:
int was = cmdObj.getIntField("i_was");
string myname = cmdObj.getStringField("your_name");
- if( myname.empty() || was < -1 ) {
+ if ( myname.empty() || was < -1 ) {
errmsg = "your_name/i_was not specified";
return false;
}
- int N = ReplPair::State_Negotiating;
- int M = ReplPair::State_Master;
- int S = ReplPair::State_Slave;
-
- if( !replPair->dominant( myname ) ) {
- result.append( "you_are", N );
- result.append( "i_am", N );
- return true;
- }
-
- int me, you;
- if( !pairSync.initialSyncCompleted() || ( replPair->state != M && was == M ) ) {
- me=S;you=M;
- }
- else {
- me=M;you=S;
- }
- replPair->setMaster( me, "CmdNegotiateMaster::run()" );
+ int N = ReplPair::State_Negotiating;
+ int M = ReplPair::State_Master;
+ int S = ReplPair::State_Slave;
+
+ if ( !replPair->dominant( myname ) ) {
+ result.append( "you_are", N );
+ result.append( "i_am", N );
+ return true;
+ }
+
+ int me, you;
+ if ( !pairSync.initialSyncCompleted() || ( replPair->state != M && was == M ) ) {
+ me=S;
+ you=M;
+ }
+ else {
+ me=M;
+ you=S;
+ }
+ replPair->setMaster( me, "CmdNegotiateMaster::run()" );
result.append("you_are", you);
result.append("i_am", me);
@@ -301,139 +323,139 @@ public:
}
} cmdnegotiatemaster;
-void ReplPair::negotiate(DBClientConnection *conn) {
+void ReplPair::negotiate(DBClientConnection *conn) {
BSONObjBuilder b;
b.append("negotiatemaster",1);
b.append("i_was", state);
b.append("your_name", remoteHost);
BSONObj cmd = b.done();
BSONObj res = conn->findOne("admin.$cmd", cmd);
- if( res.getIntField("ok") != 1 ) {
+ if ( res.getIntField("ok") != 1 ) {
problem() << "negotiate fails: " << res.toString() << '\n';
setMasterLocked(State_Confused);
return;
}
int x = res.getIntField("you_are");
- // State_Negotiating means the remote node is not dominant and cannot
- // choose who is master.
- if( x != State_Slave && x != State_Master && x != State_Negotiating ) {
+ // State_Negotiating means the remote node is not dominant and cannot
+ // choose who is master.
+ if ( x != State_Slave && x != State_Master && x != State_Negotiating ) {
problem() << "negotiate: bad you_are value " << res.toString() << endl;
return;
}
- if( x != State_Negotiating ) {
- // Don't actually have to lock here, since we only get here if not the
- // dominant node.
- setMaster(x);
- }
+ if ( x != State_Negotiating ) {
+ // Don't actually have to lock here, since we only get here if not the
+ // dominant node.
+ setMaster(x);
+ }
}
OpTime last(0, 0);
-OpTime OpTime::now() {
- unsigned t = (unsigned) time(0);
- if( last.secs == t ) {
- last.i++;
- return last;
- }
- last = OpTime(t, 1);
- return last;
+OpTime OpTime::now() {
+ unsigned t = (unsigned) time(0);
+ if ( last.secs == t ) {
+ last.i++;
+ return last;
+ }
+ last = OpTime(t, 1);
+ return last;
}
-struct TestOpTime {
- TestOpTime() {
- OpTime t;
- for( int i = 0; i < 10; i++ ) {
- OpTime s = OpTime::now();
- assert( s != t );
- t = s;
- }
- OpTime q = t;
- assert( q == t );
- assert( !(q != t) );
- }
+struct TestOpTime {
+ TestOpTime() {
+ OpTime t;
+ for ( int i = 0; i < 10; i++ ) {
+ OpTime s = OpTime::now();
+ assert( s != t );
+ t = s;
+ }
+ OpTime q = t;
+ assert( q == t );
+ assert( !(q != t) );
+ }
} testoptime;
-int test2() {
- return 0;
+int test2() {
+ return 0;
}
/* --------------------------------------------------------------*/
ReplSource::ReplSource() {
replacing = false;
- nClonedThisPass = 0;
- paired = false;
+ nClonedThisPass = 0;
+ paired = false;
}
ReplSource::ReplSource(BSONObj o) : nClonedThisPass(0) {
replacing = false;
- paired = false;
- only = o.getStringField("only");
- hostName = o.getStringField("host");
- _sourceName = o.getStringField("source");
- uassert( "'host' field not set in sources collection object", !hostName.empty() );
+ paired = false;
+ only = o.getStringField("only");
+ hostName = o.getStringField("host");
+ _sourceName = o.getStringField("source");
+ uassert( "'host' field not set in sources collection object", !hostName.empty() );
uassert( "only source='main' allowed for now with replication", sourceName() == "main" );
- BSONElement e = o.getField("syncedTo");
- if( !e.eoo() ) {
- uassert( "bad sources 'syncedTo' field value", e.type() == Date );
- OpTime tmp( e.date() );
- syncedTo = tmp;
- //syncedTo.asDate() = e.date();
- }
-
- BSONObj dbsObj = o.getObjectField("dbs");
- if( !dbsObj.isEmpty() ) {
- BSONObjIterator i(dbsObj);
- while( 1 ) {
- BSONElement e = i.next();
- if( e.eoo() )
- break;
- dbs.insert( e.fieldName() );
- }
- }
+ BSONElement e = o.getField("syncedTo");
+ if ( !e.eoo() ) {
+ uassert( "bad sources 'syncedTo' field value", e.type() == Date );
+ OpTime tmp( e.date() );
+ syncedTo = tmp;
+ //syncedTo.asDate() = e.date();
+ }
+
+ BSONObj dbsObj = o.getObjectField("dbs");
+ if ( !dbsObj.isEmpty() ) {
+ BSONObjIterator i(dbsObj);
+ while ( 1 ) {
+ BSONElement e = i.next();
+ if ( e.eoo() )
+ break;
+ dbs.insert( e.fieldName() );
+ }
+ }
}
/* Turn our C++ Source object into a BSONObj */
BSONObj ReplSource::jsobj() {
- BSONObjBuilder b;
- b.append("host", hostName);
- b.append("source", sourceName());
- if( !only.empty() )
- b.append("only", only);
- if( !syncedTo.isNull() )
+ BSONObjBuilder b;
+ b.append("host", hostName);
+ b.append("source", sourceName());
+ if ( !only.empty() )
+ b.append("only", only);
+ if ( !syncedTo.isNull() )
b.appendDate("syncedTo", syncedTo.asDate());
- BSONObjBuilder dbs_builder;
+ BSONObjBuilder dbs_builder;
int n = 0;
- for( set<string>::iterator i = dbs.begin(); i != dbs.end(); i++ ) {
+ for ( set<string>::iterator i = dbs.begin(); i != dbs.end(); i++ ) {
n++;
- dbs_builder.appendBool(i->c_str(), 1);
- }
- if( n )
+ dbs_builder.appendBool(i->c_str(), 1);
+ }
+ if ( n )
b.append("dbs", dbs_builder.done());
- return b.doneAndDecouple();
+ return b.doneAndDecouple();
}
-void ReplSource::save() {
- BSONObjBuilder b;
+void ReplSource::save() {
+ BSONObjBuilder b;
assert( !hostName.empty() );
- b.append("host", hostName);
+ b.append("host", hostName);
// todo: finish allowing multiple source configs.
// this line doesn't work right when source is null, if that is allowed as it is now:
//b.append("source", _sourceName);
- BSONObj pattern = b.done();
+ BSONObj pattern = b.done();
- BSONObj o = jsobj();
+ BSONObj o = jsobj();
- stringstream ss;
- setClient("local.sources");
- int u = _updateObjects("local.sources", o, pattern, true/*upsert for pair feature*/, ss);
- assert( u == 1 || u == 4 );
- database = 0;
+ stringstream ss;
+ setClient("local.sources");
+ int u = _updateObjects("local.sources", o, pattern, true/*upsert for pair feature*/, ss);
+ assert( u == 1 || u == 4 );
+ database = 0;
- if( replacing ) {
- /* if we were in "replace" mode, we now have synced up with the replacement,
+ if ( replacing ) {
+ /* if we were in "replace" mode, we now have synced up with the replacement,
so turn that off.
*/
replacing = false;
@@ -443,46 +465,46 @@ void ReplSource::save() {
}
}
-void ReplSource::cleanup(vector<ReplSource*>& v) {
- for( vector<ReplSource*>::iterator i = v.begin(); i != v.end(); i++ )
- delete *i;
+void ReplSource::cleanup(vector<ReplSource*>& v) {
+ for ( vector<ReplSource*>::iterator i = v.begin(); i != v.end(); i++ )
+ delete *i;
}
string dashDashSource;
-static void addSourceToList(vector<ReplSource*>&v, ReplSource& s, vector<ReplSource*>&old) {
- for( vector<ReplSource*>::iterator i = old.begin(); i != old.end(); ) {
- if( s == **i ) {
- v.push_back(*i);
- old.erase(i);
- return;
- }
- i++;
- }
-
- v.push_back( new ReplSource(s) );
+static void addSourceToList(vector<ReplSource*>&v, ReplSource& s, vector<ReplSource*>&old) {
+ for ( vector<ReplSource*>::iterator i = old.begin(); i != old.end(); ) {
+ if ( s == **i ) {
+ v.push_back(*i);
+ old.erase(i);
+ return;
+ }
+ i++;
+ }
+
+ v.push_back( new ReplSource(s) );
}
-/* we reuse our existing objects so that we can keep our existing connection
- and cursor in effect.
+/* we reuse our existing objects so that we can keep our existing connection
+ and cursor in effect.
*/
-void ReplSource::loadAll(vector<ReplSource*>& v) {
- vector<ReplSource *> old = v;
+void ReplSource::loadAll(vector<ReplSource*>& v) {
+ vector<ReplSource *> old = v;
v.erase(v.begin(), v.end());
- bool gotPairWith = false;
+ bool gotPairWith = false;
- if( !dashDashSource.empty() ) {
+ if ( !dashDashSource.empty() ) {
setClient("local.sources");
// --source <host> specified.
// check that no items are in sources other than that
// add if missing
auto_ptr<Cursor> c = findTableScan("local.sources", emptyObj);
int n = 0;
- while( c->ok() ) {
+ while ( c->ok() ) {
n++;
ReplSource tmp(c->current());
- if( tmp.hostName != dashDashSource ) {
+ if ( tmp.hostName != dashDashSource ) {
problem() << "--source " << dashDashSource << " != " << tmp.hostName << " from local.sources collection" << endl;
log() << "terminating after 30 seconds" << endl;
sleepsecs(30);
@@ -491,7 +513,7 @@ void ReplSource::loadAll(vector<ReplSource*>& v) {
c->advance();
}
uassert( "local.sources collection corrupt?", n<2 );
- if( n == 0 ) {
+ if ( n == 0 ) {
// source missing. add.
ReplSource s;
s.hostName = dashDashSource;
@@ -499,290 +521,290 @@ void ReplSource::loadAll(vector<ReplSource*>& v) {
}
}
- setClient("local.sources");
- auto_ptr<Cursor> c = findTableScan("local.sources", emptyObj);
- while( c->ok() ) {
- ReplSource tmp(c->current());
- if( replPair && tmp.hostName == replPair->remote && tmp.sourceName() == "main" ) {
- gotPairWith = true;
+ setClient("local.sources");
+ auto_ptr<Cursor> c = findTableScan("local.sources", emptyObj);
+ while ( c->ok() ) {
+ ReplSource tmp(c->current());
+ if ( replPair && tmp.hostName == replPair->remote && tmp.sourceName() == "main" ) {
+ gotPairWith = true;
tmp.paired = true;
- if( replacePeer ) {
+ if ( replacePeer ) {
// peer was replaced -- start back at the beginning.
tmp.syncedTo = OpTime();
tmp.replacing = true;
}
}
- addSourceToList(v, tmp, old);
- c->advance();
- }
- database = 0;
-
- if( !gotPairWith && replPair ) {
- /* add the --pairwith server */
- ReplSource *s = new ReplSource();
- s->paired = true;
- s->hostName = replPair->remote;
+ addSourceToList(v, tmp, old);
+ c->advance();
+ }
+ database = 0;
+
+ if ( !gotPairWith && replPair ) {
+ /* add the --pairwith server */
+ ReplSource *s = new ReplSource();
+ s->paired = true;
+ s->hostName = replPair->remote;
s->replacing = replacePeer;
- v.push_back(s);
- }
+ v.push_back(s);
+ }
- for( vector<ReplSource*>::iterator i = old.begin(); i != old.end(); i++ )
+ for ( vector<ReplSource*>::iterator i = old.begin(); i != old.end(); i++ )
delete *i;
}
BSONObj opTimeQuery = fromjson("{getoptime:1}");
bool ReplSource::resync(string db) {
- {
- log() << "resync: dropping database " << db << endl;
- string dummyns = db + ".";
- assert( database->name == db );
- dropDatabase(dummyns.c_str());
- setClientTempNs(dummyns.c_str());
- }
-
- {
- log() << "resync: cloning database " << db << endl;
- ReplInfo r("resync: cloning a database");
- string errmsg;
- bool ok = cloneFrom(hostName.c_str(), errmsg, database->name, false, /*slaveok*/ true);
- if( !ok ) {
- problem() << "resync of " << db << " from " << hostName << " failed " << errmsg << endl;
- throw SyncException();
- }
- }
-
- log() << "resync: done " << db << endl;
-
- /* add the db to our dbs array which we will write back to local.sources.
- note we are not in a consistent state until the oplog gets applied,
- which happens next when this returns.
- */
- dbs.insert(db);
- return true;
+ {
+ log() << "resync: dropping database " << db << endl;
+ string dummyns = db + ".";
+ assert( database->name == db );
+ dropDatabase(dummyns.c_str());
+ setClientTempNs(dummyns.c_str());
+ }
+
+ {
+ log() << "resync: cloning database " << db << endl;
+ ReplInfo r("resync: cloning a database");
+ string errmsg;
+ bool ok = cloneFrom(hostName.c_str(), errmsg, database->name, false, /*slaveok*/ true);
+ if ( !ok ) {
+ problem() << "resync of " << db << " from " << hostName << " failed " << errmsg << endl;
+ throw SyncException();
+ }
+ }
+
+ log() << "resync: done " << db << endl;
+
+ /* add the db to our dbs array which we will write back to local.sources.
+ note we are not in a consistent state until the oplog gets applied,
+ which happens next when this returns.
+ */
+ dbs.insert(db);
+ return true;
}
/* local.$oplog.main is of the form:
- { ts: ..., op: <optype>, ns: ..., o: <obj> , o2: <extraobj>, b: <boolflag> }
+ { ts: ..., op: <optype>, ns: ..., o: <obj> , o2: <extraobj>, b: <boolflag> }
...
see logOp() comments.
*/
void ReplSource::sync_pullOpLog_applyOperation(BSONObj& op) {
- char clientName[MaxClientLen];
- const char *ns = op.getStringField("ns");
- nsToClient(ns, clientName);
+ char clientName[MaxClientLen];
+ const char *ns = op.getStringField("ns");
+ nsToClient(ns, clientName);
- if( *ns == '.' ) {
+ if ( *ns == '.' ) {
problem() << "skipping bad op in oplog: " << op.toString() << endl;
return;
}
- else if( *ns == 0 ) {
+ else if ( *ns == 0 ) {
problem() << "halting replication, bad op in oplog:\n " << op.toString() << endl;
allDead = "bad object in oplog";
throw SyncException();
}
- if( !only.empty() && only != clientName )
- return;
+ if ( !only.empty() && only != clientName )
+ return;
- bool newDb = dbs.count(clientName) == 0;
- if( newDb && nClonedThisPass ) {
- /* we only clone one database per pass, even if a lot need done. This helps us
- avoid overflowing the master's transaction log by doing too much work before going
- back to read more transactions. (Imagine a scenario of slave startup where we try to
- clone 100 databases in one pass.)
- */
- addDbNextPass.insert(clientName);
- return;
- }
+ bool newDb = dbs.count(clientName) == 0;
+ if ( newDb && nClonedThisPass ) {
+ /* we only clone one database per pass, even if a lot need done. This helps us
+ avoid overflowing the master's transaction log by doing too much work before going
+ back to read more transactions. (Imagine a scenario of slave startup where we try to
+ clone 100 databases in one pass.)
+ */
+ addDbNextPass.insert(clientName);
+ return;
+ }
- dblock lk;
- bool justCreated = setClientTempNs(ns);
- if( allDead ) {
+ dblock lk;
+ bool justCreated = setClientTempNs(ns);
+ if ( allDead ) {
// hmmm why is this check here and not at top of this function? does it get set between top and here?
log() << "allDead, throwing SyncException\n";
- throw SyncException();
+ throw SyncException();
}
// operation type -- see logOp() comments for types
- const char *opType = op.getStringField("op");
-
- if( justCreated || /* datafiles were missing. so we need everything, no matter what sources object says */
- newDb ) /* if not in dbs, we've never synced this database before, so we need everything */
- {
- if( op.getBoolField("first") &&
- pairSync.initialSyncCompleted() /*<- when false, we are a replacement volume for a pair and need a full sync */
- ) {
+ const char *opType = op.getStringField("op");
+
+ if ( justCreated || /* datafiles were missing. so we need everything, no matter what sources object says */
+ newDb ) /* if not in dbs, we've never synced this database before, so we need everything */
+ {
+ if ( op.getBoolField("first") &&
+ pairSync.initialSyncCompleted() /*<- when false, we are a replacement volume for a pair and need a full sync */
+ ) {
log() << "pull: got {first:true} op ns:" << ns << '\n';
/* this is the first thing in the oplog ever, so we don't need to resync(). */
- if( newDb )
+ if ( newDb )
dbs.insert(clientName);
- else
+ else
problem() << "warning: justCreated && !newDb in repl " << op.toString() << endl;
}
- else if( paired && !justCreated ) {
- if( strcmp(opType,"db") == 0 && strcmp(ns, "admin.") == 0 ) {
- // "admin" is a special namespace we use for priviledged commands -- ok if it exists first on
+ else if ( paired && !justCreated ) {
+ if ( strcmp(opType,"db") == 0 && strcmp(ns, "admin.") == 0 ) {
+ // "admin" is a special namespace we use for priviledged commands -- ok if it exists first on
// either side
}
else {
- /* the other half of our pair has some operations. yet we already had a db on our
+ /* the other half of our pair has some operations. yet we already had a db on our
disk even though the db in question is not listed in the source.
*/
allDead = "pair: historical image missing for a db";
problem() << "pair: historical image missing for " << clientName << ", setting allDead=true" << endl;
log() << "op:" << op.toString() << endl;
/*
- log() << "TEMP: pair: assuming we have the historical image for: " <<
+ log() << "TEMP: pair: assuming we have the historical image for: " <<
clientName << ". add extra checks here." << endl;
dbs.insert(clientName);
*/
}
- }
- else {
- nClonedThisPass++;
- resync(database->name);
- }
+ }
+ else {
+ nClonedThisPass++;
+ resync(database->name);
+ }
addDbNextPass.erase(clientName);
- }
-
- stringstream ss;
- BSONObj o = op.getObjectField("o");
- try {
- if( *opType == 'i' ) {
- const char *p = strchr(ns, '.');
- if( p && strcmp(p, ".system.indexes") == 0 ) {
- // updates aren't allowed for indexes -- so we will do a regular insert. if index already
- // exists, that is ok.
- theDataFileMgr.insert(ns, (void*) o.objdata(), o.objsize());
- }
- else {
- // do upserts for inserts as we might get replayed more than once
- OID *oid = o.getOID();
- if( oid == 0 ) {
- _updateObjects(ns, o, o, true, ss);
- }
- else {
- BSONObjBuilder b;
- b.appendOID("_id", oid);
- RARELY ensureHaveIdIndex(ns); // otherwise updates will be super slow
- _updateObjects(ns, o, b.done(), true, ss);
- }
- }
- }
- else if( *opType == 'u' ) {
- RARELY ensureHaveIdIndex(ns); // otherwise updates will be super slow
- _updateObjects(ns, o, op.getObjectField("o2"), op.getBoolField("b"), ss);
- }
- else if( *opType == 'd' ) {
- if( opType[1] == 0 )
- deleteObjects(ns, o, op.getBoolField("b"));
- else
- assert( opType[1] == 'b' ); // "db" advertisement
- }
- else {
- BufBuilder bb;
- BSONObjBuilder ob;
- assert( *opType == 'c' );
- _runCommands(ns, o, ss, bb, ob, true);
- }
- }
- catch( UserAssertionException& e ) {
- log() << "sync: caught user assertion " << e.msg << '\n';
- }
- database = 0;
+ }
+
+ stringstream ss;
+ BSONObj o = op.getObjectField("o");
+ try {
+ if ( *opType == 'i' ) {
+ const char *p = strchr(ns, '.');
+ if ( p && strcmp(p, ".system.indexes") == 0 ) {
+ // updates aren't allowed for indexes -- so we will do a regular insert. if index already
+ // exists, that is ok.
+ theDataFileMgr.insert(ns, (void*) o.objdata(), o.objsize());
+ }
+ else {
+ // do upserts for inserts as we might get replayed more than once
+ OID *oid = o.getOID();
+ if ( oid == 0 ) {
+ _updateObjects(ns, o, o, true, ss);
+ }
+ else {
+ BSONObjBuilder b;
+ b.appendOID("_id", oid);
+ RARELY ensureHaveIdIndex(ns); // otherwise updates will be super slow
+ _updateObjects(ns, o, b.done(), true, ss);
+ }
+ }
+ }
+ else if ( *opType == 'u' ) {
+ RARELY ensureHaveIdIndex(ns); // otherwise updates will be super slow
+ _updateObjects(ns, o, op.getObjectField("o2"), op.getBoolField("b"), ss);
+ }
+ else if ( *opType == 'd' ) {
+ if ( opType[1] == 0 )
+ deleteObjects(ns, o, op.getBoolField("b"));
+ else
+ assert( opType[1] == 'b' ); // "db" advertisement
+ }
+ else {
+ BufBuilder bb;
+ BSONObjBuilder ob;
+ assert( *opType == 'c' );
+ _runCommands(ns, o, ss, bb, ob, true);
+ }
+ }
+ catch ( UserAssertionException& e ) {
+ log() << "sync: caught user assertion " << e.msg << '\n';
+ }
+ database = 0;
}
/* note: not yet in mutex at this point. */
-bool ReplSource::sync_pullOpLog() {
- string ns = string("local.oplog.$") + sourceName();
+bool ReplSource::sync_pullOpLog() {
+ string ns = string("local.oplog.$") + sourceName();
debugrepl( "sync_pullOpLog " << ns << " syncedTo:" << syncedTo.toStringLong() );
- bool tailing = true;
- DBClientCursor *c = cursor.get();
- if( c && c->isDead() ) {
- log() << "pull: old cursor isDead, initiating a new one\n";
- c = 0;
- }
-
- if( c == 0 ) {
- BSONObjBuilder q;
- q.appendDate("$gte", syncedTo.asDate());
- BSONObjBuilder query;
- query.append("ts", q.done());
+ bool tailing = true;
+ DBClientCursor *c = cursor.get();
+ if ( c && c->isDead() ) {
+ log() << "pull: old cursor isDead, initiating a new one\n";
+ c = 0;
+ }
+
+ if ( c == 0 ) {
+ BSONObjBuilder q;
+ q.appendDate("$gte", syncedTo.asDate());
+ BSONObjBuilder query;
+ query.append("ts", q.done());
BSONObj queryObj = query.done();
- // queryObj = { ts: { $gte: syncedTo } }
+ // queryObj = { ts: { $gte: syncedTo } }
debugrepl( ns << ".find(" << queryObj.toString() << ')' );
- cursor = conn->query( ns.c_str(), queryObj, 0, 0, 0, Option_CursorTailable | Option_SlaveOk );
- c = cursor.get();
- tailing = false;
- }
- else {
+ cursor = conn->query( ns.c_str(), queryObj, 0, 0, 0, Option_CursorTailable | Option_SlaveOk );
+ c = cursor.get();
+ tailing = false;
+ }
+ else {
debugrepl( "tailing=true" );
}
- if( c == 0 ) {
+ if ( c == 0 ) {
problem() << "pull: dbclient::query returns null (conn closed?)" << endl;
resetConnection();
sleepsecs(3);
return false;
}
- // show any deferred database creates from a previous pass
- {
- set<string>::iterator i = addDbNextPass.begin();
- if( i != addDbNextPass.end() ) {
- BSONObjBuilder b;
- b.append("ns", *i + '.');
- b.append("op", "db");
- BSONObj op = b.done();
- sync_pullOpLog_applyOperation(op);
- }
- }
-
- if( !c->more() ) {
- if( tailing ) {
+ // show any deferred database creates from a previous pass
+ {
+ set<string>::iterator i = addDbNextPass.begin();
+ if ( i != addDbNextPass.end() ) {
+ BSONObjBuilder b;
+ b.append("ns", *i + '.');
+ b.append("op", "db");
+ BSONObj op = b.done();
+ sync_pullOpLog_applyOperation(op);
+ }
+ }
+
+ if ( !c->more() ) {
+ if ( tailing ) {
debugrepl( "tailing & no new activity" );
} else
- log() << "pull: " << ns << " oplog is empty\n";
- sleepsecs(3);
- return true;
- }
+ log() << "pull: " << ns << " oplog is empty\n";
+ sleepsecs(3);
+ return true;
+ }
int n = 0;
- BSONObj op = c->next();
- BSONElement ts = op.findElement("ts");
- if( ts.type() != Date ) {
+ BSONObj op = c->next();
+ BSONElement ts = op.findElement("ts");
+ if ( ts.type() != Date ) {
string err = op.getStringField("$err");
- if( !err.empty() ) {
+ if ( !err.empty() ) {
problem() << "pull: $err reading remote oplog: " + err << '\n';
massert( "got $err reading remote oplog", false );
}
- else {
+ else {
problem() << "pull: bad object read from remote oplog: " << op.toString() << '\n';
massert("pull: bad object read from remote oplog", false);
}
}
- OpTime nextOpTime( ts.date() );
+ OpTime nextOpTime( ts.date() );
debugrepl( "first op time received: " << nextOpTime.toString() );
- bool initial = syncedTo.isNull();
- if( initial || tailing ) {
- if( tailing ) {
- assert( syncedTo < nextOpTime );
- }
- else {
- log() << "pull: initial run\n";
- }
+ bool initial = syncedTo.isNull();
+ if ( initial || tailing ) {
+ if ( tailing ) {
+ assert( syncedTo < nextOpTime );
+ }
+ else {
+ log() << "pull: initial run\n";
+ }
{
sync_pullOpLog_applyOperation(op);
n++;
}
- }
- else if( nextOpTime != syncedTo ) {
+ }
+ else if ( nextOpTime != syncedTo ) {
Logstream& l = log();
- l << "pull: nextOpTime " << nextOpTime.toStringLong() << ' ';
- if( nextOpTime < syncedTo )
+ l << "pull: nextOpTime " << nextOpTime.toStringLong() << ' ';
+ if ( nextOpTime < syncedTo )
l << "<??";
else
l << ">";
@@ -792,67 +814,67 @@ bool ReplSource::sync_pullOpLog() {
log() << "pull: tailing: " << tailing << '\n';
log() << "pull: data too stale, halting replication" << endl;
replInfo = allDead = "data too stale halted replication";
- assert( syncedTo < nextOpTime );
- throw SyncException();
- }
- else {
+ assert( syncedTo < nextOpTime );
+ throw SyncException();
+ }
+ else {
/* t == syncedTo, so the first op was applied previously, no need to redo it. */
}
- // apply operations
- {
- while( 1 ) {
- if( !c->more() ) {
- log() << "pull: applied " << n << " operations" << endl;
- syncedTo = nextOpTime;
+ // apply operations
+ {
+ while ( 1 ) {
+ if ( !c->more() ) {
+ log() << "pull: applied " << n << " operations" << endl;
+ syncedTo = nextOpTime;
debugrepl( "end sync_pullOpLog syncedTo: " << syncedTo.toStringLong() );
- dblock lk;
- save(); // note how far we are synced up to now
- break;
- }
- /* todo: get out of the mutex for the next()? */
- BSONObj op = c->next();
- ts = op.findElement("ts");
- assert( ts.type() == Date );
- OpTime last = nextOpTime;
- OpTime tmp( ts.date() );
- nextOpTime = tmp;
- if( !( last < nextOpTime ) ) {
- problem() << "sync error: last " << last.toString() << " >= nextOpTime " << nextOpTime.toString() << endl;
- uassert("bad 'ts' value in sources", false);
- }
-
- sync_pullOpLog_applyOperation(op);
- n++;
- }
- }
+ dblock lk;
+ save(); // note how far we are synced up to now
+ break;
+ }
+ /* todo: get out of the mutex for the next()? */
+ BSONObj op = c->next();
+ ts = op.findElement("ts");
+ assert( ts.type() == Date );
+ OpTime last = nextOpTime;
+ OpTime tmp( ts.date() );
+ nextOpTime = tmp;
+ if ( !( last < nextOpTime ) ) {
+ problem() << "sync error: last " << last.toString() << " >= nextOpTime " << nextOpTime.toString() << endl;
+ uassert("bad 'ts' value in sources", false);
+ }
+
+ sync_pullOpLog_applyOperation(op);
+ n++;
+ }
+ }
return true;
}
-/* note: not yet in mutex at this point.
+/* note: not yet in mutex at this point.
returns true if everything happy. return false if you want to reconnect.
*/
-bool ReplSource::sync() {
+bool ReplSource::sync() {
ReplInfo r("sync");
- if( !quiet )
+ if ( !quiet )
log() << "pull: " << sourceName() << '@' << hostName << endl;
- nClonedThisPass = 0;
+ nClonedThisPass = 0;
- if( (string("localhost") == hostName || string("127.0.0.1") == hostName) && port == DBPort ) {
+ if ( (string("localhost") == hostName || string("127.0.0.1") == hostName) && port == DBPort ) {
log() << "pull: can't sync from self (localhost). sources configuration may be wrong." << endl;
- sleepsecs(5);
+ sleepsecs(5);
return false;
}
- if( conn.get() == 0 ) {
- conn = auto_ptr<DBClientConnection>(new DBClientConnection());
- string errmsg;
+ if ( conn.get() == 0 ) {
+ conn = auto_ptr<DBClientConnection>(new DBClientConnection());
+ string errmsg;
ReplInfo r("trying to connect to sync source");
- if( !conn->connect(hostName.c_str(), errmsg) ) {
- resetConnection();
- log() << "pull: cantconn " << errmsg << endl;
- if( replPair && paired ) {
+ if ( !conn->connect(hostName.c_str(), errmsg) ) {
+ resetConnection();
+ log() << "pull: cantconn " << errmsg << endl;
+ if ( replPair && paired ) {
assert( startsWith(hostName.c_str(), replPair->remoteHost.c_str()) );
replPair->arbitrate();
}
@@ -860,27 +882,27 @@ bool ReplSource::sync() {
ReplInfo r("can't connect to sync source, sleeping");
sleepsecs(1);
}
- return false;
- }
- }
+ return false;
+ }
+ }
- if( paired )
+ if ( paired )
replPair->negotiate(conn.get());
-/*
- // get current mtime at the server.
- BSONObj o = conn->findOne("admin.$cmd", opTimeQuery);
- BSONElement e = o.findElement("optime");
- if( e.eoo() ) {
- log() << "pull: failed to get cur optime from master" << endl;
- log() << " " << o.toString() << endl;
- return false;
- }
- uassert( e.type() == Date );
- OpTime serverCurTime;
- serverCurTime.asDate() = e.date();
-*/
- return sync_pullOpLog();
+ /*
+ // get current mtime at the server.
+ BSONObj o = conn->findOne("admin.$cmd", opTimeQuery);
+ BSONElement e = o.findElement("optime");
+ if( e.eoo() ) {
+ log() << "pull: failed to get cur optime from master" << endl;
+ log() << " " << o.toString() << endl;
+ return false;
+ }
+ uassert( e.type() == Date );
+ OpTime serverCurTime;
+ serverCurTime.asDate() = e.date();
+ */
+ return sync_pullOpLog();
}
/* -- Logging of operations -------------------------------------*/
@@ -892,7 +914,7 @@ Database *localOplogClient = 0;
/* we write to local.opload.$main:
{ ts : ..., op: ..., ns: ..., o: ... }
ts: an OpTime timestamp
- op:
+ op:
"i" insert
"u" update
"d" delete
@@ -906,57 +928,57 @@ Database *localOplogClient = 0;
thus, the slave does not need to copy down all the data when it sees this.
*/
void _logOp(const char *opstr, const char *ns, BSONObj& obj, BSONObj *o2, bool *bb) {
- if( strncmp(ns, "local.", 6) == 0 )
- return;
+ if ( strncmp(ns, "local.", 6) == 0 )
+ return;
- Database *oldClient = database;
+ Database *oldClient = database;
bool haveLogged = database && database->haveLogged();
- /* we jump through a bunch of hoops here to avoid copying the obj buffer twice --
- instead we do a single copy to the destination position in the memory mapped file.
+ /* we jump through a bunch of hoops here to avoid copying the obj buffer twice --
+ instead we do a single copy to the destination position in the memory mapped file.
*/
- BSONObjBuilder b;
- b.appendDate("ts", OpTime::now().asDate());
- b.append("op", opstr);
- b.append("ns", ns);
- if( bb )
- b.appendBool("b", *bb);
- if( o2 )
- b.append("o2", *o2);
- if( !haveLogged ) {
+ BSONObjBuilder b;
+ b.appendDate("ts", OpTime::now().asDate());
+ b.append("op", opstr);
+ b.append("ns", ns);
+ if ( bb )
+ b.appendBool("b", *bb);
+ if ( o2 )
+ b.append("o2", *o2);
+ if ( !haveLogged ) {
b.appendBool("first", true);
- if( database ) // null on dropDatabase()'s logging.
+ if ( database ) // null on dropDatabase()'s logging.
database->setHaveLogged();
}
- BSONObj partial = b.done();
- int posz = partial.objsize();
- int len = posz + obj.objsize() + 1 + 2 /*o:*/;
-
- if( localOplogMainDetails == 0 ) {
- setClientTempNs("local.");
- localOplogClient = database;
- localOplogMainDetails = nsdetails("local.oplog.$main");
- }
- database = localOplogClient;
-
- Record *r = theDataFileMgr.fast_oplog_insert(localOplogMainDetails, "local.oplog.$main", len);
-
- char *p = r->data;
- memcpy(p, partial.objdata(), posz);
- *((unsigned *)p) += obj.objsize() + 1 + 2;
- p += posz - 1;
- *p++ = (char) Object;
- *p++ = 'o';
- *p++ = 0;
- memcpy(p, obj.objdata(), obj.objsize());
- p += obj.objsize();
- *p = EOO;
+ BSONObj partial = b.done();
+ int posz = partial.objsize();
+ int len = posz + obj.objsize() + 1 + 2 /*o:*/;
+
+ if ( localOplogMainDetails == 0 ) {
+ setClientTempNs("local.");
+ localOplogClient = database;
+ localOplogMainDetails = nsdetails("local.oplog.$main");
+ }
+ database = localOplogClient;
+
+ Record *r = theDataFileMgr.fast_oplog_insert(localOplogMainDetails, "local.oplog.$main", len);
+
+ char *p = r->data;
+ memcpy(p, partial.objdata(), posz);
+ *((unsigned *)p) += obj.objsize() + 1 + 2;
+ p += posz - 1;
+ *p++ = (char) Object;
+ *p++ = 'o';
+ *p++ = 0;
+ memcpy(p, obj.objdata(), obj.objsize());
+ p += obj.objsize();
+ *p = EOO;
//BSONObj temp(r);
//cout << "temp:" << temp.toString() << endl;
- database = oldClient;
+ database = oldClient;
}
/* --------------------------------------------------------------*/
@@ -968,14 +990,14 @@ _ reuse that cursor when we can
*/
/* returns: # of seconds to sleep before next pass */
-int _replMain(vector<ReplSource*>& sources) {
- {
+int _replMain(vector<ReplSource*>& sources) {
+ {
ReplInfo r("replMain load sources");
dblock lk;
ReplSource::loadAll(sources);
}
-
- if( sources.empty() ) {
+
+ if ( sources.empty() ) {
/* replication is not configured yet (for --slave) in local.sources. Poll for config it
every 20 seconds.
*/
@@ -983,62 +1005,62 @@ int _replMain(vector<ReplSource*>& sources) {
}
bool sleep = true;
- for( vector<ReplSource*>::iterator i = sources.begin(); i != sources.end(); i++ ) {
- ReplSource *s = *i;
- bool ok = false;
+ for ( vector<ReplSource*>::iterator i = sources.begin(); i != sources.end(); i++ ) {
+ ReplSource *s = *i;
+ bool ok = false;
try {
ok = s->sync();
bool moreToSync = s->haveMoreDbsToSync();
sleep = !moreToSync;
- if( ok && !moreToSync /*&& !s->syncedTo.isNull()*/ ) {
+ if ( ok && !moreToSync /*&& !s->syncedTo.isNull()*/ ) {
pairSync.setInitialSyncCompletedLocking();
}
}
- catch( SyncException& ) {
+ catch ( SyncException& ) {
log() << "caught SyncException, sleeping 10 secs" << endl;
return 10;
}
- catch( AssertionException& e ) {
- if( e.severe() ) {
+ catch ( AssertionException& e ) {
+ if ( e.severe() ) {
log() << "replMain caught AssertionException, sleeping 1 minutes" << endl;
return 60;
}
- else {
+ else {
log() << e.toString() << '\n';
}
replInfo = "replMain caught AssertionException";
}
- if( !ok )
+ if ( !ok )
s->resetConnection();
}
- if( sleep ) {
+ if ( sleep ) {
return 3;
}
return 0;
}
-void replMain() {
- vector<ReplSource*> sources;
- while( 1 ) {
+void replMain() {
+ vector<ReplSource*> sources;
+ while ( 1 ) {
int s = 0;
{
dblock lk;
- if( allDead )
+ if ( allDead )
break;
assert( syncing == 0 );
syncing++;
}
- try {
+ try {
s = _replMain(sources);
- } catch(...) {
+ } catch (...) {
cout << "TEMP: caught exception in _replMain" << endl;
- }
+ }
{
dblock lk;
assert( syncing == 1 );
syncing--;
}
- if( s ) {
+ if ( s ) {
stringstream ss;
ss << "replMain: sleep " << s << " before next pass";
string msg = ss.str();
@@ -1053,93 +1075,93 @@ void replMain() {
int debug_stop_repl = 0;
-void replSlaveThread() {
+void replSlaveThread() {
sleepsecs(1);
{
dblock lk;
BSONObj obj;
- if( getSingleton("local.pair.startup", obj) ) {
+ if ( getSingleton("local.pair.startup", obj) ) {
// should be: {replacepeer:1}
replacePeer = true;
pairSync.setInitialSyncCompleted(); // we are the half that has all the data
}
}
- while( 1 ) {
- try {
- replMain();
- if( debug_stop_repl )
- break;
- sleepsecs(5);
- }
- catch( AssertionException& ) {
- ReplInfo r("Assertion in replSlaveThread(): sleeping 5 minutes before retry");
- problem() << "Assertion in replSlaveThread(): sleeping 5 minutes before retry" << endl;
- sleepsecs(300);
- }
- }
+ while ( 1 ) {
+ try {
+ replMain();
+ if ( debug_stop_repl )
+ break;
+ sleepsecs(5);
+ }
+ catch ( AssertionException& ) {
+ ReplInfo r("Assertion in replSlaveThread(): sleeping 5 minutes before retry");
+ problem() << "Assertion in replSlaveThread(): sleeping 5 minutes before retry" << endl;
+ sleepsecs(300);
+ }
+ }
}
/* used to verify that slave knows what databases we have */
-void logOurDbsPresence() {
- path dbs(dbpath);
+void logOurDbsPresence() {
+ path dbs(dbpath);
directory_iterator end;
directory_iterator i(dbs);
- dblock lk;
-
- while( i != end ) {
- path p = *i;
- string f = p.leaf();
- if( endsWith(f.c_str(), ".ns") ) {
- /* note: we keep trailing "." so that when slave calls setClient(ns) everything is happy; e.g.,
- valid namespaces must always have a dot, even though here it is just a placeholder not
- a real one
- */
- string dbname = string(f.c_str(), f.size() - 2);
- if( dbname != "local." ) {
- setClientTempNs(dbname.c_str());
- logOp("db", dbname.c_str(), emptyObj);
- }
- }
- i++;
+ dblock lk;
+
+ while ( i != end ) {
+ path p = *i;
+ string f = p.leaf();
+ if ( endsWith(f.c_str(), ".ns") ) {
+ /* note: we keep trailing "." so that when slave calls setClient(ns) everything is happy; e.g.,
+ valid namespaces must always have a dot, even though here it is just a placeholder not
+ a real one
+ */
+ string dbname = string(f.c_str(), f.size() - 2);
+ if ( dbname != "local." ) {
+ setClientTempNs(dbname.c_str());
+ logOp("db", dbname.c_str(), emptyObj);
+ }
+ }
+ i++;
}
database = 0;
}
/* we have to log the db presence periodically as that "advertisement" will roll out of the log
- as it is of finite length. also as we only do one db cloning per pass, we could skip over a bunch of
- advertisements and thus need to see them again later. so this mechanism can actually be very slow to
+ as it is of finite length. also as we only do one db cloning per pass, we could skip over a bunch of
+ advertisements and thus need to see them again later. so this mechanism can actually be very slow to
work, and should be improved.
*/
-void replMasterThread() {
+void replMasterThread() {
sleepsecs(15);
logOurDbsPresence();
- // if you are testing, you might finish test and shutdown in less than 10
- // minutes yet not have done something in first 15 -- this is to exercise
+ // if you are testing, you might finish test and shutdown in less than 10
+ // minutes yet not have done something in first 15 -- this is to exercise
// this code some.
- sleepsecs(90);
- logOurDbsPresence();
+ sleepsecs(90);
+ logOurDbsPresence();
- while( 1 ) {
- logOurDbsPresence();
- sleepsecs(60 * 10);
- }
+ while ( 1 ) {
+ logOurDbsPresence();
+ sleepsecs(60 * 10);
+ }
}
-void tempThread() {
- while( 1 ) {
+void tempThread() {
+ while ( 1 ) {
cout << dbMutexInfo.isLocked() << endl;
sleepmillis(100);
}
}
-void startReplication() {
- /* this was just to see if anything locks for longer than it should -- we need to be careful
+void startReplication() {
+ /* this was just to see if anything locks for longer than it should -- we need to be careful
not to be locked when trying to connect() or query() the other side.
*/
//boost::thread tempt(tempThread);
@@ -1149,38 +1171,38 @@ void startReplication() {
pairSync.init();
}
- if( slave || replPair ) {
- if( slave && !quiet )
- log() << "slave=true" << endl;
- slave = true;
- boost::thread repl_thread(replSlaveThread);
- }
-
- if( master || replPair ) {
- if( master && !quiet )
- log() << "master=true" << endl;
- master = true;
- {
- dblock lk;
- /* create an oplog collection, if it doesn't yet exist. */
- BSONObjBuilder b;
+ if ( slave || replPair ) {
+ if ( slave && !quiet )
+ log() << "slave=true" << endl;
+ slave = true;
+ boost::thread repl_thread(replSlaveThread);
+ }
+
+ if ( master || replPair ) {
+ if ( master && !quiet )
+ log() << "master=true" << endl;
+ master = true;
+ {
+ dblock lk;
+ /* create an oplog collection, if it doesn't yet exist. */
+ BSONObjBuilder b;
double sz = 50.0 * 1000 * 1000;
- if( sizeof(int *) >= 8 )
+ if ( sizeof(int *) >= 8 )
sz = 990.0 * 1000 * 1000;
- b.append("size", sz);
- b.appendBool("capped", 1);
- setClientTempNs("local.oplog.$main");
- string err;
- BSONObj o = b.done();
- userCreateNS("local.oplog.$main", o, err, false);
- database = 0;
- }
-
- boost::thread mt(replMasterThread);
- }
+ b.append("size", sz);
+ b.appendBool("capped", 1);
+ setClientTempNs("local.oplog.$main");
+ string err;
+ BSONObj o = b.done();
+ userCreateNS("local.oplog.$main", o, err, false);
+ database = 0;
+ }
+
+ boost::thread mt(replMasterThread);
+ }
}
/* called from main at server startup */
void pairWith(const char *remoteEnd, const char *arb) {
- replPair = new ReplPair(remoteEnd, arb);
+ replPair = new ReplPair(remoteEnd, arb);
}