summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDwight <dmerriman@gmail.com>2008-08-11 15:16:54 -0400
committerDwight <dmerriman@gmail.com>2008-08-11 15:16:54 -0400
commit776387ea1bf641d43a22f17e2e23e7d0e8780f87 (patch)
tree466061cfc73afa1cc514fee7ca08903862719fa2
parent236575ca95fb7bddcd9ff142618fdd4db261530e (diff)
parent39030722e571658111c3583b61736d1e22524913 (diff)
downloadmongo-776387ea1bf641d43a22f17e2e23e7d0e8780f87.tar.gz
Merge branch 'master' of ssh://git.10gen.com/data/gitroot/p
Conflicts: db/repl.h
-rw-r--r--db/db.cpp5
-rw-r--r--db/db.h16
-rw-r--r--db/db.vcproj4
-rw-r--r--db/dbclient.cpp79
-rw-r--r--db/dbclient.h16
-rw-r--r--db/javajs.cpp29
-rw-r--r--db/jsobj.cpp18
-rw-r--r--db/jsobj.h16
-rw-r--r--db/pdfile.cpp8
-rw-r--r--db/pdfile.h22
-rw-r--r--db/query.h41
-rw-r--r--db/repl.cpp728
-rw-r--r--db/repl.h59
-rw-r--r--grid/message.cpp6
-rw-r--r--grid/message.h2
-rw-r--r--util/goodies.h12
-rw-r--r--util/mmap.cpp14
-rw-r--r--util/sock.h16
18 files changed, 591 insertions, 500 deletions
diff --git a/db/db.cpp b/db/db.cpp
index 25dc66b64bb..4f515545026 100644
--- a/db/db.cpp
+++ b/db/db.cpp
@@ -40,6 +40,7 @@ extern int curOp;
bool useCursors = true;
boost::mutex dbMutex;
+int dbLocked = 0;
void closeAllSockets();
void startReplication();
@@ -531,7 +532,7 @@ void connThread()
DbResponse dbresponse;
{
- lock lk(dbMutex);
+ dblock lk;
Timer t;
client = 0;
curOp = 0;
@@ -741,7 +742,7 @@ void mysighandler(int x) {
cout << "got kill or ctrl c signal " << x << ", will terminate after current cmd ends" << endl;
problem() << "got kill or ctrl c signal " << x << ", will terminate after current cmd ends" << endl;
{
- lock lk(dbMutex);
+ dblock lk;
problem() << " now exiting" << endl;
exit(12);
}
diff --git a/db/db.h b/db/db.h
index 6b179851953..023b789fde0 100644
--- a/db/db.h
+++ b/db/db.h
@@ -18,3 +18,19 @@
#include "../grid/message.h"
void jniCallback(Message& m, Message& out);
+
+extern boost::mutex dbMutex;
+extern int dbLocked;
+
+struct dblock {
+ boostlock bl;
+ dblock() : bl(dbMutex) {
+ dbLocked++;
+ assert( dbLocked == 1 );
+ }
+ ~dblock() {
+ dbLocked--;
+ assert( dbLocked == 0 );
+ }
+};
+
diff --git a/db/db.vcproj b/db/db.vcproj
index 9bcd685d7e1..710b14fa51e 100644
--- a/db/db.vcproj
+++ b/db/db.vcproj
@@ -43,7 +43,7 @@
<Tool
Name="VCCLCompilerTool"
Optimization="0"
- AdditionalIncludeDirectories="&quot;..\pcre-7.4&quot;;..\boost;&quot;C:\Program Files\Java\jdk1.6.0_05\include&quot;;&quot;C:\Program Files\Java\jdk1.6.0_05\include\win32&quot;"
+ AdditionalIncludeDirectories="&quot;..\pcre-7.4&quot;;..\boost;&quot;C:\Program Files\Java\jdk\include&quot;;&quot;C:\Program Files\Java\jdk\include\win32&quot;"
PreprocessorDefinitions="WIN32;_DEBUG;_CONSOLE;_SCL_SECURE_NO_DEPRECATE;BOOST_ALL_NO_LIB;BOOST_LIB_DIAGNOSTIC;_CRT_SECURE_NO_WARNINGS;HAVE_CONFIG_H"
MinimalRebuild="true"
BasicRuntimeChecks="3"
@@ -431,7 +431,7 @@
>
</File>
<File
- RelativePath="..\..\..\Program Files\Java\jdk1.6.0_05\lib\jvm.lib"
+ RelativePath="C:\Program Files\Java\jdk\lib\jvm.lib"
>
</File>
<File
diff --git a/db/dbclient.cpp b/db/dbclient.cpp
index 55da39a98a5..5a596c3dc4b 100644
--- a/db/dbclient.cpp
+++ b/db/dbclient.cpp
@@ -17,9 +17,11 @@
*/
#include "stdafx.h"
+#include "pdfile.h"
#include "dbclient.h"
#include "../util/builder.h"
#include "jsobj.h"
+#include "query.h"
JSObj DBClientConnection::findOne(const char *ns, JSObj query, JSObj *fieldsToReturn) {
auto_ptr<DBClientCursor> c =
@@ -34,41 +36,38 @@ JSObj DBClientConnection::findOne(const char *ns, JSObj query, JSObj *fieldsToRe
bool DBClientConnection::connect(const char *serverAddress, string& errmsg) {
/* not reentrant!
ok as used right now (we are in a big lock), but won't be later, so fix. */
+
+ int port = DBPort;
+
string ip = hostbyname_nonreentrant(serverAddress);
if( ip.empty() )
ip = serverAddress;
- server = auto_ptr<SockAddr>(new SockAddr(ip.c_str(), DBPort));
+ int idx = ip.find( ":" );
+ if ( idx != string::npos ){
+ cout << "port string:" << ip.substr( idx ) << endl;
+ port = atoi( ip.substr( idx + 1 ).c_str() );
+ ip = ip.substr( 0 , idx );
+ ip = hostbyname_nonreentrant(ip.c_str());
+
+ }
+ if( ip.empty() )
+ ip = serverAddress;
+
+ cout << "port:" << port << endl;
+ server = auto_ptr<SockAddr>(new SockAddr(ip.c_str(), port));
if( !p.connect(*server) ) {
- errmsg = string("couldn't connect to server ") + serverAddress + ' ' + ip;
+ errmsg = string("couldn't connect to server ") + serverAddress + ' ' + ip;
return false;
}
return true;
}
-void DBClientCursor::requestMore() {
- assert( cursorId && pos == nReturned );
-
- BufBuilder b;
- b.append((int) 0); // reserved
- b.append(ns.c_str());
- b.append(nToReturn);
- b.append(cursorId);
-
- Message toSend;
- toSend.setData(dbGetMore, b.buf(), b.len());
- auto_ptr<Message> response(new Message());
- bool ok = p.call(toSend, *response);
- assert( ok );
-
- m = response;
- dataReceived();
-}
-
-auto_ptr<DBClientCursor> DBClientConnection::query(const char *ns, JSObj query, int nToReturn, int nToSkip, JSObj *fieldsToReturn) {
+auto_ptr<DBClientCursor> DBClientConnection::query(const char *ns, JSObj query, int nToReturn, int nToSkip, JSObj *fieldsToReturn, bool sticky) {
// see query.h for the protocol we are using here.
BufBuilder b;
- b.append((int) 0); // reserved
+ int opts = sticky ? Option_CursorSticky : 0;
+ b.append(opts);
b.append(ns);
b.append(nToSkip);
b.append(nToReturn);
@@ -82,16 +81,43 @@ auto_ptr<DBClientCursor> DBClientConnection::query(const char *ns, JSObj query,
if( !ok )
return auto_ptr<DBClientCursor>(0);
- auto_ptr<DBClientCursor> c(new DBClientCursor(p, response));
+ auto_ptr<DBClientCursor> c(new DBClientCursor(p, response, opts));
c->ns = ns;
c->nToReturn = nToReturn;
return c;
}
+/* -- DBClientCursor ---------------------------------------------- */
+
+void DBClientCursor::requestMore() {
+ assert( cursorId && pos == nReturned );
+
+ BufBuilder b;
+ b.append(opts);
+ b.append(ns.c_str());
+ b.append(nToReturn);
+ b.append(cursorId);
+
+ Message toSend;
+ toSend.setData(dbGetMore, b.buf(), b.len());
+ auto_ptr<Message> response(new Message());
+ bool ok = p.call(toSend, *response);
+ assert( ok );
+
+ m = response;
+ dataReceived();
+}
+
void DBClientCursor::dataReceived() {
QueryResult *qr = (QueryResult *) m->data;
- cursorId = qr->cursorId;
+ if( qr->resultOptions() & ResultOption_CursorNotFound )
+ dead = true;
+ if( cursorId == 0 ) {
+ // only set initially: we don't want to kill it on end of data
+ // if it's a sticky cursor
+ cursorId = qr->cursorId;
+ }
nReturned = qr->nReturned;
pos = 0;
data = qr->data();
@@ -105,7 +131,6 @@ bool DBClientCursor::more() {
if( cursorId == 0 )
return false;
-// cout << "TEMP: requestMore" << endl;
requestMore();
return pos < nReturned;
}
@@ -118,6 +143,8 @@ JSObj DBClientCursor::next() {
return o;
}
+/* ------------------------------------------------------ */
+
// "./db testclient" to invoke
extern JSObj emptyObj;
void testClient() {
diff --git a/db/dbclient.h b/db/dbclient.h
index 5c25b1f5bfd..63f1d4836cc 100644
--- a/db/dbclient.h
+++ b/db/dbclient.h
@@ -28,6 +28,9 @@ struct QueryResult : public MsgData {
int startingFrom;
int nReturned;
const char *data() { return (char *) (((int *)&nReturned)+1); }
+ int resultOptions() const {
+ return *((int *) _data);
+ }
};
#pragma pack(pop)
@@ -39,13 +42,19 @@ class DBClientCursor : boost::noncopyable {
int pos;
const char *data;
auto_ptr<Message> m;
+ int opts;
string ns;
int nToReturn;
+ bool dead;
void dataReceived();
void requestMore();
public:
- DBClientCursor(MessagingPort& _p, auto_ptr<Message> _m) :
- p(_p), m(_m) { dataReceived(); }
+ DBClientCursor(MessagingPort& _p, auto_ptr<Message> _m, int _opts) :
+ p(_p), m(_m), opts(_opts) {
+ dead = false;
+ cursorId = 0;
+ dataReceived();
+ }
bool more(); // if true, safe to call next()
JSObj next(); // returns next object in the result cursor
@@ -68,11 +77,12 @@ public:
nToSkip: start with the nth item
fieldsToReturn:
optional template of which fields to select. if unspecified, returns all fields
+ sticky: see query.h sticky comments
returns: cursor.
returns 0 if error
*/
- auto_ptr<DBClientCursor> query(const char *ns, JSObj query, int nToReturn = 0, int nToSkip = 0, JSObj *fieldsToReturn = 0);
+ auto_ptr<DBClientCursor> query(const char *ns, JSObj query, int nToReturn = 0, int nToSkip = 0, JSObj *fieldsToReturn = 0, bool sticky = false);
JSObj findOne(const char *ns, JSObj query, JSObj *fieldsToReturn = 0);
};
diff --git a/db/javajs.cpp b/db/javajs.cpp
index 34156707d9f..a5281c4d3ec 100644
--- a/db/javajs.cpp
+++ b/db/javajs.cpp
@@ -145,9 +145,9 @@ JavaJSImpl::JavaJSImpl(const char *appserverPath){
jint res = JNI_CreateJavaVM( &_jvm, (void**)&_mainEnv, _vmArgs );
if( res ) {
- cout << "using classpath: " << q << endl;
- cerr
- << "res : " << res << " "
+ log() << "using classpath: " << q << endl;
+ log()
+ << " res : " << (unsigned) res << " "
<< "_jvm : " << _jvm << " "
<< "_env : " << _mainEnv << " "
<< endl;
@@ -341,7 +341,7 @@ void JavaJSImpl::run( const char * js ){
jassert( m );
jstring s = _getEnv()->NewStringUTF( js );
- cout << _getEnv()->CallStaticObjectMethod( c , m , s ) << endl;
+ log() << _getEnv()->CallStaticObjectMethod( c , m , s ) << endl;
}
void JavaJSImpl::printException(){
@@ -365,7 +365,7 @@ JNIEnv * JavaJSImpl::_getEnv(){
}
void jasserted(const char *msg, const char *file, unsigned line) {
- cout << "jassert failed " << msg << " " << file << " " << line << endl;
+ log() << "jassert failed " << msg << " " << file << " " << line << endl;
if ( JavaJS ) JavaJS->printException();
throw AssertionException();
}
@@ -382,22 +382,22 @@ const char* findEd(const char *path) {
return findEd();
}
- cout << "Appserver location specified : " << path << endl;
+ log() << "Appserver location specified : " << path << endl;
if (!path) {
- cout << " invalid appserver location : " << path << " : terminating - prepare for bus error" << endl;
+ log() << " invalid appserver location : " << path << " : terminating - prepare for bus error" << endl;
return 0;
}
DIR *testDir = opendir(path);
if (testDir) {
- cout << " found directory for appserver : " << path << endl;
+ log() << " found directory for appserver : " << path << endl;
closedir(testDir);
return path;
}
else {
- cout << " ERROR : not a directory for specified appserver location : " << path << " - prepare for bus error" << endl;
+ log() << " ERROR : not a directory for specified appserver location : " << path << " - prepare for bus error" << endl;
return null;
}
#endif
@@ -405,7 +405,7 @@ const char* findEd(const char *path) {
const char * findEd(){
- cout << "Appserver location not specified. Searching.... " << endl;
+ log() << "Appserver location not specified. Searching.... " << endl;
#if defined(_WIN32)
log() << " WIN32 default : c:/l/ed/" << endl;
@@ -426,11 +426,12 @@ const char * findEd(){
continue;
closedir( test );
- cout << " found directory for appserver : " << temp << endl;
+ log() << " found directory for appserver : " << temp << endl;
return temp;
}
- cout << " ERROR : can't find directory for appserver - prepare for bus error" << endl;
+ problem() << "ERROR : can't find directory for appserver - terminating" << endl;
+ exit(44);
return 0;
#endif
};
@@ -472,7 +473,7 @@ int javajstest() {
JavaJSImpl& JavaJS = *::JavaJS;
- if ( debug ) cout << "about to create scope" << endl;
+ if ( debug ) log() << "about to create scope" << endl;
jlong scope = JavaJS.scopeCreate();
jassert( scope );
if ( debug ) cout << "got scope" << endl;
@@ -502,7 +503,7 @@ int javajstest() {
if ( debug ) cout << "going to get object" << endl;
JSObj obj = JavaJS.scopeGetObject( scope , "abc" );
- if ( debug ) cout << "done gettting object" << endl;
+ if ( debug ) cout << "done getting object" << endl;
if ( debug ){
cout << "obj : " << obj.toString() << endl;
diff --git a/db/jsobj.cpp b/db/jsobj.cpp
index 6d03a9577a3..7d3244ed330 100644
--- a/db/jsobj.cpp
+++ b/db/jsobj.cpp
@@ -145,6 +145,7 @@ string Element::toString() {
s << valuestr();
}
break;
+ case Symbol:
case String:
s << fieldName() << ": ";
if( valuestrsize() > 80 )
@@ -193,6 +194,7 @@ int Element::size() const {
case jstOID:
x = 13;
break;
+ case Symbol:
case Code:
case String:
x = valuestrsize() + 4 + 1;
@@ -263,6 +265,7 @@ int compareElementValues(const Element& l, const Element& r) {
case jstOID:
return memcmp(l.value(), r.value(), 12);
case Code:
+ case Symbol:
case String:
/* todo: utf version */
return strcmp(l.valuestr(), r.valuestr());
@@ -421,8 +424,14 @@ JSMatcher::JSMatcher(JSObj &_jsobj) :
assert( in == 0 ); // only one per query supported so far. finish...
in = new set<Element,element_lt>();
JSElemIter i(fe.embeddedObject());
- while( i.more() )
- in->insert(i.next());
+ if( i.more() ) {
+ while( 1 ) {
+ Element ie = i.next();
+ if( ie.eoo() )
+ break;
+ in->insert(ie);
+ }
+ }
toMatch.push_back(e); // not actually used at the moment
compareOp.push_back(opIN);
n++;
@@ -453,7 +462,8 @@ inline int JSMatcher::valuesMatch(Element& l, Element& r, int op) {
if( op == opIN ) {
// { $in : [1,2,3] }
- return in->count(l);
+ int c = in->count(l);
+ return c;
}
/* check LT, GTE, ... */
@@ -542,7 +552,7 @@ extern int dump;
inline bool _regexMatches(RegexMatcher& rm, Element& e) {
char buf[64];
const char *p = buf;
- if( e.type() == String )
+ if( e.type() == String || e.type() == Symbol )
p = e.valuestr();
else if( e.type() == Number ) {
sprintf(buf, "%f", e.number());
diff --git a/db/jsobj.h b/db/jsobj.h
index 9356d935ad3..b0a6653ddd1 100644
--- a/db/jsobj.h
+++ b/db/jsobj.h
@@ -36,7 +36,7 @@ class JSObjBuilder;
*/
enum JSType { EOO = 0, Number=1, String=2, Object=3, Array=4, BinData=5,
Undefined=6, jstOID=7, Bool=8, Date=9 , jstNULL=10, RegEx=11 ,
- DBRef=12, Code=13, JSTypeMax=13, MaxKey=127 };
+ DBRef=12, Code=13, Symbol=14, JSTypeMax=14, MaxKey=127 };
/* subtypes of BinData.
bdtCustom and above are ones that the JS compiler understands, but are
@@ -75,20 +75,6 @@ struct OID {
BinData: <int len> <byte subtype> <byte[len] data>
*/
-/* db operation message format
-
- unsigned opid; // arbitary; will be echoed back
- byte operation;
-
- dbInsert:
- int reserved;
- string collection;
- a series of JSObjects terminated with a null object (i.e., just EOO)
- dbUpdate: see query.h
- dbDelete: see query.h
- dbQuery: see query.h
-*/
-
#pragma pack(pop)
/* <type><fieldName ><value>
diff --git a/db/pdfile.cpp b/db/pdfile.cpp
index 288d9c50899..418af182552 100644
--- a/db/pdfile.cpp
+++ b/db/pdfile.cpp
@@ -411,7 +411,7 @@ bool userCreateNS(const char *ns, JSObj& j, string& err) {
return false;
}
- cout << "create collection " << ns << ' ' << j.toString() << endl;
+ log() << "create collection " << ns << ' ' << j.toString() << endl;
/* todo: do this only when we have allocated space successfully? or we could insert with a { ok: 0 } field
and then go back and set to ok : 1 after we are done.
@@ -1036,15 +1036,15 @@ DiskLoc DataFileMgr::insert(const char *ns, const void *buf, int len, bool god)
// try to create it
string err;
if( !userCreateNS(tabletoidxns, emptyObj, err) ) {
- cout << "ERROR: failed to create collection while adding its index. " << tabletoidxns << endl;
+ problem() << "ERROR: failed to create collection while adding its index. " << tabletoidxns << endl;
return DiskLoc();
}
tableToIndex = nsdetails(tabletoidxns);
- cout << "info: creating collection " << tabletoidxns << " on add index\n";
+ log() << "info: creating collection " << tabletoidxns << " on add index\n";
assert( tableToIndex );
}
if( tableToIndex->nIndexes >= MaxIndexes ) {
- cout << "user warning: bad add index attempt, too many indexes for:" << tabletoidxns << endl;
+ log() << "user warning: bad add index attempt, too many indexes for:" << tabletoidxns << endl;
return DiskLoc();
}
if( tableToIndex->findIndexByName(name) >= 0 ) {
diff --git a/db/pdfile.h b/db/pdfile.h
index 9447dabe08a..2450bb56a41 100644
--- a/db/pdfile.h
+++ b/db/pdfile.h
@@ -102,13 +102,13 @@ public:
/* Record is a record in a datafile. DeletedRecord is similar but for deleted space.
-*11:03:20 AM) dm10gen: regarding extentOfs...
-(11:03:42 AM) dm10gen: an extent is a continugous disk area, which contains many Records and DeleteRecords
-(11:03:56 AM) dm10gen: a DiskLoc has two pieces, the fileno and ofs. (64 bit total)
-(11:04:16 AM) dm10gen: to keep the headesr small, instead of storing a 64 bit ptr to the full extent address, we keep just the offset
-(11:04:29 AM) dm10gen: we can do this as we know the record's address, and it has the same fileNo
-(11:04:33 AM) dm10gen: see class DiskLoc for more info
-(11:04:43 AM) dm10gen: so that is how Record::myExtent() works
+*11:03:20 AM) dm10gen: regarding extentOfs...
+(11:03:42 AM) dm10gen: an extent is a continugous disk area, which contains many Records and DeleteRecords
+(11:03:56 AM) dm10gen: a DiskLoc has two pieces, the fileno and ofs. (64 bit total)
+(11:04:16 AM) dm10gen: to keep the headesr small, instead of storing a 64 bit ptr to the full extent address, we keep just the offset
+(11:04:29 AM) dm10gen: we can do this as we know the record's address, and it has the same fileNo
+(11:04:33 AM) dm10gen: see class DiskLoc for more info
+(11:04:43 AM) dm10gen: so that is how Record::myExtent() works
(11:04:53 AM) dm10gen: on an alloc(), when we build a new Record, we must popular its extentOfs then
*/
class Record {
@@ -136,7 +136,7 @@ public:
/* extents are datafile regions where all the records within the region
belong to the same namespace.
-(11:12:35 AM) dm10gen: when the extent is allocated, all its empty space is stuck into one big DeletedRecord
+(11:12:35 AM) dm10gen: when the extent is allocated, all its empty space is stuck into one big DeletedRecord
(11:12:55 AM) dm10gen: and that is placed on the free list
*/
class Extent {
@@ -445,7 +445,13 @@ public:
extern map<string,Client*> clients;
extern Client *client;
extern const char *curNs;
+extern int dbLocked;
inline void setClient(const char *ns) {
+ /* we must be in critical section at this point as these are global
+ variables.
+ */
+ assert( dbLocked == 1 );
+
char cl[256];
curNs = ns;
nsToClient(ns, cl);
diff --git a/db/query.h b/db/query.h
index 0751e945532..64acc41ffbb 100644
--- a/db/query.h
+++ b/db/query.h
@@ -23,34 +23,38 @@
#include "jsobj.h"
#include "storage.h"
-/* requests:
+/* db request message format
- dbDelete
- int reserved=0;
+ unsigned opid; // arbitary; will be echoed back
+ byte operation;
+ int options;
+
+ then for:
+
+ dbInsert:
+ string collection;
+ a series of JSObjects terminated with a null object (i.e., just EOO)
+ dbDelete:
string collection;
int flags=0; // 1=DeleteSingle
JSObject query;
dbUpdate:
- int reserved;
string collection;
int flags; // 1=upsert
JSObject query;
JSObject objectToUpdate;
objectToUpdate may include { $inc: <field> }.
dbQuery:
- int reserved;
string collection;
int nToSkip;
int nToReturn; // how many you want back as the beginning of the cursor data
JSObject query;
[JSObject fieldsToReturn]
dbGetMore:
- int reserved;
string collection; // redundant, might use for security.
int nToReturn;
int64 cursorID;
dbKillCursors=2007:
- int reserved;
int n;
int64 cursorIDs[n];
@@ -59,16 +63,35 @@
Note that the update field layout is very similar layout to Query.
*/
+/* the field 'options' above can have these bits set: */
+enum {
+ /* Sticky means cursor is not closed when the last data is retrieved. rather, the cursor "sticks"
+ on the final object's position. you can resume using the cursor later, from where it was located,
+ if more data were received. Set on dbQuery and dbGetMore.
+
+ like any "latent cursor", the cursor may become invalid at some point -- for example if that
+ final object it references were deleted. Thus, you should be prepared to requery if you get back
+ ResultOption_CursorNotFound.
+ */
+ Option_CursorSticky = 2
+};
+
/* db response format
- Query or GetMore:
- int reserved;
+ Query or GetMore: // see struct QueryResult
+ int resultOptions = 0;
int64 cursorID;
int startingFrom;
int nReturned; // 0=infinity
list of marshalled JSObjects;
*/
+/* the field 'resultOptions' above */
+enum {
+ /* returned, with zero results, when getMore is called but the cursor id is not valid at the server. */
+ ResultOption_CursorNotFound = 1
+};
+
// grab struct QueryResult from:
#include "dbclient.h"
diff --git a/db/repl.cpp b/db/repl.cpp
index 6ee1b8a7027..c7a59489ad6 100644
--- a/db/repl.cpp
+++ b/db/repl.cpp
@@ -25,13 +25,14 @@
#include "pdfile.h"
#include "query.h"
#include "json.h"
+#include "db.h"
extern JSObj emptyObj;
extern boost::mutex dbMutex;
auto_ptr<Cursor> findTableScan(const char *ns, JSObj& order);
bool userCreateNS(const char *ns, JSObj& j, string& err);
-int _updateObjects(const char *ns, JSObj updateobj, JSObj pattern, bool upsert, stringstream& ss);
-bool _runCommands(const char *ns, JSObj& jsobj, stringstream& ss, BufBuilder &b, JSObjBuilder& anObjBuilder);
+int _updateObjects(const char *ns, JSObj updateobj, JSObj pattern, bool upsert, stringstream& ss);
+bool _runCommands(const char *ns, JSObj& jsobj, stringstream& ss, BufBuilder &b, JSObjBuilder& anObjBuilder);
OpTime last(0, 0);
@@ -60,7 +61,6 @@ struct TestOpTime {
} testoptime;
int test2() {
- TestOpTime t;
return 0;
}
@@ -88,9 +88,10 @@ void Cloner::copy(const char *collection) {
}
}
+extern int port;
bool Cloner::go(const char *masterHost, string& errmsg) {
- if( string("localhost") == masterHost || string("127.0.0.1") == masterHost ) {
- errmsg = "can't clone from self";
+ if( (string("localhost") == masterHost || string("127.0.0.1") == masterHost) && port == DBPort ) {
+ errmsg = "can't clone from self (localhost). sources configuration may be wrong.";
return false;
}
if( !conn.connect(masterHost, errmsg) )
@@ -127,361 +128,372 @@ bool Cloner::go(const char *masterHost, string& errmsg) {
return true;
}
-bool cloneFrom(const char *masterHost, string& errmsg)
-{
- Cloner c;
- return c.go(masterHost, errmsg);
-}
-
-/* --------------------------------------------------------------*/
-
-Source::Source(JSObj o) {
- hostName = o.getStringField("host");
- sourceName = o.getStringField("source");
- uassert( !hostName.empty() );
- uassert( !sourceName.empty() );
- Element e = o.getField("syncedTo");
- if( !e.eoo() ) {
- uassert( e.type() == Date );
- syncedTo.asDate() = e.date();
- }
-
- JSObj dbsObj = o.getObjectField("dbs");
- if( !dbsObj.isEmpty() ) {
- JSElemIter i(dbsObj);
- while( 1 ) {
- Element e = i.next();
- if( e.eoo() )
- break;
- dbs.insert( e.fieldName() );
- }
- }
-}
-
-/* Turn our C++ Source object into a JSObj */
-JSObj Source::jsobj() {
- JSObjBuilder b;
- b.append("host", hostName);
- b.append("source", sourceName);
- b.appendDate("syncedTo", syncedTo.asDate());
-
- JSObjBuilder dbs_builder;
- for( set<string>::iterator i = dbs.begin(); i != dbs.end(); i++ ) {
- dbs_builder.appendBool(i->c_str(), 1);
- }
- b.append("dbs", dbs_builder.done());
-
- return b.doneAndDecouple();
-}
-
-void Source::save() {
- JSObjBuilder b;
- b.append("host", hostName);
- b.append("source", sourceName);
- JSObj pattern = b.done();
-
- JSObj o = jsobj();
-
- stringstream ss;
- setClient("local.sources");
- //cout << o.toString() << endl;
- //cout << pattern.toString() << endl;
- int u = _updateObjects("local.sources", o, pattern, false, ss);
- assert( u == 1 );
- client = 0;
-}
-
-void Source::cleanup(vector<Source*>& v) {
- for( vector<Source*>::iterator i = v.begin(); i != v.end(); i++ )
- delete *i;
-}
-
-void Source::loadAll(vector<Source*>& v) {
- setClient("local.sources");
- auto_ptr<Cursor> c = findTableScan("local.sources", emptyObj);
- while( c->ok() ) {
- v.push_back( new Source(c->current()) );
- c->advance();
- }
- client = 0;
-}
-
-JSObj opTimeQuery = fromjson("{getoptime:1}");
-
-bool Source::resync(string db) {
- {
- log() << "resync: dropping database " << db << endl;
- string dummyns = db + ".";
- assert( client->name == db );
- dropDatabase(dummyns.c_str());
- setClientTempNs(dummyns.c_str());
- }
-
- {
- log() << "resync: cloning database " << db << endl;
- Cloner c;
- string errmsg;
- bool ok = c.go(hostName.c_str(), errmsg);
- if( !ok ) {
- problem() << "resync of " << db << " from " << hostName << " failed " << errmsg << endl;
- throw SyncException();
- }
- }
-
- log() << "resync: done " << db << endl;
- dbs.insert(db);
- return true;
-}
-
-/* { ts: ..., op: <optype>, ns: ..., o: <obj> , o2: <extraobj>, b: <boolflag> }
- You must lock dbMutex before calling.
-*/
-void Source::applyOperation(JSObj& op) {
- stringstream ss;
- const char *ns = op.getStringField("ns");
- setClientTempNs(ns);
-
- if( client->justCreated || /* datafiles were missing. so we need everything, no matter what sources object says */
- !dbs.count(client->name) ) /* if not in dbs, we've never synced this database before, so we need everything */
- {
- resync(client->name);
- client->justCreated = false;
- }
-
- const char *opType = op.getStringField("op");
- JSObj o = op.getObjectField("o");
- if( *opType == 'i' ) {
- // do upserts for inserts as we might get replayed more than once
- OID *oid = o.getOID();
- if( oid == 0 ) {
- _updateObjects(ns, o, o, true, ss);
- }
- else {
- JSObjBuilder b;
- b.appendOID("_id", oid);
- _updateObjects(ns, o, b.done(), true, ss);
- }
- // theDataFileMgr.insert(ns, (void*) o.objdata(), o.objsize());
- }
- else if( *opType == 'u' ) {
- _updateObjects(ns, o, op.getObjectField("o2"), op.getBoolField("b"), ss);
- }
- else if( *opType == 'd' ) {
- deleteObjects(ns, o, op.getBoolField("b"));
- }
- else {
- BufBuilder bb;
- JSObjBuilder ob;
- assert( *opType == 'c' );
- _runCommands(ns, o, ss, bb, ob);
- }
- client = 0;
-}
-
-/* note: not yet in mutex at this point. */
-void Source::pullOpLog(DBClientConnection& conn) {
- JSObjBuilder q;
- q.appendDate("$gte", syncedTo.asDate());
- JSObjBuilder query;
- query.append("ts", q.done());
- // query = { ts: { $gte: syncedTo } }
-
- string ns = string("local.oplog.$") + sourceName;
- auto_ptr<DBClientCursor> c =
- conn.query(ns.c_str(), query.done());
- if( !c->more() ) {
- problem() << "pull: " << ns << " empty?\n";
- sleepsecs(3);
- return;
- }
-
- JSObj j = c->next();
- Element ts = j.findElement("ts");
- assert( ts.type() == Date );
- OpTime t;
- t.asDate() = ts.date();
- bool initial = syncedTo.isNull();
- if( initial ) {
- log() << "pull: initial run\n";
- }
- else if( t != syncedTo ) {
- log() << "pull: t " << t.toString() << " != syncedTo " << syncedTo.toString() << '\n';
- log() << " data too stale, halting replication" << endl;
- assert( syncedTo < t );
- throw SyncException();
- }
-
- // apply operations
- int n = 0;
- {
- lock lk(dbMutex);
- while( 1 ) {
- if( !c->more() ) {
- log() << "pull: applied " << n << " operations" << endl;
- syncedTo = t;
- save(); // note how far we are synced up to now
- break;
- }
- /* todo: get out of the mutex for the next()? */
- JSObj op = c->next();
- ts = op.findElement("ts");
- assert( ts.type() == Date );
- OpTime last = t;
- t.asDate() = ts.date();
- if( !( last < t ) ) {
- problem() << "sync error: last " << last.toString() << " >= t " << t.toString() << endl;
- uassert(false);
- }
-
- applyOperation(op);
- n++;
- }
- }
-}
-
-/* note: not yet in mutex at this point. */
-void Source::sync() {
- log() << "pull: from " << sourceName << '@' << hostName << endl;
-
- DBClientConnection conn;
- string errmsg;
+bool cloneFrom(const char *masterHost, string& errmsg)
+{
+ Cloner c;
+ return c.go(masterHost, errmsg);
+}
+
+/* --------------------------------------------------------------*/
+
+Source::Source(JSObj o) {
+ hostName = o.getStringField("host");
+ sourceName = o.getStringField("source");
+ uassert( !hostName.empty() );
+ uassert( !sourceName.empty() );
+ Element e = o.getField("syncedTo");
+ if( !e.eoo() ) {
+ uassert( e.type() == Date );
+ syncedTo.asDate() = e.date();
+ }
+
+ JSObj dbsObj = o.getObjectField("dbs");
+ if( !dbsObj.isEmpty() ) {
+ JSElemIter i(dbsObj);
+ while( 1 ) {
+ Element e = i.next();
+ if( e.eoo() )
+ break;
+ dbs.insert( e.fieldName() );
+ }
+ }
+}
+
+/* Turn our C++ Source object into a JSObj */
+JSObj Source::jsobj() {
+ JSObjBuilder b;
+ b.append("host", hostName);
+ b.append("source", sourceName);
+ b.appendDate("syncedTo", syncedTo.asDate());
+
+ JSObjBuilder dbs_builder;
+ for( set<string>::iterator i = dbs.begin(); i != dbs.end(); i++ ) {
+ dbs_builder.appendBool(i->c_str(), 1);
+ }
+ b.append("dbs", dbs_builder.done());
+
+ return b.doneAndDecouple();
+}
+
+void Source::save() {
+ JSObjBuilder b;
+ b.append("host", hostName);
+ b.append("source", sourceName);
+ JSObj pattern = b.done();
+
+ JSObj o = jsobj();
+
+ stringstream ss;
+ setClient("local.sources");
+ //cout << o.toString() << endl;
+ //cout << pattern.toString() << endl;
+ int u = _updateObjects("local.sources", o, pattern, false, ss);
+ assert( u == 1 );
+ client = 0;
+}
+
+void Source::cleanup(vector<Source*>& v) {
+ for( vector<Source*>::iterator i = v.begin(); i != v.end(); i++ )
+ delete *i;
+}
+
+void Source::loadAll(vector<Source*>& v) {
+ setClient("local.sources");
+ auto_ptr<Cursor> c = findTableScan("local.sources", emptyObj);
+ while( c->ok() ) {
+ v.push_back( new Source(c->current()) );
+ c->advance();
+ }
+ client = 0;
+}
+
+JSObj opTimeQuery = fromjson("{getoptime:1}");
+
+bool Source::resync(string db) {
+ {
+ log() << "resync: dropping database " << db << endl;
+ string dummyns = db + ".";
+ assert( client->name == db );
+ dropDatabase(dummyns.c_str());
+ setClientTempNs(dummyns.c_str());
+ }
+
+ {
+ log() << "resync: cloning database " << db << endl;
+ Cloner c;
+ string errmsg;
+ bool ok = c.go(hostName.c_str(), errmsg);
+ if( !ok ) {
+ problem() << "resync of " << db << " from " << hostName << " failed " << errmsg << endl;
+ throw SyncException();
+ }
+ }
+
+ log() << "resync: done " << db << endl;
+ dbs.insert(db);
+ return true;
+}
+
+/* { ts: ..., op: <optype>, ns: ..., o: <obj> , o2: <extraobj>, b: <boolflag> }
+ You must lock dbMutex before calling.
+*/
+void Source::applyOperation(JSObj& op) {
+ stringstream ss;
+ const char *ns = op.getStringField("ns");
+ setClientTempNs(ns);
+
+ if( client->justCreated || /* datafiles were missing. so we need everything, no matter what sources object says */
+ !dbs.count(client->name) ) /* if not in dbs, we've never synced this database before, so we need everything */
+ {
+ resync(client->name);
+ client->justCreated = false;
+ }
+
+ const char *opType = op.getStringField("op");
+ JSObj o = op.getObjectField("o");
+ if( *opType == 'i' ) {
+ // do upserts for inserts as we might get replayed more than once
+ OID *oid = o.getOID();
+ if( oid == 0 ) {
+ _updateObjects(ns, o, o, true, ss);
+ }
+ else {
+ JSObjBuilder b;
+ b.appendOID("_id", oid);
+ _updateObjects(ns, o, b.done(), true, ss);
+ }
+ // theDataFileMgr.insert(ns, (void*) o.objdata(), o.objsize());
+ }
+ else if( *opType == 'u' ) {
+ _updateObjects(ns, o, op.getObjectField("o2"), op.getBoolField("b"), ss);
+ }
+ else if( *opType == 'd' ) {
+ deleteObjects(ns, o, op.getBoolField("b"));
+ }
+ else {
+ BufBuilder bb;
+ JSObjBuilder ob;
+ assert( *opType == 'c' );
+ _runCommands(ns, o, ss, bb, ob);
+ }
+ client = 0;
+}
+
+/* note: not yet in mutex at this point. */
+void Source::pullOpLog(DBClientConnection& conn) {
+ JSObjBuilder q;
+ q.appendDate("$gte", syncedTo.asDate());
+ JSObjBuilder query;
+ query.append("ts", q.done());
+ // query = { ts: { $gte: syncedTo } }
+
+ string ns = string("local.oplog.$") + sourceName;
+ auto_ptr<DBClientCursor> c =
+ conn.query(ns.c_str(), query.done());
+ if( !c->more() ) {
+ problem() << "pull: " << ns << " empty?\n";
+ sleepsecs(3);
+ return;
+ }
+
+ int n = 0;
+ JSObj op = c->next();
+ Element ts = op.findElement("ts");
+ assert( ts.type() == Date );
+ OpTime t;
+ t.asDate() = ts.date();
+ bool initial = syncedTo.isNull();
+ if( initial ) {
+ log() << "pull: initial run\n";
+ {
+ dblock lk;
+ applyOperation(op);
+ n++;
+ }
+ }
+ else if( t != syncedTo ) {
+ log() << "pull: t " << t.toString() << " != syncedTo " << syncedTo.toString() << '\n';
+ log() << "pull: data too stale, halting replication" << endl;
+ assert( syncedTo < t );
+ throw SyncException();
+ }
+ else {
+ /* t == syncedTo, so the first op was applied previously, no need to redo it. */
+ }
+
+ // apply operations
+ {
+ dblock lk;
+ while( 1 ) {
+ if( !c->more() ) {
+ log() << "pull: applied " << n << " operations" << endl;
+ syncedTo = t;
+ save(); // note how far we are synced up to now
+ break;
+ }
+ /* todo: get out of the mutex for the next()? */
+ JSObj op = c->next();
+ ts = op.findElement("ts");
+ assert( ts.type() == Date );
+ OpTime last = t;
+ t.asDate() = ts.date();
+ if( !( last < t ) ) {
+ problem() << "sync error: last " << last.toString() << " >= t " << t.toString() << endl;
+ uassert(false);
+ }
+
+ applyOperation(op);
+ n++;
+ }
+ }
+}
+
+/* note: not yet in mutex at this point. */
+void Source::sync() {
+ log() << "pull: from " << sourceName << '@' << hostName << endl;
+
+ if( (string("localhost") == hostName || string("127.0.0.1") == hostName) && port == DBPort ) {
+ log() << "pull: can't sync from self (localhost). sources configuration may be wrong." << endl;
+ return;
+ }
+
+ DBClientConnection conn;
+ string errmsg;
if( !conn.connect(hostName.c_str(), errmsg) ) {
- log() << " pull: cantconn " << errmsg << endl;
+ log() << "pull: cantconn " << errmsg << endl;
return;
}
// get current mtime at the server.
- JSObj o = conn.findOne("admin.$cmd", opTimeQuery);
- Element e = o.findElement("optime");
- if( e.eoo() ) {
- log() << " pull: failed to get cur optime from master" << endl;
- log() << " " << o.toString() << endl;
- return;
- }
- uassert( e.type() == Date );
- OpTime serverCurTime;
- serverCurTime.asDate() = e.date();
-
- pullOpLog(conn);
-}
-
-/* -- Logging of operations -------------------------------------*/
-
-// cached copies of these...
-NamespaceDetails *localOplogMainDetails = 0;
-Client *localOplogClient = 0;
-
-/* we write to local.opload.$main:
- { ts : ..., op: ..., ns: ..., o: ... }
- ts: an OpTime timestamp
- op:
- 'i' = insert
-*/
-void _logOp(const char *opstr, const char *ns, JSObj& obj, JSObj *o2, bool *bb) {
- if( strncmp(ns, "local.", 6) == 0 )
- return;
-
- Client *oldClient = client;
- if( localOplogMainDetails == 0 ) {
- setClientTempNs("local.");
- localOplogClient = client;
- localOplogMainDetails = nsdetails("local.oplog.$main");
- }
- client = localOplogClient;
-
- /* we jump through a bunch of hoops here to avoid copying the obj buffer twice --
- instead we do a single copy to the destination position in the memory mapped file.
- */
-
- JSObjBuilder b;
- b.appendDate("ts", OpTime::now().asDate());
- b.append("op", opstr);
- b.append("ns", ns);
- if( bb )
- b.appendBool("b", *bb);
- if( o2 )
- b.append("o2", *o2);
- JSObj partial = b.done();
- int posz = partial.objsize();
- int len = posz + obj.objsize() + 1 + 2 /*o:*/;
-
- Record *r = theDataFileMgr.fast_oplog_insert(localOplogMainDetails, "local.oplog.$main", len);
-
- char *p = r->data;
- memcpy(p, partial.objdata(), posz);
- *((unsigned *)p) += obj.objsize() + 1 + 2;
- p += posz - 1;
- *p++ = (char) Object;
- *p++ = 'o';
- *p++ = 0;
- memcpy(p, obj.objdata(), obj.objsize());
- p += obj.objsize();
- *p = EOO;
-
- client = oldClient;
-}
-
-/* --------------------------------------------------------------*/
-
-void replMain() {
- vector<Source*> sources;
-
- {
- lock lk(dbMutex);
- Source::loadAll(sources);
- }
-
- if( sources.empty() )
- sleepsecs(20);
-
- try {
- for( vector<Source*>::iterator i = sources.begin(); i != sources.end(); i++ )
- (*i)->sync();
- }
- catch( SyncException ) {
- sleepsecs(300);
- }
-
- Source::cleanup(sources);
-}
-
-int debug_stop_repl = 0;
-
-void replMainThread() {
- while( 1 ) {
- try {
- replMain();
- if( debug_stop_repl )
- break;
- sleepsecs(5);
- }
- catch( AssertionException ) {
- problem() << "Assertion in replMainThread(): sleeping 5 minutes before retry" << endl;
- sleepsecs(300);
- }
- }
-}
-
-void startReplication() {
-#if defined(_WIN32)
- slave=true;
-#endif
- if( slave ) {
- log() << "slave=true" << endl;
- boost::thread repl_thread(replMainThread);
- }
-
- if( master ) {
- log() << "master=true" << endl;
- lock lk(dbMutex);
- /* create an oplog collection, if it doesn't yet exist. */
- JSObjBuilder b;
- b.append("size", 254.0 * 1000 * 1000);
- b.appendBool("capped", 1);
- setClientTempNs("local.oplog.$main");
- string err;
- JSObj o = b.done();
- userCreateNS("local.oplog.$main", o, err);
- client = 0;
- }
-}
+ JSObj o = conn.findOne("admin.$cmd", opTimeQuery);
+ Element e = o.findElement("optime");
+ if( e.eoo() ) {
+ log() << "pull: failed to get cur optime from master" << endl;
+ log() << " " << o.toString() << endl;
+ return;
+ }
+ uassert( e.type() == Date );
+ OpTime serverCurTime;
+ serverCurTime.asDate() = e.date();
+
+ pullOpLog(conn);
+}
+
+/* -- Logging of operations -------------------------------------*/
+
+// cached copies of these...
+NamespaceDetails *localOplogMainDetails = 0;
+Client *localOplogClient = 0;
+
+/* we write to local.opload.$main:
+ { ts : ..., op: ..., ns: ..., o: ... }
+ ts: an OpTime timestamp
+ op:
+ 'i' = insert
+*/
+void _logOp(const char *opstr, const char *ns, JSObj& obj, JSObj *o2, bool *bb) {
+ if( strncmp(ns, "local.", 6) == 0 )
+ return;
+
+ Client *oldClient = client;
+ if( localOplogMainDetails == 0 ) {
+ setClientTempNs("local.");
+ localOplogClient = client;
+ localOplogMainDetails = nsdetails("local.oplog.$main");
+ }
+ client = localOplogClient;
+
+ /* we jump through a bunch of hoops here to avoid copying the obj buffer twice --
+ instead we do a single copy to the destination position in the memory mapped file.
+ */
+
+ JSObjBuilder b;
+ b.appendDate("ts", OpTime::now().asDate());
+ b.append("op", opstr);
+ b.append("ns", ns);
+ if( bb )
+ b.appendBool("b", *bb);
+ if( o2 )
+ b.append("o2", *o2);
+ JSObj partial = b.done();
+ int posz = partial.objsize();
+ int len = posz + obj.objsize() + 1 + 2 /*o:*/;
+
+ Record *r = theDataFileMgr.fast_oplog_insert(localOplogMainDetails, "local.oplog.$main", len);
+
+ char *p = r->data;
+ memcpy(p, partial.objdata(), posz);
+ *((unsigned *)p) += obj.objsize() + 1 + 2;
+ p += posz - 1;
+ *p++ = (char) Object;
+ *p++ = 'o';
+ *p++ = 0;
+ memcpy(p, obj.objdata(), obj.objsize());
+ p += obj.objsize();
+ *p = EOO;
+
+ client = oldClient;
+}
+
+/* --------------------------------------------------------------*/
+
+void replMain() {
+ vector<Source*> sources;
+
+ {
+ dblock lk;
+ Source::loadAll(sources);
+ }
+
+ if( sources.empty() )
+ sleepsecs(20);
+
+ try {
+ for( vector<Source*>::iterator i = sources.begin(); i != sources.end(); i++ )
+ (*i)->sync();
+ }
+ catch( SyncException ) {
+ sleepsecs(300);
+ }
+
+ Source::cleanup(sources);
+}
+
+int debug_stop_repl = 0;
+
+void replSlaveThread() {
+ sleepsecs(3);
+ while( 1 ) {
+ try {
+ replMain();
+ if( debug_stop_repl )
+ break;
+ sleepsecs(5);
+ }
+ catch( AssertionException ) {
+ problem() << "Assertion in replSlaveThread(): sleeping 5 minutes before retry" << endl;
+ sleepsecs(300);
+ }
+ }
+}
+
+void startReplication() {
+ if( slave ) {
+ log() << "slave=true" << endl;
+ boost::thread repl_thread(replSlaveThread);
+ }
+
+ if( master ) {
+ log() << "master=true" << endl;
+ dblock lk;
+ /* create an oplog collection, if it doesn't yet exist. */
+ JSObjBuilder b;
+ b.append("size", 254.0 * 1000 * 1000);
+ b.appendBool("capped", 1);
+ setClientTempNs("local.oplog.$main");
+ string err;
+ JSObj o = b.done();
+ userCreateNS("local.oplog.$main", o, err);
+ client = 0;
+ }
+}
diff --git a/db/repl.h b/db/repl.h
index 237ab5695b1..75f41773234 100644
--- a/db/repl.h
+++ b/db/repl.h
@@ -35,32 +35,41 @@ extern bool master;
bool cloneFrom(const char *masterHost, string& errmsg);
#pragma pack(push,4)
-class OpTime {
- unsigned i;
- unsigned secs;
-public:
- OpTime(unsigned a, unsigned b) { secs = a; i = b; }
- OpTime() { secs = 0; i = 0; }
- static OpTime now();
- unsigned long long& asDate() { return *((unsigned long long *) this); }
- bool isNull() { return secs == 0; }
- string toString() {
- stringstream ss;
- ss << hex << secs << ':' << i;
- return ss.str();
- }
- bool operator==(const OpTime& r) const {
- return i == r.i && secs == r.secs;
- }
- bool operator!=(const OpTime& r) const { return !(*this == r); }
- bool operator<(const OpTime& r) const {
- if( secs != r.secs )
- return secs < r.secs;
- return i < r.i;
- }
-};
-#pragma pack(pop)
+class OpTime {
+ unsigned i;
+ unsigned secs;
+public:
+ OpTime(unsigned a, unsigned b) { secs = a; i = b; }
+ OpTime() { secs = 0; i = 0; }
+ static OpTime now();
+
+ /* We store OpTime's in the database as Javascript Date datatype -- we needed some sort of
+ 64 bit "container" for these values. While these are not really "Dates", that seems a
+ better choice for now than say, Number, which is floating point. Note the BinData type
+ is perhaps the cleanest choice, lacking a true unsigned64 datatype, but BinData has a
+ couple bytes of overhead.
+ */
+ unsigned long long& asDate() { return *((unsigned long long *) this); }
+
+ bool isNull() { return secs == 0; }
+ string toString() const {
+ stringstream ss;
+ ss << hex << secs << ':' << i;
+ return ss.str();
+ }
+ bool operator==(const OpTime& r) const {
+ return i == r.i && secs == r.secs;
+ }
+ bool operator!=(const OpTime& r) const { return !(*this == r); }
+ bool operator<(const OpTime& r) const {
+ if( secs != r.secs )
+ return secs < r.secs;
+ return i < r.i;
+ }
+};
+#pragma pack(pop)
+/* A replication exception */
struct SyncException {
};
diff --git a/grid/message.cpp b/grid/message.cpp
index e0cdea59eba..205ec318a3a 100644
--- a/grid/message.cpp
+++ b/grid/message.cpp
@@ -55,11 +55,11 @@ void Listener::listen() {
while( 1 ) {
int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize);
if( s < 0 ) {
- cout << "Listener: accept() returns " << s << " errno:" << errno << endl;
+ log() << "Listener: accept() returns " << s << " errno:" << errno << endl;
continue;
}
disableNagle(s);
- cout << "Listener: connection accepted from " << from.toString() << endl;
+ log() << "connection accepted from " << from.toString() << endl;
accepted( new MessagingPort(s, from) );
}
}
@@ -112,7 +112,7 @@ bool MessagingPort::connect(SockAddr& _far)
return false;
}
if( ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize) ) {
- cout << "ERROR: connect(): connect() failed " << errno << ' ' << farEnd.getPort() << endl;
+ log() << "connect(): failed errno:" << errno << ' ' << farEnd.getPort() << endl;
closesocket(sock); sock = -1;
return false;
}
diff --git a/grid/message.h b/grid/message.h
index 9bf49e1674b..7a3235ae265 100644
--- a/grid/message.h
+++ b/grid/message.h
@@ -89,7 +89,7 @@ struct MsgData {
MSGID id; /* request/reply id's match... */
int responseTo; /* id of the message we are responding to */
int operation;
- char _data[4];
+ char _data[4];
int dataLen(); // len without header
};
diff --git a/util/goodies.h b/util/goodies.h
index 282f96694d0..631ed12b9f6 100644
--- a/util/goodies.h
+++ b/util/goodies.h
@@ -189,15 +189,5 @@ public:
};
*/
-/*
-struct lock {
-boostlock bl;
- DebugMutex& m;
- lock(DebugMutex& _m) : m(_m) {
- do_lock();
- }
- ~lock() { do_unlock(); }
-}
-*/
-typedef boostlock lock;
+//typedef boostlock lock;
diff --git a/util/mmap.cpp b/util/mmap.cpp
index 94f96d6fa59..a3296cd2412 100644
--- a/util/mmap.cpp
+++ b/util/mmap.cpp
@@ -148,14 +148,14 @@ void* MemoryMappedFile::map(const char *filename, int length) {
/* make sure the file is the full desired length */
off_t filelen = lseek(fd, 0, SEEK_END);
if( filelen < length ) {
- cout << "map: file length=" << (unsigned) filelen << " want:";
- cout << length;
- cout << endl;
+ log() << "map: file length=" << (unsigned) filelen << " want:"
+ << length
+ << endl;
if( filelen != 0 ) {
- cout << " failing mapping" << endl;
+ log() << " failing mapping" << endl;
return 0;
}
- cout << " writing file to full length with zeroes..." << endl;
+ log() << " writing file to full length with zeroes..." << endl;
int z = 8192;
char buf[z];
memset(buf, 0, z);
@@ -168,7 +168,7 @@ void* MemoryMappedFile::map(const char *filename, int length) {
write(fd, buf, z);
left -= z;
}
- cout << " done" << endl;
+ log() << " done" << endl;
}
lseek(fd, length, SEEK_SET);
@@ -184,7 +184,7 @@ void* MemoryMappedFile::map(const char *filename, int length) {
void MemoryMappedFile::flush(bool sync) {
if( msync(view, len, sync ? MS_SYNC : MS_ASYNC) )
- cout << "msync error " << errno << endl;
+ problem() << "msync error " << errno << endl;
}
#endif
diff --git a/util/sock.h b/util/sock.h
index 1e101b619ac..58913027fcd 100644
--- a/util/sock.h
+++ b/util/sock.h
@@ -58,11 +58,11 @@ inline void disableNagle(int sock) {
#endif
if( setsockopt(sock, level, TCP_NODELAY, (char *) &x, sizeof(x)) )
- cout << "ERROR: disableNagle failed" << endl;
+ log() << "ERROR: disableNagle failed" << endl;
}
inline void prebindOptions( int sock ){
- cout << "doing prebind option" << endl;
+ DEV log() << "doing prebind option" << endl;
int x = 1;
if ( setsockopt( sock , SOL_SOCKET, SO_REUSEADDR, &x, sizeof(x)) < 0 )
cout << "Failed to set socket opt, SO_REUSEADDR" << endl;
@@ -71,12 +71,12 @@ inline void prebindOptions( int sock ){
#endif
-// .empty() if err
-inline string hostbyname_nonreentrant(const char *hostname) {
- struct hostent *h;
- h = gethostbyname(hostname);
- if( h == 0 ) return "";
- return inet_ntoa( *((struct in_addr *)(h->h_addr)) );
+// .empty() if err
+inline string hostbyname_nonreentrant(const char *hostname) {
+ struct hostent *h;
+ h = gethostbyname(hostname);
+ if( h == 0 ) return "";
+ return inet_ntoa( *((struct in_addr *)(h->h_addr)) );
}
struct SockAddr {