summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDwight <dmerriman@gmail.com>2008-06-10 13:13:57 -0400
committerDwight <dmerriman@gmail.com>2008-06-10 13:13:57 -0400
commit94cb05e36a01ef7317795c8f06a378e75005ef3f (patch)
treeefdd7d91ceb9278c7d26e9ec4b8b637df5d2635f
parent485fb3efc4c0bcf32b8231dbc6aba15dfe3e1366 (diff)
parent1aeaa4f34d9778b391f287542c0acb40daa21456 (diff)
downloadmongo-94cb05e36a01ef7317795c8f06a378e75005ef3f.tar.gz
Merge branch 'dwight'
-rw-r--r--db/clientcursor.cpp263
-rw-r--r--db/clientcursor.h68
-rw-r--r--db/db.cpp4
-rw-r--r--db/db.vcproj4
-rw-r--r--db/query.cpp7
-rw-r--r--db/query.h58
-rw-r--r--grid/message.cpp13
-rw-r--r--grid/message.h1
-rw-r--r--stdafx.h11
-rw-r--r--util/goodies.h14
10 files changed, 209 insertions, 234 deletions
diff --git a/db/clientcursor.cpp b/db/clientcursor.cpp
index d90762b90c5..2f409597eb4 100644
--- a/db/clientcursor.cpp
+++ b/db/clientcursor.cpp
@@ -1,9 +1,9 @@
-// clientcursor.cpp
-
-/* Cursor -- and its derived classes -- are our internal cursors.
+/* clientcursor.cpp
ClientCursor is a wrapper that represents a cursorid from our client
application's perspective.
+
+ Cursor -- and its derived classes -- are our internal cursors.
*/
#include "stdafx.h"
@@ -13,58 +13,29 @@
/* TODO: FIX cleanup of clientCursors when hit the end. (ntoreturn insufficient) */
-typedef map<DiskLoc, ClientCursor*> DiskToCC;
-map<DiskLoc, ClientCursor*> byLocation;
-//HashTable<DiskLoc,ClientCursor*> byLocation(malloc(10000000), 10000000, "bylocation");
+CCById clientCursorsById;
-CCMap clientCursors;
+/* ------------------------------------------- */
-class CursInspector : public SingleResultObjCursor {
- Cursor* clone() {
- return new CursInspector();
- }
-// Cursor* clone() { return new CursInspector(*this); }
- void fill() {
- b.append("byLocation_size", byLocation.size());
- b.append("clientCursors_size", clientCursors.size());
+typedef multimap<DiskLoc, ClientCursor*> ByLoc;
+ByLoc byLoc;
+void ClientCursor::setLastLoc(DiskLoc L) {
+ if( L == _lastLoc )
+ return;
+ if( !_lastLoc.isNull() ) {
+ ByLoc::iterator i = kv_find(byLoc, _lastLoc, this);
+ if( i != byLoc.end() )
+ byLoc.erase(i);
+ }
- stringstream ss;
- ss << '\n';
- int x = 40;
- DiskToCC::iterator it = byLocation.begin();
- while( it != byLocation.end() ) {
- DiskLoc dl = it->first;
- ClientCursor *cc = it->second;
- ss << dl.toString() << " -> \n";
+ if( !L.isNull() )
+ byLoc.insert( make_pair(L, this) );
+ _lastLoc = L;
+}
- while( cc ) {
- ss << " cid:" << cc->cursorid << ' ' << cc->ns << " pos:" << cc->pos << " LL:" << cc->lastLoc.toString();
- try {
- setClient(cc->ns.c_str());
- Record *r = dl.rec();
- ss << " lwh:" << hex << r->lengthWithHeaders << " nxt:" << r->nextOfs << " prv:" << r->prevOfs << dec << ' ' << cc->c->toString();
- if( r->nextOfs >= 0 && r->nextOfs < 16 )
- ss << " DELETED??? (!)";
- }
- catch(...) {
- ss << " EXCEPTION";
- }
- ss << "\n";
- cc = cc->nextAtThisLocation;
- }
- if( --x <= 0 ) {
- ss << "only first 40 shown\n" << endl;
- break;
- }
- it++;
- }
- b.append("dump", ss.str().c_str());
- }
-public:
- CursInspector() { reg("intr.cursors"); }
-} _ciproto;
+/* ------------------------------------------- */
/* must call this when a btree node is updated */
void removedKey(const DiskLoc& btreeLoc, int keyPos) {
@@ -73,142 +44,104 @@ void removedKey(const DiskLoc& btreeLoc, int keyPos) {
/* must call this on a delete so we clean up the cursors. */
void aboutToDelete(const DiskLoc& dl) {
- DiskToCC::iterator it = byLocation.find(dl);
-// cout << "atd:" << dl.toString() << endl;
- if( it != byLocation.end() ) {
- ClientCursor *cc = it->second;
- byLocation.erase(it);
-
- assert( cc != 0 );
- int z = 0;
- while( cc ) {
- z++;
-// cout << "cc: " << cc->ns << endl;
- ClientCursor *nxt = cc->nextAtThisLocation;
- cc->nextAtThisLocation = 0; // updateLocation will manipulate linked list ptrs, so clean that up first.
- cc->c->checkLocation();
- cc->c->advance();
- cc->lastLoc.Null(); // so updateLocation doesn't try to remove, just to be faster -- we handled that.
- cc->updateLocation();
- cc = nxt;
- }
-// cout << "z:" << z << endl;
- }
-}
-
-void ClientCursor::cleanupByLocation(DiskLoc loc, long long cursorid) {
- if( loc.isNull() )
- return;
-
- DiskToCC::iterator it = byLocation.find(loc);
- if( it != byLocation.end() ) {
- ClientCursor *first = it->second;
- ClientCursor *cc = first;
- ClientCursor *prev = 0;
+ vector<ClientCursor*> toAdvance;
- while( 1 ) {
- if( cc == 0 )
- break;
- if( cc->cursorid == cursorid ) {
- // found one to remove.
- if( prev == 0 ) {
- if( cc->nextAtThisLocation )
- it->second = cc->nextAtThisLocation;
- else
- byLocation.erase(it);
- }
- else {
- prev->nextAtThisLocation = cc->nextAtThisLocation;
- }
- cc->nextAtThisLocation = 0;
- return;
- }
- cc = cc->nextAtThisLocation;
- }
+ for( ByLoc::iterator i = byLoc.lower_bound(dl);
+ i != byLoc.upper_bound(dl); ++i ) {
+ toAdvance.push_back(i->second);
}
- // not found!
- //cout << "Assertion failure - cleanupByLocation: not found " << cursorid << endl;
+ for( vector<ClientCursor*>::iterator i = toAdvance.begin();
+ i != toAdvance.end(); ++i )
+ {
+ (*i)->c->checkLocation();
+ (*i)->c->advance();
+ wassert( (*i)->c->currLoc() != dl );
+ (*i)->updateLocation();
+ }
}
ClientCursor::~ClientCursor() {
-#if defined(_WIN32)
- cout << "~clientcursor " << cursorid << endl;
-#endif
- assert( pos != -2 );
-
- cleanupByLocation(lastLoc, cursorid);
-
+ DEV cout << "~clientcursor " << cursorid << endl;
assert( pos != -2 );
-
- // defensive
- lastLoc.Null();
- cursorid = -1;
+ setLastLoc( DiskLoc() ); // removes us from bylocation multimap
+ clientCursorsById.erase(cursorid);
+ // defensive:
+ (CursorId&) cursorid = -1;
pos = -2;
- nextAtThisLocation = 0;
-}
-
-// note this doesn't set lastLoc -- caller should.
-void ClientCursor::addToByLocation(DiskLoc cl) {
- assert( cursorid );
-
- if( nextAtThisLocation ) {
- wassert( nextAtThisLocation == 0 );
- return;
- }
-
- DiskToCC::iterator j = byLocation.find(cl);
- nextAtThisLocation = j == byLocation.end() ? 0 : j->second;
- byLocation[cl] = this;
}
+/* call when cursor's location changes so that we can update the
+ cursorsbylocation map. if you are locked and internally iterating, only
+ need to call when you are ready to "unlock".
+*/
void ClientCursor::updateLocation() {
assert( cursorid );
-
DiskLoc cl = c->currLoc();
- // cout<< " TEMP: updateLocation last:" << lastLoc.toString() << " cl:" << cl.toString() << '\n';
-
- if( !lastLoc.isNull() )
- cleanupByLocation(lastLoc, cursorid);
-
- if( !cl.isNull() )
- addToByLocation(cl);
-
- lastLoc = cl;
- c->noteLocation();
-}
-
-/* report to us that a new clientcursor exists so we can track it.
- note you still must call updateLocation (which likely should be changed)
-*/
-void ClientCursor::add(ClientCursor* cc) {
- clientCursors[cc->cursorid] = cc;
-}
-
-// todo: delete the ClientCursor.
-// todo: other map
-bool ClientCursor::erase(long long id) {
- CCMap::iterator it = clientCursors.find(id);
- if( it != clientCursors.end() ) {
- ClientCursor *cc = it->second;
- it->second = 0; // defensive
- clientCursors.erase(it);
- delete cc; // destructor will fix byLocation map
- return true;
+ if( lastLoc() == cl ) {
+ cout << "info: lastloc==curloc " << ns << '\n';
+ return;
}
- return false;
+ setLastLoc(cl);
+ c->noteLocation();
}
-long long allocCursorId() {
+int ctmLast = 0; // so we don't have to do find() which is a little slow very often.
+long long ClientCursor::allocCursorId() {
long long x;
+ int ctm = (int) curTimeMillis();
while( 1 ) {
x = (((long long)rand()) << 32);
- x = x | (int) curTimeMillis() | 0x80000000; // OR to make sure not zero
- if( clientCursors.count(x) == 0 )
+ x = x | ctm | 0x80000000; // OR to make sure not zero
+ if( ctm != ctmLast || ClientCursor::find(x) == 0 )
break;
}
-#if defined(_WIN32)
- cout << "alloccursorid " << x << endl;
-#endif
+ ctmLast = ctm;
+ DEV cout << "alloccursorid " << x << endl;
return x;
}
+
+class CursInspector : public SingleResultObjCursor {
+ Cursor* clone() {
+ return new CursInspector();
+ }
+ void fill() {
+ b.append("byLocation_size", byLoc.size());
+ b.append("clientCursors_size", clientCursorsById.size());
+/* todo update for new impl:
+ stringstream ss;
+ ss << '\n';
+ int x = 40;
+ DiskToCC::iterator it = clientCursorsByLocation.begin();
+ while( it != clientCursorsByLocation.end() ) {
+ DiskLoc dl = it->first;
+ ss << dl.toString() << " -> \n";
+ set<ClientCursor*>::iterator j = it->second.begin();
+ while( j != it->second.end() ) {
+ ss << " cid:" << j->second->cursorid << ' ' << j->second->ns << " pos:" << j->second->pos << " LL:" << j->second->lastLoc.toString();
+ try {
+ setClient(j->second->ns.c_str());
+ Record *r = dl.rec();
+ ss << " lwh:" << hex << r->lengthWithHeaders << " nxt:" << r->nextOfs << " prv:" << r->prevOfs << dec << ' ' << j->second->c->toString();
+ if( r->nextOfs >= 0 && r->nextOfs < 16 )
+ ss << " DELETED??? (!)";
+ }
+ catch(...) {
+ ss << " EXCEPTION";
+ }
+ ss << "\n";
+ j++;
+ }
+ if( --x <= 0 ) {
+ ss << "only first 40 shown\n" << endl;
+ break;
+ }
+ it++;
+ }
+ b.append("dump", ss.str().c_str());
+*/
+ }
+public:
+ CursInspector() { reg("intr.cursors"); }
+} _ciproto;
+
diff --git a/db/clientcursor.h b/db/clientcursor.h
new file mode 100644
index 00000000000..6fd5bb12155
--- /dev/null
+++ b/db/clientcursor.h
@@ -0,0 +1,68 @@
+/* clientcursor.h
+
+ Cursor -- and its derived classes -- are our internal cursors.
+
+ ClientCursor is a wrapper that represents a cursorid from our client
+ application's perspective.
+*/
+
+#pragma once
+
+#include "../stdafx.h"
+
+typedef long long CursorId;
+class Cursor;
+class ClientCursor;
+typedef map<CursorId, ClientCursor*> CCById;
+extern CCById clientCursorsById;
+
+
+class ClientCursor {
+ friend class CursInspector;
+ DiskLoc _lastLoc; // use getter and setter not this.
+ static CursorId allocCursorId();
+public:
+ ClientCursor() : cursorid( allocCursorId() ), pos(0) {
+ clientCursorsById.insert( make_pair(cursorid, this) );
+ }
+ ~ClientCursor();
+ const CursorId cursorid;
+ string ns;
+ auto_ptr<JSMatcher> matcher;
+ auto_ptr<Cursor> c;
+ int pos;
+ DiskLoc lastLoc() const { return _lastLoc; }
+ void setLastLoc(DiskLoc);
+ auto_ptr< set<string> > filter; // which fields query wants returned
+
+ static bool erase(CursorId id) {
+ ClientCursor *cc = find(id);
+ if( cc ) {
+ delete cc;
+ return true;
+ }
+ return false;
+ }
+
+ static ClientCursor* find(CursorId id) {
+ CCById::iterator it = clientCursorsById.find(id);
+ if( it == clientCursorsById.end() ) {
+ cout << "ClientCursor::find(): cursor not found in map " << id << '\n';
+ return 0;
+ }
+ return it->second;
+ }
+
+ /* call when cursor's location changes so that we can update the
+ cursorsbylocation map. if you are locked and internally iterating, only
+ need to call when you are ready to "unlock".
+ */
+ void updateLocation();
+
+//private:
+// void addToByLocation(DiskLoc cl);
+ void cleanupByLocation(DiskLoc loc);
+//public:
+// ClientCursor *nextAtThisLocation;
+};
+
diff --git a/db/db.cpp b/db/db.cpp
index 1d417c093f3..0598bee4351 100644
--- a/db/db.cpp
+++ b/db/db.cpp
@@ -274,7 +274,7 @@ public:
};
void listen(int port) {
- const char *Version = "db version: 112 6jun2008";
+ const char *Version = "db version: 113 10jun2008";
problem() << Version << endl;
cout << Version << endl;
pdfileInit();
@@ -406,7 +406,7 @@ void connThread()
stringstream ss;
if( !dbMsgPort.recv(m) ) {
- cout << "MessagingPort::recv(): returned false" << endl;
+ cout << "MessagingPort::recv(): returned false " << dbMsgPort.farEnd.toString() << endl;
dbMsgPort.shutdown();
break;
}
diff --git a/db/db.vcproj b/db/db.vcproj
index f375ef80324..a8c8d43b35b 100644
--- a/db/db.vcproj
+++ b/db/db.vcproj
@@ -249,6 +249,10 @@
>
</File>
<File
+ RelativePath=".\clientcursor.h"
+ >
+ </File>
+ <File
RelativePath=".\db.h"
>
</File>
diff --git a/db/query.cpp b/db/query.cpp
index 2a624977630..ee1c4b70208 100644
--- a/db/query.cpp
+++ b/db/query.cpp
@@ -130,6 +130,7 @@ auto_ptr<Cursor> getIndexCursor(const char *ns, JSObj& query, JSObj& order) {
}
}
JSObj q2 = b2.done();
+ cout << "using index " << d->indexes[i].indexNamespace() << endl;
return auto_ptr<Cursor>(
new BtreeCursor(d->indexes[i].head, q2, 1, true));
}
@@ -732,14 +733,12 @@ assert( debug.getN() < 5000 );
// more...so save a cursor
ClientCursor *cc = new ClientCursor();
cc->c = c;
- cursorid = allocCursorId();
- cc->cursorid = cursorid;
+ cursorid = cc->cursorid;
cc->matcher = matcher;
cc->ns = ns;
cc->pos = n;
- ClientCursor::add(cc);
- cc->updateLocation();
cc->filter = filter;
+ cc->updateLocation();
}
}
break;
diff --git a/db/query.h b/db/query.h
index b0f7a503fcb..424c6aaa84b 100644
--- a/db/query.h
+++ b/db/query.h
@@ -75,61 +75,5 @@ QueryResult* runQuery(const char *ns, int ntoskip, int ntoreturn,
void updateObjects(const char *ns, JSObj updateobj, JSObj pattern, bool upsert, stringstream& ss);
void deleteObjects(const char *ns, JSObj pattern, bool justOne);
-class ClientCursor;
-typedef map<long long, ClientCursor*> CCMap;
-extern CCMap clientCursors; /* cursorid -> ClientCursor */
+#include "clientcursor.h"
-/* Cursor -- and its derived classes -- are our internal cursors.
-
- ClientCursor is a wrapper that represents a cursorid from our client
- application's perspective.
-*/
-class Cursor;
-class ClientCursor {
- friend class CursInspector;
-public:
- ClientCursor() {
- cursorid=0; pos=0; nextAtThisLocation=0;
-#if defined(_WIN32)
- cout << "clientcursor() " << cursorid << endl;
-#endif
- }
- ~ClientCursor();
- long long cursorid;
- string ns;
- auto_ptr<JSMatcher> matcher;
- auto_ptr<Cursor> c;
- int pos;
- DiskLoc lastLoc;
- auto_ptr< set<string> > filter;
-
- /* report to us that a new clientcursor exists so we can track it. You still
- do the initial updateLocation() yourself.
- */
- static void add(ClientCursor*);
-
- static bool erase(long long cursorid);
-
- static ClientCursor* find(long long id) {
- CCMap::iterator it = clientCursors.find(id);
- if( it == clientCursors.end() ) {
- cout << "ClientCursor::find(): cursor not found in map " << id << endl;
- return 0;
- }
- return it->second;
- }
-
- /* call when cursor's location changes so that we can update the
- cursorsbylocation map. if you are locked and internally iterating, only
- need to call when you are ready to "unlock".
- */
- void updateLocation();
-
-private:
- void addToByLocation(DiskLoc cl);
- static void cleanupByLocation(DiskLoc loc, long long cursorid);
-public:
- ClientCursor *nextAtThisLocation;
-};
-
-long long allocCursorId();
diff --git a/grid/message.cpp b/grid/message.cpp
index caf2cd7e8a8..cfa9a02a912 100644
--- a/grid/message.cpp
+++ b/grid/message.cpp
@@ -121,16 +121,19 @@ bool MessagingPort::recv(Message& m) {
// assert( x == 4 );
- assert( len >= 0 && len <= 16000000 );
+ if( len < 0 || len > 16000000 ) {
+ cout << "bad recv() len: " << len << '\n';
+ return false;
+ }
int z = (len+1023)&0xfffffc00; assert(z>=len);
MsgData *md = (MsgData *) malloc(z);
md->len = len;
- if ( len <= 0 ){
- cout << "got a length of 0, something is wrong" << endl;
- return false;
- }
+ if ( len <= 0 ){
+ cout << "got a length of 0, something is wrong" << endl;
+ return false;
+ }
char *p = (char *) &md->id;
int left = len -4;
diff --git a/grid/message.h b/grid/message.h
index 604e9a67343..be2749c8bea 100644
--- a/grid/message.h
+++ b/grid/message.h
@@ -45,6 +45,7 @@ public:
private:
int sock;
+public:
SockAddr farEnd;
};
diff --git a/stdafx.h b/stdafx.h
index d1f6d9d5071..cc576e18bfa 100644
--- a/stdafx.h
+++ b/stdafx.h
@@ -99,12 +99,21 @@ inline ostream& problem() {
time_t t;
time(&t);
string now(ctime(&t),0,20);
- problems << "problem " << now;
+ problems << "~ " << now;
if( client )
problems << curNs << ' ';
return problems;
}
+/* for now, running on win32 means development not production --
+ use this to log things just there.
+*/
+#if !defined(_WIN32)
+#define DEV if( 1 )
+#else
+#define DEV if( 0 )
+#endif
+
#define DEBUGGING if( 0 )
extern unsigned occasion;
diff --git a/util/goodies.h b/util/goodies.h
index 94ca721aa8b..af6fd3f20b0 100644
--- a/util/goodies.h
+++ b/util/goodies.h
@@ -13,6 +13,20 @@ inline pthread_t GetCurrentThreadId() { return pthread_self(); }
/* set to TRUE if we are exiting */
extern bool goingAway;
+/* find the multimap member which matches a particular key and value.
+
+ note this can be slow if there are a lot with the same key.
+*/
+template<class C,class K,class V> inline typename C::iterator kv_find(C& c, const K& k,const V& v) {
+ pair<typename C::iterator,typename C::iterator> p = c.equal_range(k);
+
+ for( typename C::iterator it=p.first; it!=p.second; ++it)
+ if( it->second == v )
+ return it;
+
+ return c.end();
+}
+
bool isPrime(int n);
int nextPrime(int n);