diff options
author | dwight <dwight@Dwights-MacBook.local> | 2008-08-02 14:58:15 -0400 |
---|---|---|
committer | dwight <dwight@Dwights-MacBook.local> | 2008-08-02 14:58:15 -0400 |
commit | bc24777c118e6d84f14c71a9f5e310623e7551fa (patch) | |
tree | 842288ac92d6b0a39da7be535a3c14f9c999c2b0 | |
parent | 9d5178583fdfedfb0b20c552688393493190fd72 (diff) | |
download | mongo-bc24777c118e6d84f14c71a9f5e310623e7551fa.tar.gz |
replication fixes plus some log cleanup
-rw-r--r-- | db/db.cpp | 5 | ||||
-rw-r--r-- | db/db.h | 16 | ||||
-rw-r--r-- | db/db.vcproj | 4 | ||||
-rw-r--r-- | db/javajs.cpp | 29 | ||||
-rw-r--r-- | db/pdfile.h | 22 | ||||
-rw-r--r-- | db/repl.cpp | 727 | ||||
-rw-r--r-- | grid/message.cpp | 6 | ||||
-rw-r--r-- | util/goodies.h | 12 | ||||
-rw-r--r-- | util/sock.h | 16 |
9 files changed, 432 insertions, 405 deletions
diff --git a/db/db.cpp b/db/db.cpp index c203654a810..b3bc27a6c29 100644 --- a/db/db.cpp +++ b/db/db.cpp @@ -40,6 +40,7 @@ extern int curOp; bool useCursors = true; boost::mutex dbMutex; +int dbLocked = 0; void closeAllSockets(); void startReplication(); @@ -531,7 +532,7 @@ void connThread() DbResponse dbresponse; { - lock lk(dbMutex); + dblock lk; Timer t; client = 0; curOp = 0; @@ -741,7 +742,7 @@ void mysighandler(int x) { cout << "got kill or ctrl c signal " << x << ", will terminate after current cmd ends" << endl; problem() << "got kill or ctrl c signal " << x << ", will terminate after current cmd ends" << endl; { - lock lk(dbMutex); + dblock lk; problem() << " now exiting" << endl; exit(12); } @@ -18,3 +18,19 @@ #include "../grid/message.h" void jniCallback(Message& m, Message& out); + +extern boost::mutex dbMutex; +extern int dbLocked; + +struct dblock { + boostlock bl; + dblock() : bl(dbMutex) { + dbLocked++; + assert( dbLocked == 1 ); + } + ~dblock() { + dbLocked--; + assert( dbLocked == 0 ); + } +}; + diff --git a/db/db.vcproj b/db/db.vcproj index 9bcd685d7e1..710b14fa51e 100644 --- a/db/db.vcproj +++ b/db/db.vcproj @@ -43,7 +43,7 @@ <Tool
Name="VCCLCompilerTool"
Optimization="0"
- AdditionalIncludeDirectories=""..\pcre-7.4";..\boost;"C:\Program Files\Java\jdk1.6.0_05\include";"C:\Program Files\Java\jdk1.6.0_05\include\win32""
+ AdditionalIncludeDirectories=""..\pcre-7.4";..\boost;"C:\Program Files\Java\jdk\include";"C:\Program Files\Java\jdk\include\win32""
PreprocessorDefinitions="WIN32;_DEBUG;_CONSOLE;_SCL_SECURE_NO_DEPRECATE;BOOST_ALL_NO_LIB;BOOST_LIB_DIAGNOSTIC;_CRT_SECURE_NO_WARNINGS;HAVE_CONFIG_H"
MinimalRebuild="true"
BasicRuntimeChecks="3"
@@ -431,7 +431,7 @@ >
</File>
<File
- RelativePath="..\..\..\Program Files\Java\jdk1.6.0_05\lib\jvm.lib"
+ RelativePath="C:\Program Files\Java\jdk\lib\jvm.lib"
>
</File>
<File
diff --git a/db/javajs.cpp b/db/javajs.cpp index 34156707d9f..a5281c4d3ec 100644 --- a/db/javajs.cpp +++ b/db/javajs.cpp @@ -145,9 +145,9 @@ JavaJSImpl::JavaJSImpl(const char *appserverPath){ jint res = JNI_CreateJavaVM( &_jvm, (void**)&_mainEnv, _vmArgs ); if( res ) { - cout << "using classpath: " << q << endl; - cerr - << "res : " << res << " " + log() << "using classpath: " << q << endl; + log() + << " res : " << (unsigned) res << " " << "_jvm : " << _jvm << " " << "_env : " << _mainEnv << " " << endl; @@ -341,7 +341,7 @@ void JavaJSImpl::run( const char * js ){ jassert( m ); jstring s = _getEnv()->NewStringUTF( js ); - cout << _getEnv()->CallStaticObjectMethod( c , m , s ) << endl; + log() << _getEnv()->CallStaticObjectMethod( c , m , s ) << endl; } void JavaJSImpl::printException(){ @@ -365,7 +365,7 @@ JNIEnv * JavaJSImpl::_getEnv(){ } void jasserted(const char *msg, const char *file, unsigned line) { - cout << "jassert failed " << msg << " " << file << " " << line << endl; + log() << "jassert failed " << msg << " " << file << " " << line << endl; if ( JavaJS ) JavaJS->printException(); throw AssertionException(); } @@ -382,22 +382,22 @@ const char* findEd(const char *path) { return findEd(); } - cout << "Appserver location specified : " << path << endl; + log() << "Appserver location specified : " << path << endl; if (!path) { - cout << " invalid appserver location : " << path << " : terminating - prepare for bus error" << endl; + log() << " invalid appserver location : " << path << " : terminating - prepare for bus error" << endl; return 0; } DIR *testDir = opendir(path); if (testDir) { - cout << " found directory for appserver : " << path << endl; + log() << " found directory for appserver : " << path << endl; closedir(testDir); return path; } else { - cout << " ERROR : not a directory for specified appserver location : " << path << " - prepare for bus error" << endl; + log() << " ERROR : not a directory for specified appserver location : " << path << " - prepare for bus error" << endl; return null; } #endif @@ -405,7 +405,7 @@ const char* findEd(const char *path) { const char * findEd(){ - cout << "Appserver location not specified. Searching.... " << endl; + log() << "Appserver location not specified. Searching.... " << endl; #if defined(_WIN32) log() << " WIN32 default : c:/l/ed/" << endl; @@ -426,11 +426,12 @@ const char * findEd(){ continue; closedir( test ); - cout << " found directory for appserver : " << temp << endl; + log() << " found directory for appserver : " << temp << endl; return temp; } - cout << " ERROR : can't find directory for appserver - prepare for bus error" << endl; + problem() << "ERROR : can't find directory for appserver - terminating" << endl; + exit(44); return 0; #endif }; @@ -472,7 +473,7 @@ int javajstest() { JavaJSImpl& JavaJS = *::JavaJS; - if ( debug ) cout << "about to create scope" << endl; + if ( debug ) log() << "about to create scope" << endl; jlong scope = JavaJS.scopeCreate(); jassert( scope ); if ( debug ) cout << "got scope" << endl; @@ -502,7 +503,7 @@ int javajstest() { if ( debug ) cout << "going to get object" << endl; JSObj obj = JavaJS.scopeGetObject( scope , "abc" ); - if ( debug ) cout << "done gettting object" << endl; + if ( debug ) cout << "done getting object" << endl; if ( debug ){ cout << "obj : " << obj.toString() << endl; diff --git a/db/pdfile.h b/db/pdfile.h index 7f731a5b962..75b249849e6 100644 --- a/db/pdfile.h +++ b/db/pdfile.h @@ -103,13 +103,13 @@ public: /* Record is a record in a datafile. DeletedRecord is similar but for deleted space. -*11:03:20 AM) dm10gen: regarding extentOfs...
-(11:03:42 AM) dm10gen: an extent is a continugous disk area, which contains many Records and DeleteRecords
-(11:03:56 AM) dm10gen: a DiskLoc has two pieces, the fileno and ofs. (64 bit total)
-(11:04:16 AM) dm10gen: to keep the headesr small, instead of storing a 64 bit ptr to the full extent address, we keep just the offset
-(11:04:29 AM) dm10gen: we can do this as we know the record's address, and it has the same fileNo
-(11:04:33 AM) dm10gen: see class DiskLoc for more info
-(11:04:43 AM) dm10gen: so that is how Record::myExtent() works
+*11:03:20 AM) dm10gen: regarding extentOfs... +(11:03:42 AM) dm10gen: an extent is a continugous disk area, which contains many Records and DeleteRecords +(11:03:56 AM) dm10gen: a DiskLoc has two pieces, the fileno and ofs. (64 bit total) +(11:04:16 AM) dm10gen: to keep the headesr small, instead of storing a 64 bit ptr to the full extent address, we keep just the offset +(11:04:29 AM) dm10gen: we can do this as we know the record's address, and it has the same fileNo +(11:04:33 AM) dm10gen: see class DiskLoc for more info +(11:04:43 AM) dm10gen: so that is how Record::myExtent() works (11:04:53 AM) dm10gen: on an alloc(), when we build a new Record, we must popular its extentOfs then */ class Record { @@ -137,7 +137,7 @@ public: /* extents are datafile regions where all the records within the region belong to the same namespace. -(11:12:35 AM) dm10gen: when the extent is allocated, all its empty space is stuck into one big DeletedRecord
+(11:12:35 AM) dm10gen: when the extent is allocated, all its empty space is stuck into one big DeletedRecord (11:12:55 AM) dm10gen: and that is placed on the free list */ class Extent { @@ -446,7 +446,13 @@ public: extern map<string,Client*> clients; extern Client *client; extern const char *curNs; +extern int dbLocked; inline void setClient(const char *ns) { + /* we must be in critical section at this point as these are global + variables. + */ + assert( dbLocked == 1 ); + char cl[256]; curNs = ns; nsToClient(ns, cl); diff --git a/db/repl.cpp b/db/repl.cpp index 6ee1b8a7027..be3d39f76be 100644 --- a/db/repl.cpp +++ b/db/repl.cpp @@ -25,13 +25,14 @@ #include "pdfile.h" #include "query.h" #include "json.h" +#include "db.h" extern JSObj emptyObj; extern boost::mutex dbMutex; auto_ptr<Cursor> findTableScan(const char *ns, JSObj& order); bool userCreateNS(const char *ns, JSObj& j, string& err); -int _updateObjects(const char *ns, JSObj updateobj, JSObj pattern, bool upsert, stringstream& ss);
-bool _runCommands(const char *ns, JSObj& jsobj, stringstream& ss, BufBuilder &b, JSObjBuilder& anObjBuilder);
+int _updateObjects(const char *ns, JSObj updateobj, JSObj pattern, bool upsert, stringstream& ss); +bool _runCommands(const char *ns, JSObj& jsobj, stringstream& ss, BufBuilder &b, JSObjBuilder& anObjBuilder); OpTime last(0, 0); @@ -88,9 +89,10 @@ void Cloner::copy(const char *collection) { } } +extern int port; bool Cloner::go(const char *masterHost, string& errmsg) { - if( string("localhost") == masterHost || string("127.0.0.1") == masterHost ) { - errmsg = "can't clone from self"; + if( (string("localhost") == masterHost || string("127.0.0.1") == masterHost) && port == DBPort ) { + errmsg = "can't clone from self (localhost). sources configuration may be wrong."; return false; } if( !conn.connect(masterHost, errmsg) ) @@ -127,361 +129,372 @@ bool Cloner::go(const char *masterHost, string& errmsg) { return true; } -bool cloneFrom(const char *masterHost, string& errmsg)
-{
- Cloner c;
- return c.go(masterHost, errmsg);
-}
-
-/* --------------------------------------------------------------*/
-
-Source::Source(JSObj o) {
- hostName = o.getStringField("host");
- sourceName = o.getStringField("source");
- uassert( !hostName.empty() );
- uassert( !sourceName.empty() );
- Element e = o.getField("syncedTo");
- if( !e.eoo() ) {
- uassert( e.type() == Date );
- syncedTo.asDate() = e.date();
- }
-
- JSObj dbsObj = o.getObjectField("dbs");
- if( !dbsObj.isEmpty() ) {
- JSElemIter i(dbsObj);
- while( 1 ) {
- Element e = i.next();
- if( e.eoo() )
- break;
- dbs.insert( e.fieldName() );
- }
- }
-}
-
-/* Turn our C++ Source object into a JSObj */
-JSObj Source::jsobj() {
- JSObjBuilder b;
- b.append("host", hostName);
- b.append("source", sourceName);
- b.appendDate("syncedTo", syncedTo.asDate());
-
- JSObjBuilder dbs_builder;
- for( set<string>::iterator i = dbs.begin(); i != dbs.end(); i++ ) {
- dbs_builder.appendBool(i->c_str(), 1);
- }
- b.append("dbs", dbs_builder.done());
-
- return b.doneAndDecouple();
-}
-
-void Source::save() {
- JSObjBuilder b;
- b.append("host", hostName);
- b.append("source", sourceName);
- JSObj pattern = b.done();
-
- JSObj o = jsobj();
-
- stringstream ss;
- setClient("local.sources");
- //cout << o.toString() << endl;
- //cout << pattern.toString() << endl;
- int u = _updateObjects("local.sources", o, pattern, false, ss);
- assert( u == 1 );
- client = 0;
-}
-
-void Source::cleanup(vector<Source*>& v) {
- for( vector<Source*>::iterator i = v.begin(); i != v.end(); i++ )
- delete *i;
-}
-
-void Source::loadAll(vector<Source*>& v) {
- setClient("local.sources");
- auto_ptr<Cursor> c = findTableScan("local.sources", emptyObj);
- while( c->ok() ) {
- v.push_back( new Source(c->current()) );
- c->advance();
- }
- client = 0;
-}
-
-JSObj opTimeQuery = fromjson("{getoptime:1}");
-
-bool Source::resync(string db) {
- {
- log() << "resync: dropping database " << db << endl;
- string dummyns = db + ".";
- assert( client->name == db );
- dropDatabase(dummyns.c_str());
- setClientTempNs(dummyns.c_str());
- }
-
- {
- log() << "resync: cloning database " << db << endl;
- Cloner c;
- string errmsg;
- bool ok = c.go(hostName.c_str(), errmsg);
- if( !ok ) {
- problem() << "resync of " << db << " from " << hostName << " failed " << errmsg << endl;
- throw SyncException();
- }
- }
-
- log() << "resync: done " << db << endl;
- dbs.insert(db);
- return true;
-}
-
-/* { ts: ..., op: <optype>, ns: ..., o: <obj> , o2: <extraobj>, b: <boolflag> }
- You must lock dbMutex before calling.
-*/
-void Source::applyOperation(JSObj& op) {
- stringstream ss;
- const char *ns = op.getStringField("ns");
- setClientTempNs(ns);
-
- if( client->justCreated || /* datafiles were missing. so we need everything, no matter what sources object says */
- !dbs.count(client->name) ) /* if not in dbs, we've never synced this database before, so we need everything */
- {
- resync(client->name);
- client->justCreated = false;
- }
-
- const char *opType = op.getStringField("op");
- JSObj o = op.getObjectField("o");
- if( *opType == 'i' ) {
- // do upserts for inserts as we might get replayed more than once
- OID *oid = o.getOID();
- if( oid == 0 ) {
- _updateObjects(ns, o, o, true, ss);
- }
- else {
- JSObjBuilder b;
- b.appendOID("_id", oid);
- _updateObjects(ns, o, b.done(), true, ss);
- }
- // theDataFileMgr.insert(ns, (void*) o.objdata(), o.objsize());
- }
- else if( *opType == 'u' ) {
- _updateObjects(ns, o, op.getObjectField("o2"), op.getBoolField("b"), ss);
- }
- else if( *opType == 'd' ) {
- deleteObjects(ns, o, op.getBoolField("b"));
- }
- else {
- BufBuilder bb;
- JSObjBuilder ob;
- assert( *opType == 'c' );
- _runCommands(ns, o, ss, bb, ob);
- }
- client = 0;
-}
-
-/* note: not yet in mutex at this point. */
-void Source::pullOpLog(DBClientConnection& conn) {
- JSObjBuilder q;
- q.appendDate("$gte", syncedTo.asDate());
- JSObjBuilder query;
- query.append("ts", q.done());
- // query = { ts: { $gte: syncedTo } }
-
- string ns = string("local.oplog.$") + sourceName;
- auto_ptr<DBClientCursor> c =
- conn.query(ns.c_str(), query.done());
- if( !c->more() ) {
- problem() << "pull: " << ns << " empty?\n";
- sleepsecs(3);
- return;
- }
-
- JSObj j = c->next();
- Element ts = j.findElement("ts");
- assert( ts.type() == Date );
- OpTime t;
- t.asDate() = ts.date();
- bool initial = syncedTo.isNull();
- if( initial ) {
- log() << "pull: initial run\n";
- }
- else if( t != syncedTo ) {
- log() << "pull: t " << t.toString() << " != syncedTo " << syncedTo.toString() << '\n';
- log() << " data too stale, halting replication" << endl;
- assert( syncedTo < t );
- throw SyncException();
- }
-
- // apply operations
- int n = 0;
- {
- lock lk(dbMutex);
- while( 1 ) {
- if( !c->more() ) {
- log() << "pull: applied " << n << " operations" << endl;
- syncedTo = t;
- save(); // note how far we are synced up to now
- break;
- }
- /* todo: get out of the mutex for the next()? */
- JSObj op = c->next();
- ts = op.findElement("ts");
- assert( ts.type() == Date );
- OpTime last = t;
- t.asDate() = ts.date();
- if( !( last < t ) ) {
- problem() << "sync error: last " << last.toString() << " >= t " << t.toString() << endl;
- uassert(false);
- }
-
- applyOperation(op);
- n++;
- }
- }
-}
-
-/* note: not yet in mutex at this point. */
-void Source::sync() {
- log() << "pull: from " << sourceName << '@' << hostName << endl;
-
- DBClientConnection conn;
- string errmsg;
+bool cloneFrom(const char *masterHost, string& errmsg) +{ + Cloner c; + return c.go(masterHost, errmsg); +} + +/* --------------------------------------------------------------*/ + +Source::Source(JSObj o) { + hostName = o.getStringField("host"); + sourceName = o.getStringField("source"); + uassert( !hostName.empty() ); + uassert( !sourceName.empty() ); + Element e = o.getField("syncedTo"); + if( !e.eoo() ) { + uassert( e.type() == Date ); + syncedTo.asDate() = e.date(); + } + + JSObj dbsObj = o.getObjectField("dbs"); + if( !dbsObj.isEmpty() ) { + JSElemIter i(dbsObj); + while( 1 ) { + Element e = i.next(); + if( e.eoo() ) + break; + dbs.insert( e.fieldName() ); + } + } +} + +/* Turn our C++ Source object into a JSObj */ +JSObj Source::jsobj() { + JSObjBuilder b; + b.append("host", hostName); + b.append("source", sourceName); + b.appendDate("syncedTo", syncedTo.asDate()); + + JSObjBuilder dbs_builder; + for( set<string>::iterator i = dbs.begin(); i != dbs.end(); i++ ) { + dbs_builder.appendBool(i->c_str(), 1); + } + b.append("dbs", dbs_builder.done()); + + return b.doneAndDecouple(); +} + +void Source::save() { + JSObjBuilder b; + b.append("host", hostName); + b.append("source", sourceName); + JSObj pattern = b.done(); + + JSObj o = jsobj(); + + stringstream ss; + setClient("local.sources"); + //cout << o.toString() << endl; + //cout << pattern.toString() << endl; + int u = _updateObjects("local.sources", o, pattern, false, ss); + assert( u == 1 ); + client = 0; +} + +void Source::cleanup(vector<Source*>& v) { + for( vector<Source*>::iterator i = v.begin(); i != v.end(); i++ ) + delete *i; +} + +void Source::loadAll(vector<Source*>& v) { + setClient("local.sources"); + auto_ptr<Cursor> c = findTableScan("local.sources", emptyObj); + while( c->ok() ) { + v.push_back( new Source(c->current()) ); + c->advance(); + } + client = 0; +} + +JSObj opTimeQuery = fromjson("{getoptime:1}"); + +bool Source::resync(string db) { + { + log() << "resync: dropping database " << db << endl; + string dummyns = db + "."; + assert( client->name == db ); + dropDatabase(dummyns.c_str()); + setClientTempNs(dummyns.c_str()); + } + + { + log() << "resync: cloning database " << db << endl; + Cloner c; + string errmsg; + bool ok = c.go(hostName.c_str(), errmsg); + if( !ok ) { + problem() << "resync of " << db << " from " << hostName << " failed " << errmsg << endl; + throw SyncException(); + } + } + + log() << "resync: done " << db << endl; + dbs.insert(db); + return true; +} + +/* { ts: ..., op: <optype>, ns: ..., o: <obj> , o2: <extraobj>, b: <boolflag> } + You must lock dbMutex before calling. +*/ +void Source::applyOperation(JSObj& op) { + stringstream ss; + const char *ns = op.getStringField("ns"); + setClientTempNs(ns); + + if( client->justCreated || /* datafiles were missing. so we need everything, no matter what sources object says */ + !dbs.count(client->name) ) /* if not in dbs, we've never synced this database before, so we need everything */ + { + resync(client->name); + client->justCreated = false; + } + + const char *opType = op.getStringField("op"); + JSObj o = op.getObjectField("o"); + if( *opType == 'i' ) { + // do upserts for inserts as we might get replayed more than once + OID *oid = o.getOID(); + if( oid == 0 ) { + _updateObjects(ns, o, o, true, ss); + } + else { + JSObjBuilder b; + b.appendOID("_id", oid); + _updateObjects(ns, o, b.done(), true, ss); + } + // theDataFileMgr.insert(ns, (void*) o.objdata(), o.objsize()); + } + else if( *opType == 'u' ) { + _updateObjects(ns, o, op.getObjectField("o2"), op.getBoolField("b"), ss); + } + else if( *opType == 'd' ) { + deleteObjects(ns, o, op.getBoolField("b")); + } + else { + BufBuilder bb; + JSObjBuilder ob; + assert( *opType == 'c' ); + _runCommands(ns, o, ss, bb, ob); + } + client = 0; +} + +/* note: not yet in mutex at this point. */ +void Source::pullOpLog(DBClientConnection& conn) { + JSObjBuilder q; + q.appendDate("$gte", syncedTo.asDate()); + JSObjBuilder query; + query.append("ts", q.done()); + // query = { ts: { $gte: syncedTo } } + + string ns = string("local.oplog.$") + sourceName; + auto_ptr<DBClientCursor> c = + conn.query(ns.c_str(), query.done()); + if( !c->more() ) { + problem() << "pull: " << ns << " empty?\n"; + sleepsecs(3); + return; + } + + int n = 0; + JSObj op = c->next(); + Element ts = op.findElement("ts"); + assert( ts.type() == Date ); + OpTime t; + t.asDate() = ts.date(); + bool initial = syncedTo.isNull(); + if( initial ) { + log() << "pull: initial run\n"; + { + dblock lk; + applyOperation(op); + n++; + } + } + else if( t != syncedTo ) { + log() << "pull: t " << t.toString() << " != syncedTo " << syncedTo.toString() << '\n'; + log() << " data too stale, halting replication" << endl; + assert( syncedTo < t ); + throw SyncException(); + } + else { + /* t == syncedTo, so the first op was applied previously, no need to redo it. */ + } + + // apply operations + { + dblock lk; + while( 1 ) { + if( !c->more() ) { + log() << "pull: applied " << n << " operations" << endl; + syncedTo = t; + save(); // note how far we are synced up to now + break; + } + /* todo: get out of the mutex for the next()? */ + JSObj op = c->next(); + ts = op.findElement("ts"); + assert( ts.type() == Date ); + OpTime last = t; + t.asDate() = ts.date(); + if( !( last < t ) ) { + problem() << "sync error: last " << last.toString() << " >= t " << t.toString() << endl; + uassert(false); + } + + applyOperation(op); + n++; + } + } +} + +/* note: not yet in mutex at this point. */ +void Source::sync() { + log() << "pull: from " << sourceName << '@' << hostName << endl; + + if( (string("localhost") == hostName || string("127.0.0.1") == hostName) && port == DBPort ) { + log() << "pull: can't sync from self (localhost). sources configuration may be wrong." << endl; + return; + } + + DBClientConnection conn; + string errmsg; if( !conn.connect(hostName.c_str(), errmsg) ) { - log() << " pull: cantconn " << errmsg << endl; + log() << "pull: cantconn " << errmsg << endl; return; } // get current mtime at the server. - JSObj o = conn.findOne("admin.$cmd", opTimeQuery);
- Element e = o.findElement("optime");
- if( e.eoo() ) {
- log() << " pull: failed to get cur optime from master" << endl;
- log() << " " << o.toString() << endl;
- return;
- }
- uassert( e.type() == Date );
- OpTime serverCurTime;
- serverCurTime.asDate() = e.date();
-
- pullOpLog(conn);
-}
-
-/* -- Logging of operations -------------------------------------*/
-
-// cached copies of these...
-NamespaceDetails *localOplogMainDetails = 0;
-Client *localOplogClient = 0;
-
-/* we write to local.opload.$main:
- { ts : ..., op: ..., ns: ..., o: ... }
- ts: an OpTime timestamp
- op:
- 'i' = insert
-*/
-void _logOp(const char *opstr, const char *ns, JSObj& obj, JSObj *o2, bool *bb) {
- if( strncmp(ns, "local.", 6) == 0 )
- return;
-
- Client *oldClient = client;
- if( localOplogMainDetails == 0 ) {
- setClientTempNs("local.");
- localOplogClient = client;
- localOplogMainDetails = nsdetails("local.oplog.$main");
- }
- client = localOplogClient;
-
- /* we jump through a bunch of hoops here to avoid copying the obj buffer twice --
- instead we do a single copy to the destination position in the memory mapped file.
- */
-
- JSObjBuilder b;
- b.appendDate("ts", OpTime::now().asDate());
- b.append("op", opstr);
- b.append("ns", ns);
- if( bb )
- b.appendBool("b", *bb);
- if( o2 )
- b.append("o2", *o2);
- JSObj partial = b.done();
- int posz = partial.objsize();
- int len = posz + obj.objsize() + 1 + 2 /*o:*/;
-
- Record *r = theDataFileMgr.fast_oplog_insert(localOplogMainDetails, "local.oplog.$main", len);
-
- char *p = r->data;
- memcpy(p, partial.objdata(), posz);
- *((unsigned *)p) += obj.objsize() + 1 + 2;
- p += posz - 1;
- *p++ = (char) Object;
- *p++ = 'o';
- *p++ = 0;
- memcpy(p, obj.objdata(), obj.objsize());
- p += obj.objsize();
- *p = EOO;
-
- client = oldClient;
-}
-
-/* --------------------------------------------------------------*/
-
-void replMain() {
- vector<Source*> sources;
-
- {
- lock lk(dbMutex);
- Source::loadAll(sources);
- }
-
- if( sources.empty() )
- sleepsecs(20);
-
- try {
- for( vector<Source*>::iterator i = sources.begin(); i != sources.end(); i++ )
- (*i)->sync();
- }
- catch( SyncException ) {
- sleepsecs(300);
- }
-
- Source::cleanup(sources);
-}
-
-int debug_stop_repl = 0;
-
-void replMainThread() {
- while( 1 ) {
- try {
- replMain();
- if( debug_stop_repl )
- break;
- sleepsecs(5);
- }
- catch( AssertionException ) {
- problem() << "Assertion in replMainThread(): sleeping 5 minutes before retry" << endl;
- sleepsecs(300);
- }
- }
-}
-
-void startReplication() {
-#if defined(_WIN32)
- slave=true;
-#endif
- if( slave ) {
- log() << "slave=true" << endl;
- boost::thread repl_thread(replMainThread);
- }
-
- if( master ) {
- log() << "master=true" << endl;
- lock lk(dbMutex);
- /* create an oplog collection, if it doesn't yet exist. */
- JSObjBuilder b;
- b.append("size", 254.0 * 1000 * 1000);
- b.appendBool("capped", 1);
- setClientTempNs("local.oplog.$main");
- string err;
- JSObj o = b.done();
- userCreateNS("local.oplog.$main", o, err);
- client = 0;
- }
-}
+ JSObj o = conn.findOne("admin.$cmd", opTimeQuery); + Element e = o.findElement("optime"); + if( e.eoo() ) { + log() << "pull: failed to get cur optime from master" << endl; + log() << " " << o.toString() << endl; + return; + } + uassert( e.type() == Date ); + OpTime serverCurTime; + serverCurTime.asDate() = e.date(); + + pullOpLog(conn); +} + +/* -- Logging of operations -------------------------------------*/ + +// cached copies of these... +NamespaceDetails *localOplogMainDetails = 0; +Client *localOplogClient = 0; + +/* we write to local.opload.$main: + { ts : ..., op: ..., ns: ..., o: ... } + ts: an OpTime timestamp + op: + 'i' = insert +*/ +void _logOp(const char *opstr, const char *ns, JSObj& obj, JSObj *o2, bool *bb) { + if( strncmp(ns, "local.", 6) == 0 ) + return; + + Client *oldClient = client; + if( localOplogMainDetails == 0 ) { + setClientTempNs("local."); + localOplogClient = client; + localOplogMainDetails = nsdetails("local.oplog.$main"); + } + client = localOplogClient; + + /* we jump through a bunch of hoops here to avoid copying the obj buffer twice -- + instead we do a single copy to the destination position in the memory mapped file. + */ + + JSObjBuilder b; + b.appendDate("ts", OpTime::now().asDate()); + b.append("op", opstr); + b.append("ns", ns); + if( bb ) + b.appendBool("b", *bb); + if( o2 ) + b.append("o2", *o2); + JSObj partial = b.done(); + int posz = partial.objsize(); + int len = posz + obj.objsize() + 1 + 2 /*o:*/; + + Record *r = theDataFileMgr.fast_oplog_insert(localOplogMainDetails, "local.oplog.$main", len); + + char *p = r->data; + memcpy(p, partial.objdata(), posz); + *((unsigned *)p) += obj.objsize() + 1 + 2; + p += posz - 1; + *p++ = (char) Object; + *p++ = 'o'; + *p++ = 0; + memcpy(p, obj.objdata(), obj.objsize()); + p += obj.objsize(); + *p = EOO; + + client = oldClient; +} + +/* --------------------------------------------------------------*/ + +void replMain() { + vector<Source*> sources; + + { + dblock lk; + Source::loadAll(sources); + } + + if( sources.empty() ) + sleepsecs(20); + + try { + for( vector<Source*>::iterator i = sources.begin(); i != sources.end(); i++ ) + (*i)->sync(); + } + catch( SyncException ) { + sleepsecs(300); + } + + Source::cleanup(sources); +} + +int debug_stop_repl = 0; + +void replSlaveThread() { + sleepsecs(3); + while( 1 ) { + try { + replMain(); + if( debug_stop_repl ) + break; + sleepsecs(5); + } + catch( AssertionException ) { + problem() << "Assertion in replSlaveThread(): sleeping 5 minutes before retry" << endl; + sleepsecs(300); + } + } +} + +void startReplication() { + if( slave ) { + log() << "slave=true" << endl; + boost::thread repl_thread(replSlaveThread); + } + + if( master ) { + log() << "master=true" << endl; + dblock lk; + /* create an oplog collection, if it doesn't yet exist. */ + JSObjBuilder b; + b.append("size", 254.0 * 1000 * 1000); + b.appendBool("capped", 1); + setClientTempNs("local.oplog.$main"); + string err; + JSObj o = b.done(); + userCreateNS("local.oplog.$main", o, err); + client = 0; + } +} diff --git a/grid/message.cpp b/grid/message.cpp index e0cdea59eba..205ec318a3a 100644 --- a/grid/message.cpp +++ b/grid/message.cpp @@ -55,11 +55,11 @@ void Listener::listen() { while( 1 ) { int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize); if( s < 0 ) { - cout << "Listener: accept() returns " << s << " errno:" << errno << endl; + log() << "Listener: accept() returns " << s << " errno:" << errno << endl; continue; } disableNagle(s); - cout << "Listener: connection accepted from " << from.toString() << endl; + log() << "connection accepted from " << from.toString() << endl; accepted( new MessagingPort(s, from) ); } } @@ -112,7 +112,7 @@ bool MessagingPort::connect(SockAddr& _far) return false; } if( ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize) ) { - cout << "ERROR: connect(): connect() failed " << errno << ' ' << farEnd.getPort() << endl; + log() << "connect(): failed errno:" << errno << ' ' << farEnd.getPort() << endl; closesocket(sock); sock = -1; return false; } diff --git a/util/goodies.h b/util/goodies.h index 282f96694d0..631ed12b9f6 100644 --- a/util/goodies.h +++ b/util/goodies.h @@ -189,15 +189,5 @@ public: }; */ -/* -struct lock { -boostlock bl; - DebugMutex& m; - lock(DebugMutex& _m) : m(_m) { - do_lock(); - } - ~lock() { do_unlock(); } -} -*/ -typedef boostlock lock; +//typedef boostlock lock; diff --git a/util/sock.h b/util/sock.h index 1e101b619ac..1840c3fc10f 100644 --- a/util/sock.h +++ b/util/sock.h @@ -58,11 +58,11 @@ inline void disableNagle(int sock) { #endif if( setsockopt(sock, level, TCP_NODELAY, (char *) &x, sizeof(x)) ) - cout << "ERROR: disableNagle failed" << endl; + log() << "ERROR: disableNagle failed" << endl; } inline void prebindOptions( int sock ){ - cout << "doing prebind option" << endl; + log() << "doing prebind option" << endl; int x = 1; if ( setsockopt( sock , SOL_SOCKET, SO_REUSEADDR, &x, sizeof(x)) < 0 ) cout << "Failed to set socket opt, SO_REUSEADDR" << endl; @@ -71,12 +71,12 @@ inline void prebindOptions( int sock ){ #endif -// .empty() if err
-inline string hostbyname_nonreentrant(const char *hostname) {
- struct hostent *h;
- h = gethostbyname(hostname);
- if( h == 0 ) return "";
- return inet_ntoa( *((struct in_addr *)(h->h_addr)) );
+// .empty() if err +inline string hostbyname_nonreentrant(const char *hostname) { + struct hostent *h; + h = gethostbyname(hostname); + if( h == 0 ) return ""; + return inet_ntoa( *((struct in_addr *)(h->h_addr)) ); } struct SockAddr { |