summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordwight <dwight@Dwights-MacBook.local>2008-08-02 14:58:15 -0400
committerdwight <dwight@Dwights-MacBook.local>2008-08-02 14:58:15 -0400
commitbc24777c118e6d84f14c71a9f5e310623e7551fa (patch)
tree842288ac92d6b0a39da7be535a3c14f9c999c2b0
parent9d5178583fdfedfb0b20c552688393493190fd72 (diff)
downloadmongo-bc24777c118e6d84f14c71a9f5e310623e7551fa.tar.gz
replication fixes plus some log cleanup
-rw-r--r--db/db.cpp5
-rw-r--r--db/db.h16
-rw-r--r--db/db.vcproj4
-rw-r--r--db/javajs.cpp29
-rw-r--r--db/pdfile.h22
-rw-r--r--db/repl.cpp727
-rw-r--r--grid/message.cpp6
-rw-r--r--util/goodies.h12
-rw-r--r--util/sock.h16
9 files changed, 432 insertions, 405 deletions
diff --git a/db/db.cpp b/db/db.cpp
index c203654a810..b3bc27a6c29 100644
--- a/db/db.cpp
+++ b/db/db.cpp
@@ -40,6 +40,7 @@ extern int curOp;
bool useCursors = true;
boost::mutex dbMutex;
+int dbLocked = 0;
void closeAllSockets();
void startReplication();
@@ -531,7 +532,7 @@ void connThread()
DbResponse dbresponse;
{
- lock lk(dbMutex);
+ dblock lk;
Timer t;
client = 0;
curOp = 0;
@@ -741,7 +742,7 @@ void mysighandler(int x) {
cout << "got kill or ctrl c signal " << x << ", will terminate after current cmd ends" << endl;
problem() << "got kill or ctrl c signal " << x << ", will terminate after current cmd ends" << endl;
{
- lock lk(dbMutex);
+ dblock lk;
problem() << " now exiting" << endl;
exit(12);
}
diff --git a/db/db.h b/db/db.h
index 6b179851953..023b789fde0 100644
--- a/db/db.h
+++ b/db/db.h
@@ -18,3 +18,19 @@
#include "../grid/message.h"
void jniCallback(Message& m, Message& out);
+
+extern boost::mutex dbMutex;
+extern int dbLocked;
+
+struct dblock {
+ boostlock bl;
+ dblock() : bl(dbMutex) {
+ dbLocked++;
+ assert( dbLocked == 1 );
+ }
+ ~dblock() {
+ dbLocked--;
+ assert( dbLocked == 0 );
+ }
+};
+
diff --git a/db/db.vcproj b/db/db.vcproj
index 9bcd685d7e1..710b14fa51e 100644
--- a/db/db.vcproj
+++ b/db/db.vcproj
@@ -43,7 +43,7 @@
<Tool
Name="VCCLCompilerTool"
Optimization="0"
- AdditionalIncludeDirectories="&quot;..\pcre-7.4&quot;;..\boost;&quot;C:\Program Files\Java\jdk1.6.0_05\include&quot;;&quot;C:\Program Files\Java\jdk1.6.0_05\include\win32&quot;"
+ AdditionalIncludeDirectories="&quot;..\pcre-7.4&quot;;..\boost;&quot;C:\Program Files\Java\jdk\include&quot;;&quot;C:\Program Files\Java\jdk\include\win32&quot;"
PreprocessorDefinitions="WIN32;_DEBUG;_CONSOLE;_SCL_SECURE_NO_DEPRECATE;BOOST_ALL_NO_LIB;BOOST_LIB_DIAGNOSTIC;_CRT_SECURE_NO_WARNINGS;HAVE_CONFIG_H"
MinimalRebuild="true"
BasicRuntimeChecks="3"
@@ -431,7 +431,7 @@
>
</File>
<File
- RelativePath="..\..\..\Program Files\Java\jdk1.6.0_05\lib\jvm.lib"
+ RelativePath="C:\Program Files\Java\jdk\lib\jvm.lib"
>
</File>
<File
diff --git a/db/javajs.cpp b/db/javajs.cpp
index 34156707d9f..a5281c4d3ec 100644
--- a/db/javajs.cpp
+++ b/db/javajs.cpp
@@ -145,9 +145,9 @@ JavaJSImpl::JavaJSImpl(const char *appserverPath){
jint res = JNI_CreateJavaVM( &_jvm, (void**)&_mainEnv, _vmArgs );
if( res ) {
- cout << "using classpath: " << q << endl;
- cerr
- << "res : " << res << " "
+ log() << "using classpath: " << q << endl;
+ log()
+ << " res : " << (unsigned) res << " "
<< "_jvm : " << _jvm << " "
<< "_env : " << _mainEnv << " "
<< endl;
@@ -341,7 +341,7 @@ void JavaJSImpl::run( const char * js ){
jassert( m );
jstring s = _getEnv()->NewStringUTF( js );
- cout << _getEnv()->CallStaticObjectMethod( c , m , s ) << endl;
+ log() << _getEnv()->CallStaticObjectMethod( c , m , s ) << endl;
}
void JavaJSImpl::printException(){
@@ -365,7 +365,7 @@ JNIEnv * JavaJSImpl::_getEnv(){
}
void jasserted(const char *msg, const char *file, unsigned line) {
- cout << "jassert failed " << msg << " " << file << " " << line << endl;
+ log() << "jassert failed " << msg << " " << file << " " << line << endl;
if ( JavaJS ) JavaJS->printException();
throw AssertionException();
}
@@ -382,22 +382,22 @@ const char* findEd(const char *path) {
return findEd();
}
- cout << "Appserver location specified : " << path << endl;
+ log() << "Appserver location specified : " << path << endl;
if (!path) {
- cout << " invalid appserver location : " << path << " : terminating - prepare for bus error" << endl;
+ log() << " invalid appserver location : " << path << " : terminating - prepare for bus error" << endl;
return 0;
}
DIR *testDir = opendir(path);
if (testDir) {
- cout << " found directory for appserver : " << path << endl;
+ log() << " found directory for appserver : " << path << endl;
closedir(testDir);
return path;
}
else {
- cout << " ERROR : not a directory for specified appserver location : " << path << " - prepare for bus error" << endl;
+ log() << " ERROR : not a directory for specified appserver location : " << path << " - prepare for bus error" << endl;
return null;
}
#endif
@@ -405,7 +405,7 @@ const char* findEd(const char *path) {
const char * findEd(){
- cout << "Appserver location not specified. Searching.... " << endl;
+ log() << "Appserver location not specified. Searching.... " << endl;
#if defined(_WIN32)
log() << " WIN32 default : c:/l/ed/" << endl;
@@ -426,11 +426,12 @@ const char * findEd(){
continue;
closedir( test );
- cout << " found directory for appserver : " << temp << endl;
+ log() << " found directory for appserver : " << temp << endl;
return temp;
}
- cout << " ERROR : can't find directory for appserver - prepare for bus error" << endl;
+ problem() << "ERROR : can't find directory for appserver - terminating" << endl;
+ exit(44);
return 0;
#endif
};
@@ -472,7 +473,7 @@ int javajstest() {
JavaJSImpl& JavaJS = *::JavaJS;
- if ( debug ) cout << "about to create scope" << endl;
+ if ( debug ) log() << "about to create scope" << endl;
jlong scope = JavaJS.scopeCreate();
jassert( scope );
if ( debug ) cout << "got scope" << endl;
@@ -502,7 +503,7 @@ int javajstest() {
if ( debug ) cout << "going to get object" << endl;
JSObj obj = JavaJS.scopeGetObject( scope , "abc" );
- if ( debug ) cout << "done gettting object" << endl;
+ if ( debug ) cout << "done getting object" << endl;
if ( debug ){
cout << "obj : " << obj.toString() << endl;
diff --git a/db/pdfile.h b/db/pdfile.h
index 7f731a5b962..75b249849e6 100644
--- a/db/pdfile.h
+++ b/db/pdfile.h
@@ -103,13 +103,13 @@ public:
/* Record is a record in a datafile. DeletedRecord is similar but for deleted space.
-*11:03:20 AM) dm10gen: regarding extentOfs...
-(11:03:42 AM) dm10gen: an extent is a continugous disk area, which contains many Records and DeleteRecords
-(11:03:56 AM) dm10gen: a DiskLoc has two pieces, the fileno and ofs. (64 bit total)
-(11:04:16 AM) dm10gen: to keep the headesr small, instead of storing a 64 bit ptr to the full extent address, we keep just the offset
-(11:04:29 AM) dm10gen: we can do this as we know the record's address, and it has the same fileNo
-(11:04:33 AM) dm10gen: see class DiskLoc for more info
-(11:04:43 AM) dm10gen: so that is how Record::myExtent() works
+*11:03:20 AM) dm10gen: regarding extentOfs...
+(11:03:42 AM) dm10gen: an extent is a continugous disk area, which contains many Records and DeleteRecords
+(11:03:56 AM) dm10gen: a DiskLoc has two pieces, the fileno and ofs. (64 bit total)
+(11:04:16 AM) dm10gen: to keep the headesr small, instead of storing a 64 bit ptr to the full extent address, we keep just the offset
+(11:04:29 AM) dm10gen: we can do this as we know the record's address, and it has the same fileNo
+(11:04:33 AM) dm10gen: see class DiskLoc for more info
+(11:04:43 AM) dm10gen: so that is how Record::myExtent() works
(11:04:53 AM) dm10gen: on an alloc(), when we build a new Record, we must popular its extentOfs then
*/
class Record {
@@ -137,7 +137,7 @@ public:
/* extents are datafile regions where all the records within the region
belong to the same namespace.
-(11:12:35 AM) dm10gen: when the extent is allocated, all its empty space is stuck into one big DeletedRecord
+(11:12:35 AM) dm10gen: when the extent is allocated, all its empty space is stuck into one big DeletedRecord
(11:12:55 AM) dm10gen: and that is placed on the free list
*/
class Extent {
@@ -446,7 +446,13 @@ public:
extern map<string,Client*> clients;
extern Client *client;
extern const char *curNs;
+extern int dbLocked;
inline void setClient(const char *ns) {
+ /* we must be in critical section at this point as these are global
+ variables.
+ */
+ assert( dbLocked == 1 );
+
char cl[256];
curNs = ns;
nsToClient(ns, cl);
diff --git a/db/repl.cpp b/db/repl.cpp
index 6ee1b8a7027..be3d39f76be 100644
--- a/db/repl.cpp
+++ b/db/repl.cpp
@@ -25,13 +25,14 @@
#include "pdfile.h"
#include "query.h"
#include "json.h"
+#include "db.h"
extern JSObj emptyObj;
extern boost::mutex dbMutex;
auto_ptr<Cursor> findTableScan(const char *ns, JSObj& order);
bool userCreateNS(const char *ns, JSObj& j, string& err);
-int _updateObjects(const char *ns, JSObj updateobj, JSObj pattern, bool upsert, stringstream& ss);
-bool _runCommands(const char *ns, JSObj& jsobj, stringstream& ss, BufBuilder &b, JSObjBuilder& anObjBuilder);
+int _updateObjects(const char *ns, JSObj updateobj, JSObj pattern, bool upsert, stringstream& ss);
+bool _runCommands(const char *ns, JSObj& jsobj, stringstream& ss, BufBuilder &b, JSObjBuilder& anObjBuilder);
OpTime last(0, 0);
@@ -88,9 +89,10 @@ void Cloner::copy(const char *collection) {
}
}
+extern int port;
bool Cloner::go(const char *masterHost, string& errmsg) {
- if( string("localhost") == masterHost || string("127.0.0.1") == masterHost ) {
- errmsg = "can't clone from self";
+ if( (string("localhost") == masterHost || string("127.0.0.1") == masterHost) && port == DBPort ) {
+ errmsg = "can't clone from self (localhost). sources configuration may be wrong.";
return false;
}
if( !conn.connect(masterHost, errmsg) )
@@ -127,361 +129,372 @@ bool Cloner::go(const char *masterHost, string& errmsg) {
return true;
}
-bool cloneFrom(const char *masterHost, string& errmsg)
-{
- Cloner c;
- return c.go(masterHost, errmsg);
-}
-
-/* --------------------------------------------------------------*/
-
-Source::Source(JSObj o) {
- hostName = o.getStringField("host");
- sourceName = o.getStringField("source");
- uassert( !hostName.empty() );
- uassert( !sourceName.empty() );
- Element e = o.getField("syncedTo");
- if( !e.eoo() ) {
- uassert( e.type() == Date );
- syncedTo.asDate() = e.date();
- }
-
- JSObj dbsObj = o.getObjectField("dbs");
- if( !dbsObj.isEmpty() ) {
- JSElemIter i(dbsObj);
- while( 1 ) {
- Element e = i.next();
- if( e.eoo() )
- break;
- dbs.insert( e.fieldName() );
- }
- }
-}
-
-/* Turn our C++ Source object into a JSObj */
-JSObj Source::jsobj() {
- JSObjBuilder b;
- b.append("host", hostName);
- b.append("source", sourceName);
- b.appendDate("syncedTo", syncedTo.asDate());
-
- JSObjBuilder dbs_builder;
- for( set<string>::iterator i = dbs.begin(); i != dbs.end(); i++ ) {
- dbs_builder.appendBool(i->c_str(), 1);
- }
- b.append("dbs", dbs_builder.done());
-
- return b.doneAndDecouple();
-}
-
-void Source::save() {
- JSObjBuilder b;
- b.append("host", hostName);
- b.append("source", sourceName);
- JSObj pattern = b.done();
-
- JSObj o = jsobj();
-
- stringstream ss;
- setClient("local.sources");
- //cout << o.toString() << endl;
- //cout << pattern.toString() << endl;
- int u = _updateObjects("local.sources", o, pattern, false, ss);
- assert( u == 1 );
- client = 0;
-}
-
-void Source::cleanup(vector<Source*>& v) {
- for( vector<Source*>::iterator i = v.begin(); i != v.end(); i++ )
- delete *i;
-}
-
-void Source::loadAll(vector<Source*>& v) {
- setClient("local.sources");
- auto_ptr<Cursor> c = findTableScan("local.sources", emptyObj);
- while( c->ok() ) {
- v.push_back( new Source(c->current()) );
- c->advance();
- }
- client = 0;
-}
-
-JSObj opTimeQuery = fromjson("{getoptime:1}");
-
-bool Source::resync(string db) {
- {
- log() << "resync: dropping database " << db << endl;
- string dummyns = db + ".";
- assert( client->name == db );
- dropDatabase(dummyns.c_str());
- setClientTempNs(dummyns.c_str());
- }
-
- {
- log() << "resync: cloning database " << db << endl;
- Cloner c;
- string errmsg;
- bool ok = c.go(hostName.c_str(), errmsg);
- if( !ok ) {
- problem() << "resync of " << db << " from " << hostName << " failed " << errmsg << endl;
- throw SyncException();
- }
- }
-
- log() << "resync: done " << db << endl;
- dbs.insert(db);
- return true;
-}
-
-/* { ts: ..., op: <optype>, ns: ..., o: <obj> , o2: <extraobj>, b: <boolflag> }
- You must lock dbMutex before calling.
-*/
-void Source::applyOperation(JSObj& op) {
- stringstream ss;
- const char *ns = op.getStringField("ns");
- setClientTempNs(ns);
-
- if( client->justCreated || /* datafiles were missing. so we need everything, no matter what sources object says */
- !dbs.count(client->name) ) /* if not in dbs, we've never synced this database before, so we need everything */
- {
- resync(client->name);
- client->justCreated = false;
- }
-
- const char *opType = op.getStringField("op");
- JSObj o = op.getObjectField("o");
- if( *opType == 'i' ) {
- // do upserts for inserts as we might get replayed more than once
- OID *oid = o.getOID();
- if( oid == 0 ) {
- _updateObjects(ns, o, o, true, ss);
- }
- else {
- JSObjBuilder b;
- b.appendOID("_id", oid);
- _updateObjects(ns, o, b.done(), true, ss);
- }
- // theDataFileMgr.insert(ns, (void*) o.objdata(), o.objsize());
- }
- else if( *opType == 'u' ) {
- _updateObjects(ns, o, op.getObjectField("o2"), op.getBoolField("b"), ss);
- }
- else if( *opType == 'd' ) {
- deleteObjects(ns, o, op.getBoolField("b"));
- }
- else {
- BufBuilder bb;
- JSObjBuilder ob;
- assert( *opType == 'c' );
- _runCommands(ns, o, ss, bb, ob);
- }
- client = 0;
-}
-
-/* note: not yet in mutex at this point. */
-void Source::pullOpLog(DBClientConnection& conn) {
- JSObjBuilder q;
- q.appendDate("$gte", syncedTo.asDate());
- JSObjBuilder query;
- query.append("ts", q.done());
- // query = { ts: { $gte: syncedTo } }
-
- string ns = string("local.oplog.$") + sourceName;
- auto_ptr<DBClientCursor> c =
- conn.query(ns.c_str(), query.done());
- if( !c->more() ) {
- problem() << "pull: " << ns << " empty?\n";
- sleepsecs(3);
- return;
- }
-
- JSObj j = c->next();
- Element ts = j.findElement("ts");
- assert( ts.type() == Date );
- OpTime t;
- t.asDate() = ts.date();
- bool initial = syncedTo.isNull();
- if( initial ) {
- log() << "pull: initial run\n";
- }
- else if( t != syncedTo ) {
- log() << "pull: t " << t.toString() << " != syncedTo " << syncedTo.toString() << '\n';
- log() << " data too stale, halting replication" << endl;
- assert( syncedTo < t );
- throw SyncException();
- }
-
- // apply operations
- int n = 0;
- {
- lock lk(dbMutex);
- while( 1 ) {
- if( !c->more() ) {
- log() << "pull: applied " << n << " operations" << endl;
- syncedTo = t;
- save(); // note how far we are synced up to now
- break;
- }
- /* todo: get out of the mutex for the next()? */
- JSObj op = c->next();
- ts = op.findElement("ts");
- assert( ts.type() == Date );
- OpTime last = t;
- t.asDate() = ts.date();
- if( !( last < t ) ) {
- problem() << "sync error: last " << last.toString() << " >= t " << t.toString() << endl;
- uassert(false);
- }
-
- applyOperation(op);
- n++;
- }
- }
-}
-
-/* note: not yet in mutex at this point. */
-void Source::sync() {
- log() << "pull: from " << sourceName << '@' << hostName << endl;
-
- DBClientConnection conn;
- string errmsg;
+bool cloneFrom(const char *masterHost, string& errmsg)
+{
+ Cloner c;
+ return c.go(masterHost, errmsg);
+}
+
+/* --------------------------------------------------------------*/
+
+Source::Source(JSObj o) {
+ hostName = o.getStringField("host");
+ sourceName = o.getStringField("source");
+ uassert( !hostName.empty() );
+ uassert( !sourceName.empty() );
+ Element e = o.getField("syncedTo");
+ if( !e.eoo() ) {
+ uassert( e.type() == Date );
+ syncedTo.asDate() = e.date();
+ }
+
+ JSObj dbsObj = o.getObjectField("dbs");
+ if( !dbsObj.isEmpty() ) {
+ JSElemIter i(dbsObj);
+ while( 1 ) {
+ Element e = i.next();
+ if( e.eoo() )
+ break;
+ dbs.insert( e.fieldName() );
+ }
+ }
+}
+
+/* Turn our C++ Source object into a JSObj */
+JSObj Source::jsobj() {
+ JSObjBuilder b;
+ b.append("host", hostName);
+ b.append("source", sourceName);
+ b.appendDate("syncedTo", syncedTo.asDate());
+
+ JSObjBuilder dbs_builder;
+ for( set<string>::iterator i = dbs.begin(); i != dbs.end(); i++ ) {
+ dbs_builder.appendBool(i->c_str(), 1);
+ }
+ b.append("dbs", dbs_builder.done());
+
+ return b.doneAndDecouple();
+}
+
+void Source::save() {
+ JSObjBuilder b;
+ b.append("host", hostName);
+ b.append("source", sourceName);
+ JSObj pattern = b.done();
+
+ JSObj o = jsobj();
+
+ stringstream ss;
+ setClient("local.sources");
+ //cout << o.toString() << endl;
+ //cout << pattern.toString() << endl;
+ int u = _updateObjects("local.sources", o, pattern, false, ss);
+ assert( u == 1 );
+ client = 0;
+}
+
+void Source::cleanup(vector<Source*>& v) {
+ for( vector<Source*>::iterator i = v.begin(); i != v.end(); i++ )
+ delete *i;
+}
+
+void Source::loadAll(vector<Source*>& v) {
+ setClient("local.sources");
+ auto_ptr<Cursor> c = findTableScan("local.sources", emptyObj);
+ while( c->ok() ) {
+ v.push_back( new Source(c->current()) );
+ c->advance();
+ }
+ client = 0;
+}
+
+JSObj opTimeQuery = fromjson("{getoptime:1}");
+
+bool Source::resync(string db) {
+ {
+ log() << "resync: dropping database " << db << endl;
+ string dummyns = db + ".";
+ assert( client->name == db );
+ dropDatabase(dummyns.c_str());
+ setClientTempNs(dummyns.c_str());
+ }
+
+ {
+ log() << "resync: cloning database " << db << endl;
+ Cloner c;
+ string errmsg;
+ bool ok = c.go(hostName.c_str(), errmsg);
+ if( !ok ) {
+ problem() << "resync of " << db << " from " << hostName << " failed " << errmsg << endl;
+ throw SyncException();
+ }
+ }
+
+ log() << "resync: done " << db << endl;
+ dbs.insert(db);
+ return true;
+}
+
+/* { ts: ..., op: <optype>, ns: ..., o: <obj> , o2: <extraobj>, b: <boolflag> }
+ You must lock dbMutex before calling.
+*/
+void Source::applyOperation(JSObj& op) {
+ stringstream ss;
+ const char *ns = op.getStringField("ns");
+ setClientTempNs(ns);
+
+ if( client->justCreated || /* datafiles were missing. so we need everything, no matter what sources object says */
+ !dbs.count(client->name) ) /* if not in dbs, we've never synced this database before, so we need everything */
+ {
+ resync(client->name);
+ client->justCreated = false;
+ }
+
+ const char *opType = op.getStringField("op");
+ JSObj o = op.getObjectField("o");
+ if( *opType == 'i' ) {
+ // do upserts for inserts as we might get replayed more than once
+ OID *oid = o.getOID();
+ if( oid == 0 ) {
+ _updateObjects(ns, o, o, true, ss);
+ }
+ else {
+ JSObjBuilder b;
+ b.appendOID("_id", oid);
+ _updateObjects(ns, o, b.done(), true, ss);
+ }
+ // theDataFileMgr.insert(ns, (void*) o.objdata(), o.objsize());
+ }
+ else if( *opType == 'u' ) {
+ _updateObjects(ns, o, op.getObjectField("o2"), op.getBoolField("b"), ss);
+ }
+ else if( *opType == 'd' ) {
+ deleteObjects(ns, o, op.getBoolField("b"));
+ }
+ else {
+ BufBuilder bb;
+ JSObjBuilder ob;
+ assert( *opType == 'c' );
+ _runCommands(ns, o, ss, bb, ob);
+ }
+ client = 0;
+}
+
+/* note: not yet in mutex at this point. */
+void Source::pullOpLog(DBClientConnection& conn) {
+ JSObjBuilder q;
+ q.appendDate("$gte", syncedTo.asDate());
+ JSObjBuilder query;
+ query.append("ts", q.done());
+ // query = { ts: { $gte: syncedTo } }
+
+ string ns = string("local.oplog.$") + sourceName;
+ auto_ptr<DBClientCursor> c =
+ conn.query(ns.c_str(), query.done());
+ if( !c->more() ) {
+ problem() << "pull: " << ns << " empty?\n";
+ sleepsecs(3);
+ return;
+ }
+
+ int n = 0;
+ JSObj op = c->next();
+ Element ts = op.findElement("ts");
+ assert( ts.type() == Date );
+ OpTime t;
+ t.asDate() = ts.date();
+ bool initial = syncedTo.isNull();
+ if( initial ) {
+ log() << "pull: initial run\n";
+ {
+ dblock lk;
+ applyOperation(op);
+ n++;
+ }
+ }
+ else if( t != syncedTo ) {
+ log() << "pull: t " << t.toString() << " != syncedTo " << syncedTo.toString() << '\n';
+ log() << " data too stale, halting replication" << endl;
+ assert( syncedTo < t );
+ throw SyncException();
+ }
+ else {
+ /* t == syncedTo, so the first op was applied previously, no need to redo it. */
+ }
+
+ // apply operations
+ {
+ dblock lk;
+ while( 1 ) {
+ if( !c->more() ) {
+ log() << "pull: applied " << n << " operations" << endl;
+ syncedTo = t;
+ save(); // note how far we are synced up to now
+ break;
+ }
+ /* todo: get out of the mutex for the next()? */
+ JSObj op = c->next();
+ ts = op.findElement("ts");
+ assert( ts.type() == Date );
+ OpTime last = t;
+ t.asDate() = ts.date();
+ if( !( last < t ) ) {
+ problem() << "sync error: last " << last.toString() << " >= t " << t.toString() << endl;
+ uassert(false);
+ }
+
+ applyOperation(op);
+ n++;
+ }
+ }
+}
+
+/* note: not yet in mutex at this point. */
+void Source::sync() {
+ log() << "pull: from " << sourceName << '@' << hostName << endl;
+
+ if( (string("localhost") == hostName || string("127.0.0.1") == hostName) && port == DBPort ) {
+ log() << "pull: can't sync from self (localhost). sources configuration may be wrong." << endl;
+ return;
+ }
+
+ DBClientConnection conn;
+ string errmsg;
if( !conn.connect(hostName.c_str(), errmsg) ) {
- log() << " pull: cantconn " << errmsg << endl;
+ log() << "pull: cantconn " << errmsg << endl;
return;
}
// get current mtime at the server.
- JSObj o = conn.findOne("admin.$cmd", opTimeQuery);
- Element e = o.findElement("optime");
- if( e.eoo() ) {
- log() << " pull: failed to get cur optime from master" << endl;
- log() << " " << o.toString() << endl;
- return;
- }
- uassert( e.type() == Date );
- OpTime serverCurTime;
- serverCurTime.asDate() = e.date();
-
- pullOpLog(conn);
-}
-
-/* -- Logging of operations -------------------------------------*/
-
-// cached copies of these...
-NamespaceDetails *localOplogMainDetails = 0;
-Client *localOplogClient = 0;
-
-/* we write to local.opload.$main:
- { ts : ..., op: ..., ns: ..., o: ... }
- ts: an OpTime timestamp
- op:
- 'i' = insert
-*/
-void _logOp(const char *opstr, const char *ns, JSObj& obj, JSObj *o2, bool *bb) {
- if( strncmp(ns, "local.", 6) == 0 )
- return;
-
- Client *oldClient = client;
- if( localOplogMainDetails == 0 ) {
- setClientTempNs("local.");
- localOplogClient = client;
- localOplogMainDetails = nsdetails("local.oplog.$main");
- }
- client = localOplogClient;
-
- /* we jump through a bunch of hoops here to avoid copying the obj buffer twice --
- instead we do a single copy to the destination position in the memory mapped file.
- */
-
- JSObjBuilder b;
- b.appendDate("ts", OpTime::now().asDate());
- b.append("op", opstr);
- b.append("ns", ns);
- if( bb )
- b.appendBool("b", *bb);
- if( o2 )
- b.append("o2", *o2);
- JSObj partial = b.done();
- int posz = partial.objsize();
- int len = posz + obj.objsize() + 1 + 2 /*o:*/;
-
- Record *r = theDataFileMgr.fast_oplog_insert(localOplogMainDetails, "local.oplog.$main", len);
-
- char *p = r->data;
- memcpy(p, partial.objdata(), posz);
- *((unsigned *)p) += obj.objsize() + 1 + 2;
- p += posz - 1;
- *p++ = (char) Object;
- *p++ = 'o';
- *p++ = 0;
- memcpy(p, obj.objdata(), obj.objsize());
- p += obj.objsize();
- *p = EOO;
-
- client = oldClient;
-}
-
-/* --------------------------------------------------------------*/
-
-void replMain() {
- vector<Source*> sources;
-
- {
- lock lk(dbMutex);
- Source::loadAll(sources);
- }
-
- if( sources.empty() )
- sleepsecs(20);
-
- try {
- for( vector<Source*>::iterator i = sources.begin(); i != sources.end(); i++ )
- (*i)->sync();
- }
- catch( SyncException ) {
- sleepsecs(300);
- }
-
- Source::cleanup(sources);
-}
-
-int debug_stop_repl = 0;
-
-void replMainThread() {
- while( 1 ) {
- try {
- replMain();
- if( debug_stop_repl )
- break;
- sleepsecs(5);
- }
- catch( AssertionException ) {
- problem() << "Assertion in replMainThread(): sleeping 5 minutes before retry" << endl;
- sleepsecs(300);
- }
- }
-}
-
-void startReplication() {
-#if defined(_WIN32)
- slave=true;
-#endif
- if( slave ) {
- log() << "slave=true" << endl;
- boost::thread repl_thread(replMainThread);
- }
-
- if( master ) {
- log() << "master=true" << endl;
- lock lk(dbMutex);
- /* create an oplog collection, if it doesn't yet exist. */
- JSObjBuilder b;
- b.append("size", 254.0 * 1000 * 1000);
- b.appendBool("capped", 1);
- setClientTempNs("local.oplog.$main");
- string err;
- JSObj o = b.done();
- userCreateNS("local.oplog.$main", o, err);
- client = 0;
- }
-}
+ JSObj o = conn.findOne("admin.$cmd", opTimeQuery);
+ Element e = o.findElement("optime");
+ if( e.eoo() ) {
+ log() << "pull: failed to get cur optime from master" << endl;
+ log() << " " << o.toString() << endl;
+ return;
+ }
+ uassert( e.type() == Date );
+ OpTime serverCurTime;
+ serverCurTime.asDate() = e.date();
+
+ pullOpLog(conn);
+}
+
+/* -- Logging of operations -------------------------------------*/
+
+// cached copies of these...
+NamespaceDetails *localOplogMainDetails = 0;
+Client *localOplogClient = 0;
+
+/* we write to local.opload.$main:
+ { ts : ..., op: ..., ns: ..., o: ... }
+ ts: an OpTime timestamp
+ op:
+ 'i' = insert
+*/
+void _logOp(const char *opstr, const char *ns, JSObj& obj, JSObj *o2, bool *bb) {
+ if( strncmp(ns, "local.", 6) == 0 )
+ return;
+
+ Client *oldClient = client;
+ if( localOplogMainDetails == 0 ) {
+ setClientTempNs("local.");
+ localOplogClient = client;
+ localOplogMainDetails = nsdetails("local.oplog.$main");
+ }
+ client = localOplogClient;
+
+ /* we jump through a bunch of hoops here to avoid copying the obj buffer twice --
+ instead we do a single copy to the destination position in the memory mapped file.
+ */
+
+ JSObjBuilder b;
+ b.appendDate("ts", OpTime::now().asDate());
+ b.append("op", opstr);
+ b.append("ns", ns);
+ if( bb )
+ b.appendBool("b", *bb);
+ if( o2 )
+ b.append("o2", *o2);
+ JSObj partial = b.done();
+ int posz = partial.objsize();
+ int len = posz + obj.objsize() + 1 + 2 /*o:*/;
+
+ Record *r = theDataFileMgr.fast_oplog_insert(localOplogMainDetails, "local.oplog.$main", len);
+
+ char *p = r->data;
+ memcpy(p, partial.objdata(), posz);
+ *((unsigned *)p) += obj.objsize() + 1 + 2;
+ p += posz - 1;
+ *p++ = (char) Object;
+ *p++ = 'o';
+ *p++ = 0;
+ memcpy(p, obj.objdata(), obj.objsize());
+ p += obj.objsize();
+ *p = EOO;
+
+ client = oldClient;
+}
+
+/* --------------------------------------------------------------*/
+
+void replMain() {
+ vector<Source*> sources;
+
+ {
+ dblock lk;
+ Source::loadAll(sources);
+ }
+
+ if( sources.empty() )
+ sleepsecs(20);
+
+ try {
+ for( vector<Source*>::iterator i = sources.begin(); i != sources.end(); i++ )
+ (*i)->sync();
+ }
+ catch( SyncException ) {
+ sleepsecs(300);
+ }
+
+ Source::cleanup(sources);
+}
+
+int debug_stop_repl = 0;
+
+void replSlaveThread() {
+ sleepsecs(3);
+ while( 1 ) {
+ try {
+ replMain();
+ if( debug_stop_repl )
+ break;
+ sleepsecs(5);
+ }
+ catch( AssertionException ) {
+ problem() << "Assertion in replSlaveThread(): sleeping 5 minutes before retry" << endl;
+ sleepsecs(300);
+ }
+ }
+}
+
+void startReplication() {
+ if( slave ) {
+ log() << "slave=true" << endl;
+ boost::thread repl_thread(replSlaveThread);
+ }
+
+ if( master ) {
+ log() << "master=true" << endl;
+ dblock lk;
+ /* create an oplog collection, if it doesn't yet exist. */
+ JSObjBuilder b;
+ b.append("size", 254.0 * 1000 * 1000);
+ b.appendBool("capped", 1);
+ setClientTempNs("local.oplog.$main");
+ string err;
+ JSObj o = b.done();
+ userCreateNS("local.oplog.$main", o, err);
+ client = 0;
+ }
+}
diff --git a/grid/message.cpp b/grid/message.cpp
index e0cdea59eba..205ec318a3a 100644
--- a/grid/message.cpp
+++ b/grid/message.cpp
@@ -55,11 +55,11 @@ void Listener::listen() {
while( 1 ) {
int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize);
if( s < 0 ) {
- cout << "Listener: accept() returns " << s << " errno:" << errno << endl;
+ log() << "Listener: accept() returns " << s << " errno:" << errno << endl;
continue;
}
disableNagle(s);
- cout << "Listener: connection accepted from " << from.toString() << endl;
+ log() << "connection accepted from " << from.toString() << endl;
accepted( new MessagingPort(s, from) );
}
}
@@ -112,7 +112,7 @@ bool MessagingPort::connect(SockAddr& _far)
return false;
}
if( ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize) ) {
- cout << "ERROR: connect(): connect() failed " << errno << ' ' << farEnd.getPort() << endl;
+ log() << "connect(): failed errno:" << errno << ' ' << farEnd.getPort() << endl;
closesocket(sock); sock = -1;
return false;
}
diff --git a/util/goodies.h b/util/goodies.h
index 282f96694d0..631ed12b9f6 100644
--- a/util/goodies.h
+++ b/util/goodies.h
@@ -189,15 +189,5 @@ public:
};
*/
-/*
-struct lock {
-boostlock bl;
- DebugMutex& m;
- lock(DebugMutex& _m) : m(_m) {
- do_lock();
- }
- ~lock() { do_unlock(); }
-}
-*/
-typedef boostlock lock;
+//typedef boostlock lock;
diff --git a/util/sock.h b/util/sock.h
index 1e101b619ac..1840c3fc10f 100644
--- a/util/sock.h
+++ b/util/sock.h
@@ -58,11 +58,11 @@ inline void disableNagle(int sock) {
#endif
if( setsockopt(sock, level, TCP_NODELAY, (char *) &x, sizeof(x)) )
- cout << "ERROR: disableNagle failed" << endl;
+ log() << "ERROR: disableNagle failed" << endl;
}
inline void prebindOptions( int sock ){
- cout << "doing prebind option" << endl;
+ log() << "doing prebind option" << endl;
int x = 1;
if ( setsockopt( sock , SOL_SOCKET, SO_REUSEADDR, &x, sizeof(x)) < 0 )
cout << "Failed to set socket opt, SO_REUSEADDR" << endl;
@@ -71,12 +71,12 @@ inline void prebindOptions( int sock ){
#endif
-// .empty() if err
-inline string hostbyname_nonreentrant(const char *hostname) {
- struct hostent *h;
- h = gethostbyname(hostname);
- if( h == 0 ) return "";
- return inet_ntoa( *((struct in_addr *)(h->h_addr)) );
+// .empty() if err
+inline string hostbyname_nonreentrant(const char *hostname) {
+ struct hostent *h;
+ h = gethostbyname(hostname);
+ if( h == 0 ) return "";
+ return inet_ntoa( *((struct in_addr *)(h->h_addr)) );
}
struct SockAddr {