summaryrefslogtreecommitdiff
path: root/db/repl.cpp
diff options
context:
space:
mode:
authorDwight <dmerriman@gmail.com>2008-07-29 17:54:54 -0400
committerDwight <dmerriman@gmail.com>2008-07-29 17:54:54 -0400
commit227f63072906b4e38740c83a8f39b74b550d5af3 (patch)
tree300742c5d2b906b98771a5810fdb6ea6fba5e67c /db/repl.cpp
parenta0d1c47ff79a0b2276a8d2088f3308746b03105c (diff)
downloadmongo-227f63072906b4e38740c83a8f39b74b550d5af3.tar.gz
replication
Diffstat (limited to 'db/repl.cpp')
-rw-r--r--db/repl.cpp202
1 files changed, 182 insertions, 20 deletions
diff --git a/db/repl.cpp b/db/repl.cpp
index 66e7e7187fd..d8ccd8ceb1c 100644
--- a/db/repl.cpp
+++ b/db/repl.cpp
@@ -30,6 +30,8 @@ extern JSObj emptyObj;
extern boost::mutex dbMutex;
auto_ptr<Cursor> findTableScan(const char *ns, JSObj& order);
bool userCreateNS(const char *ns, JSObj& j, string& err);
+int _updateObjects(const char *ns, JSObj updateobj, JSObj pattern, bool upsert, stringstream& ss);
+bool _runCommands(const char *ns, JSObj& jsobj, stringstream& ss, BufBuilder &b, JSObjBuilder& anObjBuilder);
OpTime last((unsigned) time(0), 1);
@@ -55,7 +57,7 @@ public:
};
void Cloner::copy(const char *collection) {
- cout << "TEMP:" << collection << endl;
+
auto_ptr<DBClientCursor> c( conn.query(collection, emptyObj) );
assert( c.get() );
@@ -122,17 +124,36 @@ Source::Source(JSObj o) {
uassert( e.type() == Date );
syncedTo.asDate() = e.date();
}
+
+ JSObj dbsObj = o.getObjectField("dbs");
+ if( !dbsObj.isEmpty() ) {
+ JSElemIter i(dbsObj);
+ while( 1 ) {
+ Element e = i.next();
+ if( e.eoo() )
+ break;
+ dbs.insert( e.fieldName() );
+ }
+ }
}
+/* Turn our C++ Source object into a JSObj */
JSObj Source::jsobj() {
JSObjBuilder b;
b.append("host", hostName);
b.append("source", sourceName);
b.appendDate("syncedTo", syncedTo.asDate());
+
+ JSObjBuilder dbs_builder;
+ for( set<string>::iterator i = dbs.begin(); i != dbs.end(); i++ ) {
+ dbs_builder.appendBool(i->c_str(), 1);
+ }
+ b.append("dbs", dbs_builder.done());
+
return b.doneAndDecouple();
}
-void Source::updateOnDisk() {
+void Source::save() {
JSObjBuilder b;
b.append("host", hostName);
b.append("source", sourceName);
@@ -142,7 +163,10 @@ void Source::updateOnDisk() {
stringstream ss;
setClient("local.sources");
- updateObjects("local.sources", o, pattern, false, ss);
+ //cout << o.toString() << endl;
+ //cout << pattern.toString() << endl;
+ int u = _updateObjects("local.sources", o, pattern, false, ss);
+ assert( u == 1 );
client = 0;
}
@@ -163,17 +187,142 @@ void Source::loadAll(vector<Source*>& v) {
JSObj opTimeQuery = fromjson("{getoptime:1}");
+bool Source::resync(string db) {
+ assert( client->name == db );
+
+ {
+ log() << "resync: dropping database " << db << endl;
+ string dummyns = db + ".";
+ dropDatabase(dummyns.c_str());
+ setClient(dummyns.c_str());
+ }
+
+ {
+ log() << "resync: cloning database " << db << endl;
+ Cloner c;
+ string errmsg;
+ bool ok = c.go(hostName.c_str(), errmsg);
+ if( !ok ) {
+ problem() << "resync of " << db << " from " << hostName << " failed " << errmsg << endl;
+ throw SyncException();
+ }
+ }
+
+ log() << "resync: done " << db << endl;
+ dbs.insert(db);
+ return true;
+}
+
+/* { ts: ..., op: <optype>, ns: ..., o: <obj> , o2: <extraobj>, b: <boolflag> }
+ You must lock dbMutex before calling.
+*/
+void Source::applyOperation(JSObj& op) {
+ stringstream ss;
+ const char *ns = op.getStringField("ns");
+ setClient(ns);
+
+ if( client->justCreated || /* datafiles were missing. so we need everything, no matter what sources object says */
+ !dbs.count(client->name) ) /* if not in dbs, we've never synced this database before, so we need everything */
+ {
+ resync(client->name);
+ client->justCreated = false;
+ }
+
+ const char *opType = op.getStringField("op");
+ JSObj o = op.getObjectField("o");
+ if( *opType == 'i' ) {
+ // do upserts for inserts as we might get replayed more than once
+ OID *oid = o.getOID();
+ if( oid == 0 ) {
+ _updateObjects(ns, o, o, true, ss);
+ }
+ else {
+ JSObjBuilder b;
+ b.appendOID("_id", oid);
+ _updateObjects(ns, o, b.done(), true, ss);
+ }
+ // theDataFileMgr.insert(ns, (void*) o.objdata(), o.objsize());
+ }
+ else if( *opType == 'u' ) {
+ _updateObjects(ns, o, op.getObjectField("o2"), op.getBoolField("b"), ss);
+ }
+ else if( *opType == 'd' ) {
+ deleteObjects(ns, o, op.getBoolField("b"));
+ }
+ else {
+ BufBuilder bb;
+ JSObjBuilder ob;
+ assert( *opType == 'c' );
+ _runCommands(ns, o, ss, bb, ob);
+ }
+ client = 0;
+}
+
/* note: not yet in mutex at this point. */
-void Source::pull() {
- log() << "pull source " << sourceName << '@' << hostName << endl;
+void Source::pullOpLog(DBClientConnection& conn) {
+ JSObjBuilder q;
+ q.appendDate("$gte", syncedTo.asDate());
+ JSObjBuilder query;
+ query.append("ts", q.done());
+ // query = { ts: { $gte: syncedTo } }
+
+ string ns = string("local.oplog.$") + sourceName;
+ auto_ptr<DBClientCursor> c =
+ conn.query(ns.c_str(), query.done());
+ if( !c->more() ) {
+ problem() << "pull: " << ns << " empty?\n";
+ sleepsecs(3);
+ return;
+ }
+
+ JSObj j = c->next();
+ Element ts = j.findElement("ts");
+ assert( ts.type() == Date );
+ OpTime t;
+ t.asDate() = ts.date();
+ bool initial = syncedTo.isNull();
+ if( initial ) {
+ log() << "pull: initial run\n";
+ }
+ else if( t != syncedTo ) {
+ log() << "pull: t " << t.toString() << " != syncedTo " << syncedTo.toString() << '\n';
+ log() << " data too stale, halting replication" << endl;
+ assert( syncedTo < t );
+ throw SyncException();
+ }
+
+ // apply operations
+ int n = 0;
+ {
+ lock lk(dbMutex);
+ while( 1 ) {
+ if( !c->more() ) {
+ log() << "pull: applied " << n << " operations" << endl;
+ syncedTo = t;
+ save(); // note how far we are synced up to now
+ break;
+ }
+ /* todo: get out of the mutex for the next()? */
+ JSObj op = c->next();
+ ts = op.findElement("ts");
+ assert( ts.type() == Date );
+ OpTime last = t;
+ t.asDate() = ts.date();
+ assert( last < t );
+ applyOperation(op);
+ n++;
+ }
+ }
+}
-// if( syncedTo.isNull() ) {
-// }
+/* note: not yet in mutex at this point. */
+void Source::sync() {
+ log() << "pull: from " << sourceName << '@' << hostName << endl;
DBClientConnection conn;
string errmsg;
if( !conn.connect(hostName.c_str(), errmsg) ) {
- cout << " pull: cantconn " << errmsg << endl;
+ log() << " pull: cantconn " << errmsg << endl;
return;
}
@@ -181,14 +330,15 @@ void Source::pull() {
JSObj o = conn.findOne("admin.$cmd", opTimeQuery);
Element e = o.findElement("optime");
if( e.eoo() ) {
- cout << " pull: failed to get curtime from master" << endl;
- cout << " " << o.toString() << endl;
+ log() << " pull: failed to get cur optime from master" << endl;
+ log() << " " << o.toString() << endl;
return;
}
uassert( e.type() == Date );
OpTime serverCurTime;
serverCurTime.asDate() = e.date();
+ pullOpLog(conn);
}
/* -- Logging of operations -------------------------------------*/
@@ -200,7 +350,7 @@ Client *localOplogClient = 0;
/* we write to local.opload.$main:
{ ts : ..., op: ..., ns: ..., o: ... }
ts: an OpTime timestamp
- opstr:
+ op:
'i' = insert
*/
void _logOp(const char *opstr, const char *ns, JSObj& obj, JSObj *o2, bool *bb) {
@@ -224,7 +374,7 @@ void _logOp(const char *opstr, const char *ns, JSObj& obj, JSObj *o2, bool *bb)
b.append("op", opstr);
b.append("ns", ns);
if( bb )
- b.appendBool("upsert", *bb);
+ b.appendBool("b", *bb);
if( o2 )
b.append("o2", *o2);
JSObj partial = b.done();
@@ -257,17 +407,28 @@ void replMain() {
Source::loadAll(sources);
}
- for( vector<Source*>::iterator i = sources.begin(); i != sources.end(); i++ ) {
- (*i)->pull();
+ if( sources.empty() )
+ sleepsecs(20);
+
+ try {
+ for( vector<Source*>::iterator i = sources.begin(); i != sources.end(); i++ )
+ (*i)->sync();
+ }
+ catch( SyncException ) {
+ sleepsecs(300);
}
Source::cleanup(sources);
}
+int debug_stop_repl = 0;
+
void replMainThread() {
while( 1 ) {
try {
replMain();
+ if( debug_stop_repl )
+ break;
sleepsecs(5);
}
catch( AssertionException ) {
@@ -278,16 +439,17 @@ void replMainThread() {
}
void startReplication() {
+#if defined(_WIN32)
+ slave=true;
+#endif
if( slave ) {
+ log() << "slave=true" << endl;
boost::thread repl_thread(replMainThread);
}
-#if defined(_WIN32)
-// temp, remove this.
-master = true;
-#endif
-
- if( master ) {
+ if( master ) {
+ log() << "master=true" << endl;
+ /* create an oplog collection, if it doesn't yet exist. */
JSObjBuilder b;
b.append("size", 254.0 * 1000 * 1000);
b.appendBool("capped", 1);