diff options
author | dwight <dwight@Dwights-MacBook.local> | 2008-08-15 13:59:22 -0400 |
---|---|---|
committer | dwight <dwight@Dwights-MacBook.local> | 2008-08-15 13:59:22 -0400 |
commit | 6d4e9c19bf8920c1a7820ae5f9ae688c7dd98e70 (patch) | |
tree | 6f7e99719488c7d3c817a9eae077756a9b51ba4e | |
parent | 88c0900a4d606cb6c94f694b14e611cfff4a297c (diff) | |
download | mongo-6d4e9c19bf8920c1a7820ae5f9ae688c7dd98e70.tar.gz |
replication: keep db connection open across batches.
-rw-r--r-- | db/repl.cpp | 78 | ||||
-rw-r--r-- | db/repl.h | 8 |
2 files changed, 64 insertions, 22 deletions
diff --git a/db/repl.cpp b/db/repl.cpp index eb4fa7388f2..4ae9fae58a6 100644 --- a/db/repl.cpp +++ b/db/repl.cpp @@ -197,14 +197,37 @@ void Source::cleanup(vector<Source*>& v) { delete *i; } +static void addSourceToList(vector<Source*>&v, Source& s, vector<Source*>&old) { + for( vector<Source*>::iterator i = old.begin(); i != old.end(); ) { + if( s == **i ) { + v.push_back(*i); + old.erase(i); + return; + } + i++; + } + + v.push_back( new Source(s) ); +} + +/* we reuse our existing objects so that we can keep our existing connection + and cursor in effect. +*/ void Source::loadAll(vector<Source*>& v) { + vector<Source *> old = v; + v.erase(v.begin(), v.end()); + setClient("local.sources"); auto_ptr<Cursor> c = findTableScan("local.sources", emptyObj); while( c->ok() ) { - v.push_back( new Source(c->current()) ); + Source tmp(c->current()); + addSourceToList(v, tmp, old); c->advance(); } client = 0; + + for( vector<Source*>::iterator i = old.begin(); i != old.end(); i++ ) + delete *i; } JSObj opTimeQuery = fromjson("{getoptime:1}"); @@ -370,6 +393,7 @@ bool Source::sync() { } } +/* // get current mtime at the server. JSObj o = conn->findOne("admin.$cmd", opTimeQuery); Element e = o.findElement("optime"); @@ -381,7 +405,7 @@ bool Source::sync() { uassert( e.type() == Date ); OpTime serverCurTime; serverCurTime.asDate() = e.date(); - +*/ pullOpLog(); return true; } @@ -444,29 +468,43 @@ void _logOp(const char *opstr, const char *ns, JSObj& obj, JSObj *o2, bool *bb) /* --------------------------------------------------------------*/ +/* +TODO: +_ source has autoptr to the cursor +_ reuse that cursor when we can +*/ + void replMain() { vector<Source*> sources; - { - dblock lk; - Source::loadAll(sources); - } - - if( sources.empty() ) - sleepsecs(20); - - for( vector<Source*>::iterator i = sources.begin(); i != sources.end(); i++ ) { - Source *s = *i; - bool ok = false; - try { - ok = s->sync(); + while( 1 ) { + { + dblock lk; + Source::loadAll(sources); } - catch( SyncException ) { - log() << "caught SyncException, sleeping 5 minutes" << endl; - sleepsecs(300); + + if( sources.empty() ) + sleepsecs(20); + + for( vector<Source*>::iterator i = sources.begin(); i != sources.end(); i++ ) { + Source *s = *i; + bool ok = false; + try { + ok = s->sync(); + } + catch( SyncException ) { + log() << "caught SyncException, sleeping 1 minutes" << endl; + sleepsecs(60); + } + catch( AssertionException ) { + log() << "replMain caught AssertionException, sleeping 1 minutes" << endl; + sleepsecs(60); + } + if( !ok ) + s->resetConnection(); } - if( !ok ) - s->resetConnection(); + + sleepsecs(3); } Source::cleanup(sources); diff --git a/db/repl.h b/db/repl.h index 8832dcc8d4a..f3c65402a46 100644 --- a/db/repl.h +++ b/db/repl.h @@ -33,8 +33,8 @@ extern bool slave; extern bool master; bool cloneFrom(const char *masterHost, string& errmsg); -
-#pragma pack(push,4)
+ +#pragma pack(push,4) class OpTime { unsigned i; unsigned secs; @@ -109,6 +109,10 @@ public: // make a jsobj from our member fields of the form // { host: ..., source: ..., syncedTo: } JSObj jsobj(); + + bool operator==(const Source&r) const { + return hostName == r.hostName && sourceName == r.sourceName; + } }; /* Write operation to the log (local.oplog.$main) |