summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDwight <dmerriman@gmail.com>2008-08-13 12:17:18 -0400
committerDwight <dmerriman@gmail.com>2008-08-13 12:17:18 -0400
commitedcee6aaa89618642f3be46f552abc3b6e8e938f (patch)
tree843679c36cb0e53f49733b8d11ced5f0db4562ba
parent9db9bdba5a6d5f5ad52d1e846072a13484f3469c (diff)
downloadmongo-edcee6aaa89618642f3be46f552abc3b6e8e938f.tar.gz
tailable cursors working
-rw-r--r--db/btree.cpp1
-rw-r--r--db/clientcursor.cpp28
-rw-r--r--db/cursor.h147
-rw-r--r--db/db.cpp2
-rw-r--r--db/db.vcproj4
-rw-r--r--db/dbclient.cpp8
-rw-r--r--db/dbclient.h6
-rw-r--r--db/pdfile.h82
-rw-r--r--db/query.cpp147
-rw-r--r--db/query.h8
-rw-r--r--grid/message.h2
11 files changed, 265 insertions, 170 deletions
diff --git a/db/btree.cpp b/db/btree.cpp
index eaf3f3ce793..e5fda806607 100644
--- a/db/btree.cpp
+++ b/db/btree.cpp
@@ -639,7 +639,6 @@ DiskLoc BtreeBucket::advance(const DiskLoc& thisLoc, int& keyOfs, int direction,
int ko = keyOfs + direction;
DiskLoc nextDown = childForPos(ko+adj);
if( !nextDown.isNull() ) {
-// nextDown.btree()->dump();//TEMP:
while( 1 ) {
keyOfs = direction>0 ? 0 : nextDown.btree()->n - 1;
DiskLoc loc= nextDown.btree()->childForPos(keyOfs + adj);
diff --git a/db/clientcursor.cpp b/db/clientcursor.cpp
index 935aee4b78e..bfb5fd5d7b4 100644
--- a/db/clientcursor.cpp
+++ b/db/clientcursor.cpp
@@ -101,10 +101,28 @@ void aboutToDelete(const DiskLoc& dl) {
for( vector<ClientCursor*>::iterator i = toAdvance.begin();
i != toAdvance.end(); ++i )
{
- (*i)->c->checkLocation();
- (*i)->c->advance();
- wassert( (*i)->c->currLoc() != dl );
- (*i)->updateLocation();
+ Cursor *c = (*i)->c.get();
+ DiskLoc tmp1 = c->currLoc();
+ if( tmp1 != dl ) {
+ /* this might indicate a failure to call ClientCursor::updateLocation() */
+ problem() << "warning: cursor loc does not match byLoc position!" << endl;
+ }
+ c->checkLocation();
+ if( c->tailing() ) {
+ DEV cout << "killing cursor as we would have to advance it and it is tailable" << endl;
+ delete *i;
+ continue;
+ }
+ c->advance();
+ DiskLoc newLoc = c->currLoc();
+ if( newLoc.isNull() ) {
+ // advanced to end -- delete cursor
+ delete *i;
+ }
+ else {
+ wassert( newLoc != dl );
+ (*i)->updateLocation();
+ }
}
}
@@ -125,7 +143,7 @@ void ClientCursor::updateLocation() {
assert( cursorid );
DiskLoc cl = c->currLoc();
if( lastLoc() == cl ) {
- log() << "info: lastloc==curloc " << ns << '\n';
+ //log() << "info: lastloc==curloc " << ns << '\n';
return;
}
setLastLoc(cl);
diff --git a/db/cursor.h b/db/cursor.h
new file mode 100644
index 00000000000..ec063d56e50
--- /dev/null
+++ b/db/cursor.h
@@ -0,0 +1,147 @@
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+#include "../stdafx.h"
+
+/* Query cursors, base class. This is for our internal cursors. "ClientCursor" is a separate
+ concept and is for the user's cursor.
+*/
+class Cursor {
+public:
+ virtual bool ok() = 0;
+ bool eof() { return !ok(); }
+ virtual Record* _current() = 0;
+ virtual JSObj current() = 0;
+ virtual DiskLoc currLoc() = 0;
+ virtual bool advance() = 0; /*true=ok*/
+
+ /* Implement these if you want the cursor to be "tailable" */
+ /* tailable(): if true, cursor has tailable capability AND
+ the user requested use of those semantics. */
+ virtual bool tailable() { return false; }
+ /* indicates we should mark where we are and go into tail mode. */
+ virtual void setAtTail() { assert(false); }
+ /* you must call tailResume before reusing the cursor */
+ virtual void tailResume() { }
+ /* indicates ifi we are actively tailing. once it goes active,
+ this should return treu even after tailResume(). */
+ virtual bool tailing() { return false; }
+
+ virtual void aboutToDeleteBucket(const DiskLoc& b) { }
+
+ /* optional to implement. if implemented, means 'this' is a prototype */
+ virtual Cursor* clone() { return 0; }
+
+ virtual bool tempStopOnMiss() { return false; }
+
+ /* called after every query block is iterated -- i.e. between getMore() blocks
+ so you can note where we are, if necessary.
+ */
+ virtual void noteLocation() { }
+
+ /* called before query getmore block is iterated */
+ virtual void checkLocation() { }
+
+ virtual const char * toString() { return "abstract?"; }
+
+ /* used for multikey index traversal to avoid sending back dups. see JSMatcher::matches() */
+ set<DiskLoc> dups;
+ bool getsetdup(DiskLoc loc) {
+ /* to save mem only call this when there is risk of dups (e.g. when 'deep'/multikey) */
+ if( dups.count(loc) > 0 )
+ return true;
+ dups.insert(loc);
+ return false;
+ }
+};
+
+/* table-scan style cursor */
+class BasicCursor : public Cursor {
+protected:
+ DiskLoc curr, last;
+
+private:
+ // for tailing:
+ enum State { Normal, TailPoint, TailResumed } state;
+ void init() { state = Normal; }
+
+public:
+ bool ok() { return !curr.isNull(); }
+ Record* _current() {
+ assert( ok() );
+ return curr.rec();
+ }
+ JSObj current() {
+ Record *r = _current();
+ JSObj j(r);
+ return j;
+ }
+ virtual DiskLoc currLoc() { return curr; }
+
+ bool advance() {
+ if( eof() )
+ return false;
+ Record *r = _current();
+ last = curr;
+ curr = r->getNext(curr);
+ return ok();
+ }
+
+ BasicCursor(DiskLoc dl) : curr(dl) { init(); }
+ BasicCursor() { init(); }
+ virtual const char * toString() { return "BasicCursor"; }
+
+ virtual void tailResume() {
+ if( state == TailPoint ) {
+ state = TailResumed;
+ advance();
+ }
+ }
+ virtual void setAtTail() {
+ assert( state != TailPoint );
+ assert( curr.isNull() );
+ assert( !last.isNull() );
+ curr = last; last.Null();
+ state = TailPoint;
+ }
+ virtual bool tailable() {
+ // to go into tail mode we need a non-null point of reference for resumption
+ return !last.isNull();
+ }
+ virtual bool tailing() {
+ return state != Normal;
+ }
+};
+
+/* used for order { $natural: -1 } */
+class ReverseCursor : public BasicCursor {
+public:
+ bool advance() {
+ if( eof() )
+ return false;
+ Record *r = _current();
+ last = curr;
+ curr = r->getPrev(curr);
+ return ok();
+ }
+
+ ReverseCursor(DiskLoc dl) : BasicCursor(dl) { }
+ ReverseCursor() { }
+ virtual const char * toString() { return "ReverseCursor"; }
+};
+
diff --git a/db/db.cpp b/db/db.cpp
index 531cd772384..c0ef07e6cfa 100644
--- a/db/db.cpp
+++ b/db/db.cpp
@@ -249,7 +249,7 @@ void receivedQuery(DbResponse& dbresponse, /*AbstractMessagingPort& dbMsgPort, *
QueryResult* msgdata;
try {
- msgdata = runQuery(m, ns, ntoskip, ntoreturn, query, fields, ss);
+ msgdata = runQuery(m, ns, ntoskip, ntoreturn, query, fields, ss, m.data->dataAsInt());
}
catch( AssertionException ) {
ss << " exception ";
diff --git a/db/db.vcproj b/db/db.vcproj
index 710b14fa51e..2d8afc192d3 100644
--- a/db/db.vcproj
+++ b/db/db.vcproj
@@ -273,6 +273,10 @@
>
</File>
<File
+ RelativePath=".\cursor.h"
+ >
+ </File>
+ <File
RelativePath=".\db.h"
>
</File>
diff --git a/db/dbclient.cpp b/db/dbclient.cpp
index df7e762658c..a92caa3b3d6 100644
--- a/db/dbclient.cpp
+++ b/db/dbclient.cpp
@@ -91,8 +91,6 @@ auto_ptr<DBClientCursor> DBClientConnection::query(const char *ns, JSObj query,
/* -- DBClientCursor ---------------------------------------------- */
void DBClientCursor::requestMore() {
-cout << "TEMP REQUESTMORE" << endl;
-
assert( cursorId && pos == nReturned );
BufBuilder b;
@@ -113,7 +111,7 @@ cout << "TEMP REQUESTMORE" << endl;
void DBClientCursor::dataReceived() {
QueryResult *qr = (QueryResult *) m->data;
- if( qr->resultOptions() & ResultOption_CursorNotFound ) {
+ if( qr->resultFlags() & ResultFlag_CursorNotFound ) {
// cursor id no longer valid at the server.
assert( qr->cursorId == 0 );
cursorId = 0; // 0 indicates no longer valid (dead)
@@ -126,7 +124,9 @@ void DBClientCursor::dataReceived() {
nReturned = qr->nReturned;
pos = 0;
data = qr->data();
- assert( nReturned || cursorId == 0 );
+ /* this assert would fire the way we currently work:
+ assert( nReturned || cursorId == 0 );
+ */
}
bool DBClientCursor::more() {
diff --git a/db/dbclient.h b/db/dbclient.h
index a9c390be704..03145d47126 100644
--- a/db/dbclient.h
+++ b/db/dbclient.h
@@ -28,7 +28,7 @@ enum {
like any "latent cursor", the cursor may become invalid at some point -- for example if that
final object it references were deleted. Thus, you should be prepared to requery if you get back
- ResultOption_CursorNotFound.
+ ResultFlag_CursorNotFound.
*/
Option_CursorTailable = 2
};
@@ -41,9 +41,7 @@ struct QueryResult : public MsgData {
int startingFrom;
int nReturned;
const char *data() { return (char *) (((int *)&nReturned)+1); }
- int resultOptions() const {
- return *((int *) _data);
- }
+ int& resultFlags() { return dataAsInt(); }
};
#pragma pack(pop)
diff --git a/db/pdfile.h b/db/pdfile.h
index 2450bb56a41..9e7a8433d51 100644
--- a/db/pdfile.h
+++ b/db/pdfile.h
@@ -246,87 +246,7 @@ inline Extent* PhysicalDataFile::getExtent(DiskLoc loc) {
return e;
}
-class Cursor {
-public:
- virtual bool ok() = 0;
- bool eof() { return !ok(); }
- virtual Record* _current() = 0;
- virtual JSObj current() = 0;
- virtual DiskLoc currLoc() = 0;
- virtual bool advance() = 0; /*true=ok*/
-
- virtual void aboutToDeleteBucket(const DiskLoc& b) { }
-
- /* optional to implement. if implemented, means 'this' is a prototype */
- virtual Cursor* clone() { return 0; }
-
- virtual bool tempStopOnMiss() { return false; }
-
- /* called after every query block is iterated -- i.e. between getMore() blocks
- so you can note where we are, if necessary.
- */
- virtual void noteLocation() { }
-
- /* called before query getmore block is iterated */
- virtual void checkLocation() { }
-
- virtual const char * toString() { return "abstract?"; }
-
- /* used for multikey index traversal to avoid sending back dups. see JSMatcher::matches() */
- set<DiskLoc> dups;
- bool getsetdup(DiskLoc loc) {
- /* to save mem only call this when there is risk of dups (e.g. when 'deep'/multikey) */
- if( dups.count(loc) > 0 )
- return true;
- dups.insert(loc);
- return false;
- }
-};
-
-class BasicCursor : public Cursor {
-public:
- bool ok() { return !curr.isNull(); }
- Record* _current() {
- assert( ok() );
- return curr.rec();
- }
- JSObj current() {
- Record *r = _current();
- JSObj j(r);
- return j;
- }
- virtual DiskLoc currLoc() { return curr; }
-
- bool advance() {
- if( eof() )
- return false;
- Record *r = _current();
- curr = r->getNext(curr);
- return ok();
- }
-
- BasicCursor(DiskLoc dl) : curr(dl) { }
- BasicCursor() { }
- virtual const char * toString() { return "BasicCursor"; }
-
- DiskLoc curr;
-};
-
-/* used for order { $natural: -1 } */
-class ReverseCursor : public BasicCursor {
-public:
- bool advance() {
- if( eof() )
- return false;
- Record *r = _current();
- curr = r->getPrev(curr);
- return ok();
- }
-
- ReverseCursor(DiskLoc dl) : BasicCursor(dl) { }
- ReverseCursor() { }
- virtual const char * toString() { return "ReverseCursor"; }
-};
+#include "cursor.h"
inline Record* PhysicalDataFile::recordAt(DiskLoc dl) { return header->getRecord(dl); }
diff --git a/db/query.cpp b/db/query.cpp
index 1716d55dea0..b93ddc85a72 100644
--- a/db/query.cpp
+++ b/db/query.cpp
@@ -216,14 +216,8 @@ int deleteObjects(const char *ns, JSObj pattern, bool justOne, bool god) {
c = theDataFileMgr.findAll(ns);
Cursor &tempDebug = *c;
- int temp = 0;
- int tempd = 0;
-
- DiskLoc _tempDelLoc;
while( c->ok() ) {
- temp++;
-
Record *r = c->_current();
DiskLoc rloc = c->currLoc();
c->advance(); // must advance before deleting as the next ptr will die
@@ -238,11 +232,9 @@ int deleteObjects(const char *ns, JSObj pattern, bool justOne, bool god) {
assert( !deep || !c->getsetdup(rloc) ); // can't be a dup, we deleted it!
if( !justOne )
c->noteLocation();
- _tempDelLoc = rloc;
theDataFileMgr.deleteRecord(ns, r, rloc);
nDeleted++;
- tempd = temp;
if( justOne )
break;
c->checkLocation();
@@ -911,7 +903,7 @@ int runCount(const char *ns, JSObj& cmd, string& err) {
}
QueryResult* runQuery(Message& message, const char *ns, int ntoskip, int _ntoreturn, JSObj jsobj,
- auto_ptr< set<string> > filter, stringstream& ss)
+ auto_ptr< set<string> > filter, stringstream& ss, int queryOptions)
{
time_t t = time(0);
bool wantMore = true;
@@ -957,64 +949,56 @@ QueryResult* runQuery(Message& message, const char *ns, int ntoskip, int _ntoret
int nscanned = 0;
auto_ptr<Cursor> c = getSpecialCursor(ns);
+ if( c.get() == 0 )
+ c = getIndexCursor(ns, query, order);
+ if( c.get() == 0 )
+ c = findTableScan(ns, order);
- /*try*/{
+ while( c->ok() ) {
+ JSObj js = c->current();
+ if( queryTraceLevel >= 50 )
+ cout << " checking against:\n " << js.toString() << endl;
+ nscanned++;
+ bool deep;
- if( c.get() == 0 ) {
- c = getIndexCursor(ns, query, order);
- }
- if( c.get() == 0 ) {
- //c = theDataFileMgr.findAll(ns);
- c = findTableScan(ns, order);
+ if( !matcher->matches(js, &deep) ) {
+ if( c->tempStopOnMiss() )
+ break;
}
-
- while( c->ok() ) {
- JSObj js = c->current();
- if( queryTraceLevel >= 50 )
- cout << " checking against:\n " << js.toString() << endl;
- nscanned++;
- bool deep;
-
-JSMatcher &debug = *matcher;
-assert( debug.getN() < 5000 );
-
- if( !matcher->matches(js, &deep) ) {
- if( c->tempStopOnMiss() )
- break;
+ else if( !deep || !c->getsetdup(c->currLoc()) ) { // i.e., check for dups on deep items only
+ // got a match.
+ if( ntoskip > 0 ) {
+ ntoskip--;
}
- else if( !deep || !c->getsetdup(c->currLoc()) ) { // i.e., check for dups on deep items only
- // got a match.
- if( ntoskip > 0 ) {
- ntoskip--;
+ else {
+ bool ok = true;
+ assert( js.objsize() >= 0 ); //defensive for segfaults
+ if( filter.get() ) {
+ // we just want certain fields from the object.
+ JSObj x;
+ ok = x.addFields(js, *filter) > 0;
+ if( ok )
+ b.append((void*) x.objdata(), x.objsize());
}
else {
- bool ok = true;
- assert( js.objsize() >= 0 ); //defensive for segfaults
- if( filter.get() ) {
- // we just want certain fields from the object.
- JSObj x;
- ok = x.addFields(js, *filter) > 0;
- if( ok )
- b.append((void*) x.objdata(), x.objsize());
- }
- else {
- b.append((void*) js.objdata(), js.objsize());
- }
- if( ok ) {
- n++;
- if( (ntoreturn>0 && (n >= ntoreturn || b.len() > MaxBytesToReturnToClientAtOnce)) ||
- (ntoreturn==0 && (b.len()>1*1024*1024 || n>=101)) ) {
- /* if ntoreturn is zero, we return up to 101 objects. on the subsequent getmore, there
- is only a size limit. The idea is that on a find() where one doesn't use much results,
- we don't return much, but once getmore kicks in, we start pushing significant quantities.
-
- The n limit (vs. size) is important when someone fetches only one small field from big
- objects, which causes massive scanning server-side.
- */
- /* if only 1 requested, no cursor saved for efficiency...we assume it is findOne() */
- if( wantMore && ntoreturn != 1 ) {
+ b.append((void*) js.objdata(), js.objsize());
+ }
+ if( ok ) {
+ n++;
+ if( (ntoreturn>0 && (n >= ntoreturn || b.len() > MaxBytesToReturnToClientAtOnce)) ||
+ (ntoreturn==0 && (b.len()>1*1024*1024 || n>=101)) ) {
+ /* if ntoreturn is zero, we return up to 101 objects. on the subsequent getmore, there
+ is only a size limit. The idea is that on a find() where one doesn't use much results,
+ we don't return much, but once getmore kicks in, we start pushing significant quantities.
+
+ The n limit (vs. size) is important when someone fetches only one small field from big
+ objects, which causes massive scanning server-side.
+ */
+ /* if only 1 requested, no cursor saved for efficiency...we assume it is findOne() */
+ if( wantMore && ntoreturn != 1 ) {
+ if( useCursors ) {
c->advance();
- if( c->ok() && useCursors ) {
+ if( c->ok() ) {
// more...so save a cursor
ClientCursor *cc = new ClientCursor();
cc->c = c;
@@ -1028,17 +1012,31 @@ assert( debug.getN() < 5000 );
cc->updateLocation();
}
}
- break;
- }
+ }
+ break;
}
}
}
- c->advance();
}
+ c->advance();
+ }
- if( client->profile )
- ss << " nscanned:" << nscanned << ' ';
+ if( cursorid == 0 && (queryOptions & Option_CursorTailable) && c->tailable() ) {
+ c->setAtTail();
+ ClientCursor *cc = new ClientCursor();
+ cc->c = c;
+ cursorid = cc->cursorid;
+ DEV cout << " query has no more but tailable, cursorid: " << cursorid << endl;
+ cc->matcher = matcher;
+ cc->ns = ns;
+ cc->pos = n;
+ cc->filter = filter;
+ cc->originalMessage = message;
+ cc->updateLocation();
}
+
+ if( client->profile )
+ ss << " nscanned:" << nscanned << ' ';
}
QueryResult *qr = (QueryResult *) b.buf();
@@ -1087,21 +1085,27 @@ QueryResult* getMore(const char *ns, int ntoreturn, long long cursorid) {
b.skip(sizeof(QueryResult));
+ int resultFlags = 0;
int start = 0;
int n = 0;
if( !cc ) {
DEV log() << "getMore: cursorid not found " << ns << " " << cursorid << endl;
cursorid = 0;
+ resultFlags = ResultFlag_CursorNotFound;
}
else {
start = cc->pos;
Cursor *c = cc->c.get();
c->checkLocation();
+ c->tailResume();
while( 1 ) {
if( !c->ok() ) {
done:
- // done! kill cursor.
+ if( c->tailing() ) {
+ c->setAtTail();
+ break;
+ }
DEV log() << " getmore: last batch, erasing cursor " << cursorid << endl;
bool ok = ClientCursor::erase(cursorid);
assert(ok);
@@ -1138,8 +1142,10 @@ done:
if( (ntoreturn>0 && (n >= ntoreturn || b.len() > MaxBytesToReturnToClientAtOnce)) ||
(ntoreturn==0 && b.len()>1*1024*1024) ) {
c->advance();
+ if( c->tailing() && !c->ok() )
+ c->setAtTail();
cc->pos += n;
- cc->updateLocation();
+ //cc->updateLocation();
break;
}
}
@@ -1147,14 +1153,15 @@ done:
}
c->advance();
}
+ cc->updateLocation();
}
QueryResult *qr = (QueryResult *) b.buf();
- qr->cursorId = cursorid;
- qr->startingFrom = start;
qr->len = b.len();
- // qr->reserved = 0;
qr->operation = opReply;
+ qr->resultFlags() = resultFlags;
+ qr->cursorId = cursorid;
+ qr->startingFrom = start;
qr->nReturned = n;
b.decouple();
diff --git a/db/query.h b/db/query.h
index 56b31e0a99c..aea274d1efe 100644
--- a/db/query.h
+++ b/db/query.h
@@ -68,17 +68,17 @@
/* db response format
Query or GetMore: // see struct QueryResult
- int resultOptions = 0;
+ int resultFlags = 0;
int64 cursorID;
int startingFrom;
int nReturned; // 0=infinity
list of marshalled JSObjects;
*/
-/* the field 'resultOptions' above */
+/* the field 'resultFlags' above */
enum {
/* returned, with zero results, when getMore is called but the cursor id is not valid at the server. */
- ResultOption_CursorNotFound = 1
+ ResultFlag_CursorNotFound = 1
};
// grab struct QueryResult from:
@@ -90,7 +90,7 @@ QueryResult* getMore(const char *ns, int ntoreturn, long long cursorid);
// caller must free() returned QueryResult.
QueryResult* runQuery(Message&, const char *ns, int ntoskip, int ntoreturn,
JSObj j, auto_ptr< set<string> > fieldFilter,
- stringstream&);
+ stringstream&, int queryOptions);
void updateObjects(const char *ns, JSObj updateobj, JSObj pattern, bool upsert, stringstream& ss);
diff --git a/grid/message.h b/grid/message.h
index 7a3235ae265..67ef1593948 100644
--- a/grid/message.h
+++ b/grid/message.h
@@ -91,6 +91,8 @@ struct MsgData {
int operation;
char _data[4];
+ int& dataAsInt() { return *((int *) _data); }
+
int dataLen(); // len without header
};
const int MsgDataHeaderSize = sizeof(MsgData) - 4;