diff options
author | Dwight <dmerriman@gmail.com> | 2008-08-13 12:17:18 -0400 |
---|---|---|
committer | Dwight <dmerriman@gmail.com> | 2008-08-13 12:17:18 -0400 |
commit | edcee6aaa89618642f3be46f552abc3b6e8e938f (patch) | |
tree | 843679c36cb0e53f49733b8d11ced5f0db4562ba | |
parent | 9db9bdba5a6d5f5ad52d1e846072a13484f3469c (diff) | |
download | mongo-edcee6aaa89618642f3be46f552abc3b6e8e938f.tar.gz |
tailable cursors working
-rw-r--r-- | db/btree.cpp | 1 | ||||
-rw-r--r-- | db/clientcursor.cpp | 28 | ||||
-rw-r--r-- | db/cursor.h | 147 | ||||
-rw-r--r-- | db/db.cpp | 2 | ||||
-rw-r--r-- | db/db.vcproj | 4 | ||||
-rw-r--r-- | db/dbclient.cpp | 8 | ||||
-rw-r--r-- | db/dbclient.h | 6 | ||||
-rw-r--r-- | db/pdfile.h | 82 | ||||
-rw-r--r-- | db/query.cpp | 147 | ||||
-rw-r--r-- | db/query.h | 8 | ||||
-rw-r--r-- | grid/message.h | 2 |
11 files changed, 265 insertions, 170 deletions
diff --git a/db/btree.cpp b/db/btree.cpp index eaf3f3ce793..e5fda806607 100644 --- a/db/btree.cpp +++ b/db/btree.cpp @@ -639,7 +639,6 @@ DiskLoc BtreeBucket::advance(const DiskLoc& thisLoc, int& keyOfs, int direction, int ko = keyOfs + direction; DiskLoc nextDown = childForPos(ko+adj); if( !nextDown.isNull() ) { -// nextDown.btree()->dump();//TEMP: while( 1 ) { keyOfs = direction>0 ? 0 : nextDown.btree()->n - 1; DiskLoc loc= nextDown.btree()->childForPos(keyOfs + adj); diff --git a/db/clientcursor.cpp b/db/clientcursor.cpp index 935aee4b78e..bfb5fd5d7b4 100644 --- a/db/clientcursor.cpp +++ b/db/clientcursor.cpp @@ -101,10 +101,28 @@ void aboutToDelete(const DiskLoc& dl) { for( vector<ClientCursor*>::iterator i = toAdvance.begin(); i != toAdvance.end(); ++i ) { - (*i)->c->checkLocation(); - (*i)->c->advance(); - wassert( (*i)->c->currLoc() != dl ); - (*i)->updateLocation(); + Cursor *c = (*i)->c.get(); + DiskLoc tmp1 = c->currLoc(); + if( tmp1 != dl ) { + /* this might indicate a failure to call ClientCursor::updateLocation() */ + problem() << "warning: cursor loc does not match byLoc position!" << endl; + } + c->checkLocation(); + if( c->tailing() ) { + DEV cout << "killing cursor as we would have to advance it and it is tailable" << endl; + delete *i; + continue; + } + c->advance(); + DiskLoc newLoc = c->currLoc(); + if( newLoc.isNull() ) { + // advanced to end -- delete cursor + delete *i; + } + else { + wassert( newLoc != dl ); + (*i)->updateLocation(); + } } } @@ -125,7 +143,7 @@ void ClientCursor::updateLocation() { assert( cursorid ); DiskLoc cl = c->currLoc(); if( lastLoc() == cl ) { - log() << "info: lastloc==curloc " << ns << '\n'; + //log() << "info: lastloc==curloc " << ns << '\n'; return; } setLastLoc(cl); diff --git a/db/cursor.h b/db/cursor.h new file mode 100644 index 00000000000..ec063d56e50 --- /dev/null +++ b/db/cursor.h @@ -0,0 +1,147 @@ +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#pragma once + +#include "../stdafx.h" + +/* Query cursors, base class. This is for our internal cursors. "ClientCursor" is a separate + concept and is for the user's cursor. +*/ +class Cursor { +public: + virtual bool ok() = 0; + bool eof() { return !ok(); } + virtual Record* _current() = 0; + virtual JSObj current() = 0; + virtual DiskLoc currLoc() = 0; + virtual bool advance() = 0; /*true=ok*/ + + /* Implement these if you want the cursor to be "tailable" */ + /* tailable(): if true, cursor has tailable capability AND + the user requested use of those semantics. */ + virtual bool tailable() { return false; } + /* indicates we should mark where we are and go into tail mode. */ + virtual void setAtTail() { assert(false); } + /* you must call tailResume before reusing the cursor */ + virtual void tailResume() { } + /* indicates ifi we are actively tailing. once it goes active, + this should return treu even after tailResume(). */ + virtual bool tailing() { return false; } + + virtual void aboutToDeleteBucket(const DiskLoc& b) { } + + /* optional to implement. if implemented, means 'this' is a prototype */ + virtual Cursor* clone() { return 0; } + + virtual bool tempStopOnMiss() { return false; } + + /* called after every query block is iterated -- i.e. between getMore() blocks + so you can note where we are, if necessary. + */ + virtual void noteLocation() { } + + /* called before query getmore block is iterated */ + virtual void checkLocation() { } + + virtual const char * toString() { return "abstract?"; } + + /* used for multikey index traversal to avoid sending back dups. see JSMatcher::matches() */ + set<DiskLoc> dups; + bool getsetdup(DiskLoc loc) { + /* to save mem only call this when there is risk of dups (e.g. when 'deep'/multikey) */ + if( dups.count(loc) > 0 ) + return true; + dups.insert(loc); + return false; + } +}; + +/* table-scan style cursor */ +class BasicCursor : public Cursor { +protected: + DiskLoc curr, last; + +private: + // for tailing: + enum State { Normal, TailPoint, TailResumed } state; + void init() { state = Normal; } + +public: + bool ok() { return !curr.isNull(); } + Record* _current() { + assert( ok() ); + return curr.rec(); + } + JSObj current() { + Record *r = _current(); + JSObj j(r); + return j; + } + virtual DiskLoc currLoc() { return curr; } + + bool advance() { + if( eof() ) + return false; + Record *r = _current(); + last = curr; + curr = r->getNext(curr); + return ok(); + } + + BasicCursor(DiskLoc dl) : curr(dl) { init(); } + BasicCursor() { init(); } + virtual const char * toString() { return "BasicCursor"; } + + virtual void tailResume() { + if( state == TailPoint ) { + state = TailResumed; + advance(); + } + } + virtual void setAtTail() { + assert( state != TailPoint ); + assert( curr.isNull() ); + assert( !last.isNull() ); + curr = last; last.Null(); + state = TailPoint; + } + virtual bool tailable() { + // to go into tail mode we need a non-null point of reference for resumption + return !last.isNull(); + } + virtual bool tailing() { + return state != Normal; + } +}; + +/* used for order { $natural: -1 } */ +class ReverseCursor : public BasicCursor { +public: + bool advance() { + if( eof() ) + return false; + Record *r = _current(); + last = curr; + curr = r->getPrev(curr); + return ok(); + } + + ReverseCursor(DiskLoc dl) : BasicCursor(dl) { } + ReverseCursor() { } + virtual const char * toString() { return "ReverseCursor"; } +}; + diff --git a/db/db.cpp b/db/db.cpp index 531cd772384..c0ef07e6cfa 100644 --- a/db/db.cpp +++ b/db/db.cpp @@ -249,7 +249,7 @@ void receivedQuery(DbResponse& dbresponse, /*AbstractMessagingPort& dbMsgPort, * QueryResult* msgdata; try { - msgdata = runQuery(m, ns, ntoskip, ntoreturn, query, fields, ss); + msgdata = runQuery(m, ns, ntoskip, ntoreturn, query, fields, ss, m.data->dataAsInt()); } catch( AssertionException ) { ss << " exception "; diff --git a/db/db.vcproj b/db/db.vcproj index 710b14fa51e..2d8afc192d3 100644 --- a/db/db.vcproj +++ b/db/db.vcproj @@ -273,6 +273,10 @@ >
</File>
<File
+ RelativePath=".\cursor.h"
+ >
+ </File>
+ <File
RelativePath=".\db.h"
>
</File>
diff --git a/db/dbclient.cpp b/db/dbclient.cpp index df7e762658c..a92caa3b3d6 100644 --- a/db/dbclient.cpp +++ b/db/dbclient.cpp @@ -91,8 +91,6 @@ auto_ptr<DBClientCursor> DBClientConnection::query(const char *ns, JSObj query, /* -- DBClientCursor ---------------------------------------------- */
void DBClientCursor::requestMore() {
-cout << "TEMP REQUESTMORE" << endl;
-
assert( cursorId && pos == nReturned );
BufBuilder b;
@@ -113,7 +111,7 @@ cout << "TEMP REQUESTMORE" << endl; void DBClientCursor::dataReceived() {
QueryResult *qr = (QueryResult *) m->data;
- if( qr->resultOptions() & ResultOption_CursorNotFound ) {
+ if( qr->resultFlags() & ResultFlag_CursorNotFound ) {
// cursor id no longer valid at the server.
assert( qr->cursorId == 0 );
cursorId = 0; // 0 indicates no longer valid (dead)
@@ -126,7 +124,9 @@ void DBClientCursor::dataReceived() { nReturned = qr->nReturned;
pos = 0;
data = qr->data();
- assert( nReturned || cursorId == 0 );
+ /* this assert would fire the way we currently work:
+ assert( nReturned || cursorId == 0 );
+ */
}
bool DBClientCursor::more() {
diff --git a/db/dbclient.h b/db/dbclient.h index a9c390be704..03145d47126 100644 --- a/db/dbclient.h +++ b/db/dbclient.h @@ -28,7 +28,7 @@ enum { like any "latent cursor", the cursor may become invalid at some point -- for example if that final object it references were deleted. Thus, you should be prepared to requery if you get back - ResultOption_CursorNotFound. + ResultFlag_CursorNotFound. */ Option_CursorTailable = 2 }; @@ -41,9 +41,7 @@ struct QueryResult : public MsgData { int startingFrom; int nReturned; const char *data() { return (char *) (((int *)&nReturned)+1); } - int resultOptions() const { - return *((int *) _data); - } + int& resultFlags() { return dataAsInt(); } }; #pragma pack(pop) diff --git a/db/pdfile.h b/db/pdfile.h index 2450bb56a41..9e7a8433d51 100644 --- a/db/pdfile.h +++ b/db/pdfile.h @@ -246,87 +246,7 @@ inline Extent* PhysicalDataFile::getExtent(DiskLoc loc) { return e; } -class Cursor { -public: - virtual bool ok() = 0; - bool eof() { return !ok(); } - virtual Record* _current() = 0; - virtual JSObj current() = 0; - virtual DiskLoc currLoc() = 0; - virtual bool advance() = 0; /*true=ok*/ - - virtual void aboutToDeleteBucket(const DiskLoc& b) { } - - /* optional to implement. if implemented, means 'this' is a prototype */ - virtual Cursor* clone() { return 0; } - - virtual bool tempStopOnMiss() { return false; } - - /* called after every query block is iterated -- i.e. between getMore() blocks - so you can note where we are, if necessary. - */ - virtual void noteLocation() { } - - /* called before query getmore block is iterated */ - virtual void checkLocation() { } - - virtual const char * toString() { return "abstract?"; } - - /* used for multikey index traversal to avoid sending back dups. see JSMatcher::matches() */ - set<DiskLoc> dups; - bool getsetdup(DiskLoc loc) { - /* to save mem only call this when there is risk of dups (e.g. when 'deep'/multikey) */ - if( dups.count(loc) > 0 ) - return true; - dups.insert(loc); - return false; - } -}; - -class BasicCursor : public Cursor { -public: - bool ok() { return !curr.isNull(); } - Record* _current() { - assert( ok() ); - return curr.rec(); - } - JSObj current() { - Record *r = _current(); - JSObj j(r); - return j; - } - virtual DiskLoc currLoc() { return curr; } - - bool advance() { - if( eof() ) - return false; - Record *r = _current(); - curr = r->getNext(curr); - return ok(); - } - - BasicCursor(DiskLoc dl) : curr(dl) { } - BasicCursor() { } - virtual const char * toString() { return "BasicCursor"; } - - DiskLoc curr; -}; - -/* used for order { $natural: -1 } */ -class ReverseCursor : public BasicCursor { -public: - bool advance() { - if( eof() ) - return false; - Record *r = _current(); - curr = r->getPrev(curr); - return ok(); - } - - ReverseCursor(DiskLoc dl) : BasicCursor(dl) { } - ReverseCursor() { } - virtual const char * toString() { return "ReverseCursor"; } -}; +#include "cursor.h" inline Record* PhysicalDataFile::recordAt(DiskLoc dl) { return header->getRecord(dl); } diff --git a/db/query.cpp b/db/query.cpp index 1716d55dea0..b93ddc85a72 100644 --- a/db/query.cpp +++ b/db/query.cpp @@ -216,14 +216,8 @@ int deleteObjects(const char *ns, JSObj pattern, bool justOne, bool god) { c = theDataFileMgr.findAll(ns); Cursor &tempDebug = *c; - int temp = 0; - int tempd = 0; - - DiskLoc _tempDelLoc; while( c->ok() ) { - temp++; - Record *r = c->_current(); DiskLoc rloc = c->currLoc(); c->advance(); // must advance before deleting as the next ptr will die @@ -238,11 +232,9 @@ int deleteObjects(const char *ns, JSObj pattern, bool justOne, bool god) { assert( !deep || !c->getsetdup(rloc) ); // can't be a dup, we deleted it! if( !justOne ) c->noteLocation(); - _tempDelLoc = rloc; theDataFileMgr.deleteRecord(ns, r, rloc); nDeleted++; - tempd = temp; if( justOne ) break; c->checkLocation(); @@ -911,7 +903,7 @@ int runCount(const char *ns, JSObj& cmd, string& err) { } QueryResult* runQuery(Message& message, const char *ns, int ntoskip, int _ntoreturn, JSObj jsobj, - auto_ptr< set<string> > filter, stringstream& ss) + auto_ptr< set<string> > filter, stringstream& ss, int queryOptions) { time_t t = time(0); bool wantMore = true; @@ -957,64 +949,56 @@ QueryResult* runQuery(Message& message, const char *ns, int ntoskip, int _ntoret int nscanned = 0; auto_ptr<Cursor> c = getSpecialCursor(ns); + if( c.get() == 0 ) + c = getIndexCursor(ns, query, order); + if( c.get() == 0 ) + c = findTableScan(ns, order); - /*try*/{ + while( c->ok() ) { + JSObj js = c->current(); + if( queryTraceLevel >= 50 ) + cout << " checking against:\n " << js.toString() << endl; + nscanned++; + bool deep; - if( c.get() == 0 ) { - c = getIndexCursor(ns, query, order); - } - if( c.get() == 0 ) { - //c = theDataFileMgr.findAll(ns); - c = findTableScan(ns, order); + if( !matcher->matches(js, &deep) ) { + if( c->tempStopOnMiss() ) + break; } - - while( c->ok() ) { - JSObj js = c->current(); - if( queryTraceLevel >= 50 ) - cout << " checking against:\n " << js.toString() << endl; - nscanned++; - bool deep; - -JSMatcher &debug = *matcher; -assert( debug.getN() < 5000 ); - - if( !matcher->matches(js, &deep) ) { - if( c->tempStopOnMiss() ) - break; + else if( !deep || !c->getsetdup(c->currLoc()) ) { // i.e., check for dups on deep items only + // got a match. + if( ntoskip > 0 ) { + ntoskip--; } - else if( !deep || !c->getsetdup(c->currLoc()) ) { // i.e., check for dups on deep items only - // got a match. - if( ntoskip > 0 ) { - ntoskip--; + else { + bool ok = true; + assert( js.objsize() >= 0 ); //defensive for segfaults + if( filter.get() ) { + // we just want certain fields from the object. + JSObj x; + ok = x.addFields(js, *filter) > 0; + if( ok ) + b.append((void*) x.objdata(), x.objsize()); } else { - bool ok = true; - assert( js.objsize() >= 0 ); //defensive for segfaults - if( filter.get() ) { - // we just want certain fields from the object. - JSObj x; - ok = x.addFields(js, *filter) > 0; - if( ok ) - b.append((void*) x.objdata(), x.objsize()); - } - else { - b.append((void*) js.objdata(), js.objsize()); - } - if( ok ) { - n++; - if( (ntoreturn>0 && (n >= ntoreturn || b.len() > MaxBytesToReturnToClientAtOnce)) || - (ntoreturn==0 && (b.len()>1*1024*1024 || n>=101)) ) { - /* if ntoreturn is zero, we return up to 101 objects. on the subsequent getmore, there - is only a size limit. The idea is that on a find() where one doesn't use much results, - we don't return much, but once getmore kicks in, we start pushing significant quantities. - - The n limit (vs. size) is important when someone fetches only one small field from big - objects, which causes massive scanning server-side. - */ - /* if only 1 requested, no cursor saved for efficiency...we assume it is findOne() */ - if( wantMore && ntoreturn != 1 ) { + b.append((void*) js.objdata(), js.objsize()); + } + if( ok ) { + n++; + if( (ntoreturn>0 && (n >= ntoreturn || b.len() > MaxBytesToReturnToClientAtOnce)) || + (ntoreturn==0 && (b.len()>1*1024*1024 || n>=101)) ) { + /* if ntoreturn is zero, we return up to 101 objects. on the subsequent getmore, there + is only a size limit. The idea is that on a find() where one doesn't use much results, + we don't return much, but once getmore kicks in, we start pushing significant quantities. + + The n limit (vs. size) is important when someone fetches only one small field from big + objects, which causes massive scanning server-side. + */ + /* if only 1 requested, no cursor saved for efficiency...we assume it is findOne() */ + if( wantMore && ntoreturn != 1 ) { + if( useCursors ) { c->advance(); - if( c->ok() && useCursors ) { + if( c->ok() ) { // more...so save a cursor ClientCursor *cc = new ClientCursor(); cc->c = c; @@ -1028,17 +1012,31 @@ assert( debug.getN() < 5000 ); cc->updateLocation(); } } - break; - } + } + break; } } } - c->advance(); } + c->advance(); + } - if( client->profile ) - ss << " nscanned:" << nscanned << ' '; + if( cursorid == 0 && (queryOptions & Option_CursorTailable) && c->tailable() ) { + c->setAtTail(); + ClientCursor *cc = new ClientCursor(); + cc->c = c; + cursorid = cc->cursorid; + DEV cout << " query has no more but tailable, cursorid: " << cursorid << endl; + cc->matcher = matcher; + cc->ns = ns; + cc->pos = n; + cc->filter = filter; + cc->originalMessage = message; + cc->updateLocation(); } + + if( client->profile ) + ss << " nscanned:" << nscanned << ' '; } QueryResult *qr = (QueryResult *) b.buf(); @@ -1087,21 +1085,27 @@ QueryResult* getMore(const char *ns, int ntoreturn, long long cursorid) { b.skip(sizeof(QueryResult)); + int resultFlags = 0; int start = 0; int n = 0; if( !cc ) { DEV log() << "getMore: cursorid not found " << ns << " " << cursorid << endl; cursorid = 0; + resultFlags = ResultFlag_CursorNotFound; } else { start = cc->pos; Cursor *c = cc->c.get(); c->checkLocation(); + c->tailResume(); while( 1 ) { if( !c->ok() ) { done: - // done! kill cursor. + if( c->tailing() ) { + c->setAtTail(); + break; + } DEV log() << " getmore: last batch, erasing cursor " << cursorid << endl; bool ok = ClientCursor::erase(cursorid); assert(ok); @@ -1138,8 +1142,10 @@ done: if( (ntoreturn>0 && (n >= ntoreturn || b.len() > MaxBytesToReturnToClientAtOnce)) || (ntoreturn==0 && b.len()>1*1024*1024) ) { c->advance(); + if( c->tailing() && !c->ok() ) + c->setAtTail(); cc->pos += n; - cc->updateLocation(); + //cc->updateLocation(); break; } } @@ -1147,14 +1153,15 @@ done: } c->advance(); } + cc->updateLocation(); } QueryResult *qr = (QueryResult *) b.buf(); - qr->cursorId = cursorid; - qr->startingFrom = start; qr->len = b.len(); - // qr->reserved = 0; qr->operation = opReply; + qr->resultFlags() = resultFlags; + qr->cursorId = cursorid; + qr->startingFrom = start; qr->nReturned = n; b.decouple(); diff --git a/db/query.h b/db/query.h index 56b31e0a99c..aea274d1efe 100644 --- a/db/query.h +++ b/db/query.h @@ -68,17 +68,17 @@ /* db response format Query or GetMore: // see struct QueryResult - int resultOptions = 0; + int resultFlags = 0; int64 cursorID; int startingFrom; int nReturned; // 0=infinity list of marshalled JSObjects; */ -/* the field 'resultOptions' above */ +/* the field 'resultFlags' above */ enum { /* returned, with zero results, when getMore is called but the cursor id is not valid at the server. */ - ResultOption_CursorNotFound = 1 + ResultFlag_CursorNotFound = 1 }; // grab struct QueryResult from: @@ -90,7 +90,7 @@ QueryResult* getMore(const char *ns, int ntoreturn, long long cursorid); // caller must free() returned QueryResult. QueryResult* runQuery(Message&, const char *ns, int ntoskip, int ntoreturn, JSObj j, auto_ptr< set<string> > fieldFilter, - stringstream&); + stringstream&, int queryOptions); void updateObjects(const char *ns, JSObj updateobj, JSObj pattern, bool upsert, stringstream& ss); diff --git a/grid/message.h b/grid/message.h index 7a3235ae265..67ef1593948 100644 --- a/grid/message.h +++ b/grid/message.h @@ -91,6 +91,8 @@ struct MsgData { int operation; char _data[4]; + int& dataAsInt() { return *((int *) _data); } + int dataLen(); // len without header }; const int MsgDataHeaderSize = sizeof(MsgData) - 4; |