diff options
author | Dwight <dmerriman@gmail.com> | 2008-06-10 13:13:57 -0400 |
---|---|---|
committer | Dwight <dmerriman@gmail.com> | 2008-06-10 13:13:57 -0400 |
commit | 94cb05e36a01ef7317795c8f06a378e75005ef3f (patch) | |
tree | efdd7d91ceb9278c7d26e9ec4b8b637df5d2635f | |
parent | 485fb3efc4c0bcf32b8231dbc6aba15dfe3e1366 (diff) | |
parent | 1aeaa4f34d9778b391f287542c0acb40daa21456 (diff) | |
download | mongo-94cb05e36a01ef7317795c8f06a378e75005ef3f.tar.gz |
Merge branch 'dwight'
-rw-r--r-- | db/clientcursor.cpp | 263 | ||||
-rw-r--r-- | db/clientcursor.h | 68 | ||||
-rw-r--r-- | db/db.cpp | 4 | ||||
-rw-r--r-- | db/db.vcproj | 4 | ||||
-rw-r--r-- | db/query.cpp | 7 | ||||
-rw-r--r-- | db/query.h | 58 | ||||
-rw-r--r-- | grid/message.cpp | 13 | ||||
-rw-r--r-- | grid/message.h | 1 | ||||
-rw-r--r-- | stdafx.h | 11 | ||||
-rw-r--r-- | util/goodies.h | 14 |
10 files changed, 209 insertions, 234 deletions
diff --git a/db/clientcursor.cpp b/db/clientcursor.cpp index d90762b90c5..2f409597eb4 100644 --- a/db/clientcursor.cpp +++ b/db/clientcursor.cpp @@ -1,9 +1,9 @@ -// clientcursor.cpp - -/* Cursor -- and its derived classes -- are our internal cursors. +/* clientcursor.cpp ClientCursor is a wrapper that represents a cursorid from our client application's perspective. + + Cursor -- and its derived classes -- are our internal cursors. */ #include "stdafx.h" @@ -13,58 +13,29 @@ /* TODO: FIX cleanup of clientCursors when hit the end. (ntoreturn insufficient) */ -typedef map<DiskLoc, ClientCursor*> DiskToCC; -map<DiskLoc, ClientCursor*> byLocation; -//HashTable<DiskLoc,ClientCursor*> byLocation(malloc(10000000), 10000000, "bylocation"); +CCById clientCursorsById; -CCMap clientCursors; +/* ------------------------------------------- */ -class CursInspector : public SingleResultObjCursor { - Cursor* clone() { - return new CursInspector(); - } -// Cursor* clone() { return new CursInspector(*this); } - void fill() { - b.append("byLocation_size", byLocation.size()); - b.append("clientCursors_size", clientCursors.size()); +typedef multimap<DiskLoc, ClientCursor*> ByLoc; +ByLoc byLoc; +void ClientCursor::setLastLoc(DiskLoc L) { + if( L == _lastLoc ) + return; + if( !_lastLoc.isNull() ) { + ByLoc::iterator i = kv_find(byLoc, _lastLoc, this); + if( i != byLoc.end() ) + byLoc.erase(i); + } - stringstream ss; - ss << '\n'; - int x = 40; - DiskToCC::iterator it = byLocation.begin(); - while( it != byLocation.end() ) { - DiskLoc dl = it->first; - ClientCursor *cc = it->second; - ss << dl.toString() << " -> \n"; + if( !L.isNull() ) + byLoc.insert( make_pair(L, this) ); + _lastLoc = L; +} - while( cc ) { - ss << " cid:" << cc->cursorid << ' ' << cc->ns << " pos:" << cc->pos << " LL:" << cc->lastLoc.toString(); - try { - setClient(cc->ns.c_str()); - Record *r = dl.rec(); - ss << " lwh:" << hex << r->lengthWithHeaders << " nxt:" << r->nextOfs << " prv:" << r->prevOfs << dec << ' ' << cc->c->toString(); - if( r->nextOfs >= 0 && r->nextOfs < 16 ) - ss << " DELETED??? (!)"; - } - catch(...) { - ss << " EXCEPTION"; - } - ss << "\n"; - cc = cc->nextAtThisLocation; - } - if( --x <= 0 ) { - ss << "only first 40 shown\n" << endl; - break; - } - it++; - } - b.append("dump", ss.str().c_str()); - } -public: - CursInspector() { reg("intr.cursors"); } -} _ciproto; +/* ------------------------------------------- */ /* must call this when a btree node is updated */ void removedKey(const DiskLoc& btreeLoc, int keyPos) { @@ -73,142 +44,104 @@ void removedKey(const DiskLoc& btreeLoc, int keyPos) { /* must call this on a delete so we clean up the cursors. */ void aboutToDelete(const DiskLoc& dl) { - DiskToCC::iterator it = byLocation.find(dl); -// cout << "atd:" << dl.toString() << endl; - if( it != byLocation.end() ) { - ClientCursor *cc = it->second; - byLocation.erase(it); - - assert( cc != 0 ); - int z = 0; - while( cc ) { - z++; -// cout << "cc: " << cc->ns << endl; - ClientCursor *nxt = cc->nextAtThisLocation; - cc->nextAtThisLocation = 0; // updateLocation will manipulate linked list ptrs, so clean that up first. - cc->c->checkLocation(); - cc->c->advance(); - cc->lastLoc.Null(); // so updateLocation doesn't try to remove, just to be faster -- we handled that. - cc->updateLocation(); - cc = nxt; - } -// cout << "z:" << z << endl; - } -} - -void ClientCursor::cleanupByLocation(DiskLoc loc, long long cursorid) { - if( loc.isNull() ) - return; - - DiskToCC::iterator it = byLocation.find(loc); - if( it != byLocation.end() ) { - ClientCursor *first = it->second; - ClientCursor *cc = first; - ClientCursor *prev = 0; + vector<ClientCursor*> toAdvance; - while( 1 ) { - if( cc == 0 ) - break; - if( cc->cursorid == cursorid ) { - // found one to remove. - if( prev == 0 ) { - if( cc->nextAtThisLocation ) - it->second = cc->nextAtThisLocation; - else - byLocation.erase(it); - } - else { - prev->nextAtThisLocation = cc->nextAtThisLocation; - } - cc->nextAtThisLocation = 0; - return; - } - cc = cc->nextAtThisLocation; - } + for( ByLoc::iterator i = byLoc.lower_bound(dl); + i != byLoc.upper_bound(dl); ++i ) { + toAdvance.push_back(i->second); } - // not found! - //cout << "Assertion failure - cleanupByLocation: not found " << cursorid << endl; + for( vector<ClientCursor*>::iterator i = toAdvance.begin(); + i != toAdvance.end(); ++i ) + { + (*i)->c->checkLocation(); + (*i)->c->advance(); + wassert( (*i)->c->currLoc() != dl ); + (*i)->updateLocation(); + } } ClientCursor::~ClientCursor() { -#if defined(_WIN32) - cout << "~clientcursor " << cursorid << endl; -#endif - assert( pos != -2 ); - - cleanupByLocation(lastLoc, cursorid); - + DEV cout << "~clientcursor " << cursorid << endl; assert( pos != -2 ); - - // defensive - lastLoc.Null(); - cursorid = -1; + setLastLoc( DiskLoc() ); // removes us from bylocation multimap + clientCursorsById.erase(cursorid); + // defensive: + (CursorId&) cursorid = -1; pos = -2; - nextAtThisLocation = 0; -} - -// note this doesn't set lastLoc -- caller should. -void ClientCursor::addToByLocation(DiskLoc cl) { - assert( cursorid ); - - if( nextAtThisLocation ) { - wassert( nextAtThisLocation == 0 ); - return; - } - - DiskToCC::iterator j = byLocation.find(cl); - nextAtThisLocation = j == byLocation.end() ? 0 : j->second; - byLocation[cl] = this; } +/* call when cursor's location changes so that we can update the + cursorsbylocation map. if you are locked and internally iterating, only + need to call when you are ready to "unlock". +*/ void ClientCursor::updateLocation() { assert( cursorid ); - DiskLoc cl = c->currLoc(); - // cout<< " TEMP: updateLocation last:" << lastLoc.toString() << " cl:" << cl.toString() << '\n'; - - if( !lastLoc.isNull() ) - cleanupByLocation(lastLoc, cursorid); - - if( !cl.isNull() ) - addToByLocation(cl); - - lastLoc = cl; - c->noteLocation(); -} - -/* report to us that a new clientcursor exists so we can track it. - note you still must call updateLocation (which likely should be changed) -*/ -void ClientCursor::add(ClientCursor* cc) { - clientCursors[cc->cursorid] = cc; -} - -// todo: delete the ClientCursor. -// todo: other map -bool ClientCursor::erase(long long id) { - CCMap::iterator it = clientCursors.find(id); - if( it != clientCursors.end() ) { - ClientCursor *cc = it->second; - it->second = 0; // defensive - clientCursors.erase(it); - delete cc; // destructor will fix byLocation map - return true; + if( lastLoc() == cl ) { + cout << "info: lastloc==curloc " << ns << '\n'; + return; } - return false; + setLastLoc(cl); + c->noteLocation(); } -long long allocCursorId() { +int ctmLast = 0; // so we don't have to do find() which is a little slow very often. +long long ClientCursor::allocCursorId() { long long x; + int ctm = (int) curTimeMillis(); while( 1 ) { x = (((long long)rand()) << 32); - x = x | (int) curTimeMillis() | 0x80000000; // OR to make sure not zero - if( clientCursors.count(x) == 0 ) + x = x | ctm | 0x80000000; // OR to make sure not zero + if( ctm != ctmLast || ClientCursor::find(x) == 0 ) break; } -#if defined(_WIN32) - cout << "alloccursorid " << x << endl; -#endif + ctmLast = ctm; + DEV cout << "alloccursorid " << x << endl; return x; } + +class CursInspector : public SingleResultObjCursor { + Cursor* clone() { + return new CursInspector(); + } + void fill() { + b.append("byLocation_size", byLoc.size()); + b.append("clientCursors_size", clientCursorsById.size()); +/* todo update for new impl: + stringstream ss; + ss << '\n'; + int x = 40; + DiskToCC::iterator it = clientCursorsByLocation.begin(); + while( it != clientCursorsByLocation.end() ) { + DiskLoc dl = it->first; + ss << dl.toString() << " -> \n"; + set<ClientCursor*>::iterator j = it->second.begin(); + while( j != it->second.end() ) { + ss << " cid:" << j->second->cursorid << ' ' << j->second->ns << " pos:" << j->second->pos << " LL:" << j->second->lastLoc.toString(); + try { + setClient(j->second->ns.c_str()); + Record *r = dl.rec(); + ss << " lwh:" << hex << r->lengthWithHeaders << " nxt:" << r->nextOfs << " prv:" << r->prevOfs << dec << ' ' << j->second->c->toString(); + if( r->nextOfs >= 0 && r->nextOfs < 16 ) + ss << " DELETED??? (!)"; + } + catch(...) { + ss << " EXCEPTION"; + } + ss << "\n"; + j++; + } + if( --x <= 0 ) { + ss << "only first 40 shown\n" << endl; + break; + } + it++; + } + b.append("dump", ss.str().c_str()); +*/ + } +public: + CursInspector() { reg("intr.cursors"); } +} _ciproto; + diff --git a/db/clientcursor.h b/db/clientcursor.h new file mode 100644 index 00000000000..6fd5bb12155 --- /dev/null +++ b/db/clientcursor.h @@ -0,0 +1,68 @@ +/* clientcursor.h + + Cursor -- and its derived classes -- are our internal cursors. + + ClientCursor is a wrapper that represents a cursorid from our client + application's perspective. +*/ + +#pragma once + +#include "../stdafx.h" + +typedef long long CursorId; +class Cursor; +class ClientCursor; +typedef map<CursorId, ClientCursor*> CCById; +extern CCById clientCursorsById; + + +class ClientCursor { + friend class CursInspector; + DiskLoc _lastLoc; // use getter and setter not this. + static CursorId allocCursorId(); +public: + ClientCursor() : cursorid( allocCursorId() ), pos(0) { + clientCursorsById.insert( make_pair(cursorid, this) ); + } + ~ClientCursor(); + const CursorId cursorid; + string ns; + auto_ptr<JSMatcher> matcher; + auto_ptr<Cursor> c; + int pos; + DiskLoc lastLoc() const { return _lastLoc; } + void setLastLoc(DiskLoc); + auto_ptr< set<string> > filter; // which fields query wants returned + + static bool erase(CursorId id) { + ClientCursor *cc = find(id); + if( cc ) { + delete cc; + return true; + } + return false; + } + + static ClientCursor* find(CursorId id) { + CCById::iterator it = clientCursorsById.find(id); + if( it == clientCursorsById.end() ) { + cout << "ClientCursor::find(): cursor not found in map " << id << '\n'; + return 0; + } + return it->second; + } + + /* call when cursor's location changes so that we can update the + cursorsbylocation map. if you are locked and internally iterating, only + need to call when you are ready to "unlock". + */ + void updateLocation(); + +//private: +// void addToByLocation(DiskLoc cl); + void cleanupByLocation(DiskLoc loc); +//public: +// ClientCursor *nextAtThisLocation; +}; + diff --git a/db/db.cpp b/db/db.cpp index 1d417c093f3..0598bee4351 100644 --- a/db/db.cpp +++ b/db/db.cpp @@ -274,7 +274,7 @@ public: }; void listen(int port) { - const char *Version = "db version: 112 6jun2008"; + const char *Version = "db version: 113 10jun2008"; problem() << Version << endl; cout << Version << endl; pdfileInit(); @@ -406,7 +406,7 @@ void connThread() stringstream ss; if( !dbMsgPort.recv(m) ) { - cout << "MessagingPort::recv(): returned false" << endl; + cout << "MessagingPort::recv(): returned false " << dbMsgPort.farEnd.toString() << endl; dbMsgPort.shutdown(); break; } diff --git a/db/db.vcproj b/db/db.vcproj index f375ef80324..a8c8d43b35b 100644 --- a/db/db.vcproj +++ b/db/db.vcproj @@ -249,6 +249,10 @@ >
</File>
<File
+ RelativePath=".\clientcursor.h"
+ >
+ </File>
+ <File
RelativePath=".\db.h"
>
</File>
diff --git a/db/query.cpp b/db/query.cpp index 2a624977630..ee1c4b70208 100644 --- a/db/query.cpp +++ b/db/query.cpp @@ -130,6 +130,7 @@ auto_ptr<Cursor> getIndexCursor(const char *ns, JSObj& query, JSObj& order) { } } JSObj q2 = b2.done(); + cout << "using index " << d->indexes[i].indexNamespace() << endl; return auto_ptr<Cursor>( new BtreeCursor(d->indexes[i].head, q2, 1, true)); } @@ -732,14 +733,12 @@ assert( debug.getN() < 5000 ); // more...so save a cursor ClientCursor *cc = new ClientCursor(); cc->c = c; - cursorid = allocCursorId(); - cc->cursorid = cursorid; + cursorid = cc->cursorid; cc->matcher = matcher; cc->ns = ns; cc->pos = n; - ClientCursor::add(cc); - cc->updateLocation(); cc->filter = filter; + cc->updateLocation(); } } break; diff --git a/db/query.h b/db/query.h index b0f7a503fcb..424c6aaa84b 100644 --- a/db/query.h +++ b/db/query.h @@ -75,61 +75,5 @@ QueryResult* runQuery(const char *ns, int ntoskip, int ntoreturn, void updateObjects(const char *ns, JSObj updateobj, JSObj pattern, bool upsert, stringstream& ss); void deleteObjects(const char *ns, JSObj pattern, bool justOne); -class ClientCursor; -typedef map<long long, ClientCursor*> CCMap; -extern CCMap clientCursors; /* cursorid -> ClientCursor */ +#include "clientcursor.h" -/* Cursor -- and its derived classes -- are our internal cursors. - - ClientCursor is a wrapper that represents a cursorid from our client - application's perspective. -*/ -class Cursor; -class ClientCursor { - friend class CursInspector; -public: - ClientCursor() { - cursorid=0; pos=0; nextAtThisLocation=0; -#if defined(_WIN32) - cout << "clientcursor() " << cursorid << endl; -#endif - } - ~ClientCursor(); - long long cursorid; - string ns; - auto_ptr<JSMatcher> matcher; - auto_ptr<Cursor> c; - int pos; - DiskLoc lastLoc; - auto_ptr< set<string> > filter; - - /* report to us that a new clientcursor exists so we can track it. You still - do the initial updateLocation() yourself. - */ - static void add(ClientCursor*); - - static bool erase(long long cursorid); - - static ClientCursor* find(long long id) { - CCMap::iterator it = clientCursors.find(id); - if( it == clientCursors.end() ) { - cout << "ClientCursor::find(): cursor not found in map " << id << endl; - return 0; - } - return it->second; - } - - /* call when cursor's location changes so that we can update the - cursorsbylocation map. if you are locked and internally iterating, only - need to call when you are ready to "unlock". - */ - void updateLocation(); - -private: - void addToByLocation(DiskLoc cl); - static void cleanupByLocation(DiskLoc loc, long long cursorid); -public: - ClientCursor *nextAtThisLocation; -}; - -long long allocCursorId(); diff --git a/grid/message.cpp b/grid/message.cpp index caf2cd7e8a8..cfa9a02a912 100644 --- a/grid/message.cpp +++ b/grid/message.cpp @@ -121,16 +121,19 @@ bool MessagingPort::recv(Message& m) { // assert( x == 4 ); - assert( len >= 0 && len <= 16000000 ); + if( len < 0 || len > 16000000 ) { + cout << "bad recv() len: " << len << '\n'; + return false; + } int z = (len+1023)&0xfffffc00; assert(z>=len); MsgData *md = (MsgData *) malloc(z); md->len = len; - if ( len <= 0 ){ - cout << "got a length of 0, something is wrong" << endl; - return false; - } + if ( len <= 0 ){ + cout << "got a length of 0, something is wrong" << endl; + return false; + } char *p = (char *) &md->id; int left = len -4; diff --git a/grid/message.h b/grid/message.h index 604e9a67343..be2749c8bea 100644 --- a/grid/message.h +++ b/grid/message.h @@ -45,6 +45,7 @@ public: private: int sock; +public: SockAddr farEnd; }; @@ -99,12 +99,21 @@ inline ostream& problem() { time_t t; time(&t); string now(ctime(&t),0,20); - problems << "problem " << now; + problems << "~ " << now; if( client ) problems << curNs << ' '; return problems; } +/* for now, running on win32 means development not production -- + use this to log things just there. +*/ +#if !defined(_WIN32) +#define DEV if( 1 ) +#else +#define DEV if( 0 ) +#endif + #define DEBUGGING if( 0 ) extern unsigned occasion; diff --git a/util/goodies.h b/util/goodies.h index 94ca721aa8b..af6fd3f20b0 100644 --- a/util/goodies.h +++ b/util/goodies.h @@ -13,6 +13,20 @@ inline pthread_t GetCurrentThreadId() { return pthread_self(); } /* set to TRUE if we are exiting */ extern bool goingAway; +/* find the multimap member which matches a particular key and value. + + note this can be slow if there are a lot with the same key. +*/ +template<class C,class K,class V> inline typename C::iterator kv_find(C& c, const K& k,const V& v) { + pair<typename C::iterator,typename C::iterator> p = c.equal_range(k); +
+ for( typename C::iterator it=p.first; it!=p.second; ++it)
+ if( it->second == v )
+ return it;
+
+ return c.end();
+} + bool isPrime(int n); int nextPrime(int n); |