summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordwight <dwight@Dwights-MacBook.local>2008-08-15 13:59:22 -0400
committerdwight <dwight@Dwights-MacBook.local>2008-08-15 13:59:22 -0400
commit6d4e9c19bf8920c1a7820ae5f9ae688c7dd98e70 (patch)
tree6f7e99719488c7d3c817a9eae077756a9b51ba4e
parent88c0900a4d606cb6c94f694b14e611cfff4a297c (diff)
downloadmongo-6d4e9c19bf8920c1a7820ae5f9ae688c7dd98e70.tar.gz
replication: keep db connection open across batches.
-rw-r--r--db/repl.cpp78
-rw-r--r--db/repl.h8
2 files changed, 64 insertions, 22 deletions
diff --git a/db/repl.cpp b/db/repl.cpp
index eb4fa7388f2..4ae9fae58a6 100644
--- a/db/repl.cpp
+++ b/db/repl.cpp
@@ -197,14 +197,37 @@ void Source::cleanup(vector<Source*>& v) {
delete *i;
}
+static void addSourceToList(vector<Source*>&v, Source& s, vector<Source*>&old) {
+ for( vector<Source*>::iterator i = old.begin(); i != old.end(); ) {
+ if( s == **i ) {
+ v.push_back(*i);
+ old.erase(i);
+ return;
+ }
+ i++;
+ }
+
+ v.push_back( new Source(s) );
+}
+
+/* we reuse our existing objects so that we can keep our existing connection
+ and cursor in effect.
+*/
void Source::loadAll(vector<Source*>& v) {
+ vector<Source *> old = v;
+ v.erase(v.begin(), v.end());
+
setClient("local.sources");
auto_ptr<Cursor> c = findTableScan("local.sources", emptyObj);
while( c->ok() ) {
- v.push_back( new Source(c->current()) );
+ Source tmp(c->current());
+ addSourceToList(v, tmp, old);
c->advance();
}
client = 0;
+
+ for( vector<Source*>::iterator i = old.begin(); i != old.end(); i++ )
+ delete *i;
}
JSObj opTimeQuery = fromjson("{getoptime:1}");
@@ -370,6 +393,7 @@ bool Source::sync() {
}
}
+/*
// get current mtime at the server.
JSObj o = conn->findOne("admin.$cmd", opTimeQuery);
Element e = o.findElement("optime");
@@ -381,7 +405,7 @@ bool Source::sync() {
uassert( e.type() == Date );
OpTime serverCurTime;
serverCurTime.asDate() = e.date();
-
+*/
pullOpLog();
return true;
}
@@ -444,29 +468,43 @@ void _logOp(const char *opstr, const char *ns, JSObj& obj, JSObj *o2, bool *bb)
/* --------------------------------------------------------------*/
+/*
+TODO:
+_ source has autoptr to the cursor
+_ reuse that cursor when we can
+*/
+
void replMain() {
vector<Source*> sources;
- {
- dblock lk;
- Source::loadAll(sources);
- }
-
- if( sources.empty() )
- sleepsecs(20);
-
- for( vector<Source*>::iterator i = sources.begin(); i != sources.end(); i++ ) {
- Source *s = *i;
- bool ok = false;
- try {
- ok = s->sync();
+ while( 1 ) {
+ {
+ dblock lk;
+ Source::loadAll(sources);
}
- catch( SyncException ) {
- log() << "caught SyncException, sleeping 5 minutes" << endl;
- sleepsecs(300);
+
+ if( sources.empty() )
+ sleepsecs(20);
+
+ for( vector<Source*>::iterator i = sources.begin(); i != sources.end(); i++ ) {
+ Source *s = *i;
+ bool ok = false;
+ try {
+ ok = s->sync();
+ }
+ catch( SyncException ) {
+ log() << "caught SyncException, sleeping 1 minutes" << endl;
+ sleepsecs(60);
+ }
+ catch( AssertionException ) {
+ log() << "replMain caught AssertionException, sleeping 1 minutes" << endl;
+ sleepsecs(60);
+ }
+ if( !ok )
+ s->resetConnection();
}
- if( !ok )
- s->resetConnection();
+
+ sleepsecs(3);
}
Source::cleanup(sources);
diff --git a/db/repl.h b/db/repl.h
index 8832dcc8d4a..f3c65402a46 100644
--- a/db/repl.h
+++ b/db/repl.h
@@ -33,8 +33,8 @@ extern bool slave;
extern bool master;
bool cloneFrom(const char *masterHost, string& errmsg);
-
-#pragma pack(push,4)
+
+#pragma pack(push,4)
class OpTime {
unsigned i;
unsigned secs;
@@ -109,6 +109,10 @@ public:
// make a jsobj from our member fields of the form
// { host: ..., source: ..., syncedTo: }
JSObj jsobj();
+
+ bool operator==(const Source&r) const {
+ return hostName == r.hostName && sourceName == r.sourceName;
+ }
};
/* Write operation to the log (local.oplog.$main)