author     Mathias Stearn <mathias@10gen.com>    2013-07-08 23:19:53 -0400
committer  Mathias Stearn <mathias@10gen.com>    2013-07-10 17:43:58 -0400
commit     a51f2688fa05672d999c997170847a3ee29a223b
tree       87ba82c0a3a0c0ef1ef8c8853b3e4aae775aacf0
parent     0f43cfcbe063c1f4615ce362e751155218224bec
download   mongo-a51f2688fa05672d999c997170847a3ee29a223b.tar.gz
SERVER-5932 Aggregation returning a cursor
Related tickets:
  SERVER-8261   System for commands to return cursors
  SERVER-10165  aggregate() helper should return cursor
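This commit adds an opt-in "cursor" field to the aggregate command. A minimal
sketch of the new request/reply shape in shell JavaScript (collection name and
pipeline are illustrative, mirroring the test added below):

    // opt in to cursor-based results; batchSize only limits the first batch
    var res = db.runCommand({
        aggregate: "server5932",             // collection to aggregate
        pipeline: [{$unwind: "$bigArray"}],  // any aggregation pipeline
        cursor: {batchSize: 10}              // {} requests the default first batch (101 docs)
    });
    // on ok:1 the reply carries a cursor document rather than a plain "result" array:
    //   res.cursor == {id: NumberLong(...), ns: "<db>.server5932", firstBatch: [...]}
    // remaining batches are fetched with getMore on res.cursor.id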
-rw-r--r--  jstests/aggregation/bugs/server5932.js            88
-rw-r--r--  jstests/aggregation/extras/utils.js               25
-rw-r--r--  src/mongo/client/dbclientcursor.h                  3
-rw-r--r--  src/mongo/db/clientcursor.cpp                      4
-rw-r--r--  src/mongo/db/commands/pipeline_command.cpp       146
-rw-r--r--  src/mongo/db/cursor.h                              6
-rw-r--r--  src/mongo/db/ops/query.cpp                        10
-rw-r--r--  src/mongo/db/pipeline/document_source_match.cpp    2
-rw-r--r--  src/mongo/db/pipeline/pipeline.cpp                13
-rw-r--r--  src/mongo/db/pipeline/pipeline.h                  12
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp               4
-rw-r--r--  src/mongo/s/commands_public.cpp                    6
-rw-r--r--  src/mongo/scripting/v8_db.cpp                     22
-rw-r--r--  src/mongo/scripting/v8_db.h                        1
-rw-r--r--  src/mongo/shell/collection.js                     38
-rw-r--r--  src/mongo/shell/query.js                          39
16 files changed, 403 insertions(+), 16 deletions(-)
diff --git a/jstests/aggregation/bugs/server5932.js b/jstests/aggregation/bugs/server5932.js
new file mode 100644
index 00000000000..4325197c111
--- /dev/null
+++ b/jstests/aggregation/bugs/server5932.js
@@ -0,0 +1,88 @@
+// server-5932 Cursor-based aggregation
+
+var t = db.server5932;
+t.drop();
+
+//
+// define helpers
+//
+
+// batch size is optional
+function buildAggCmd(pipeline, batchSize) {
+ return {
+ aggregate: t.getName(),
+ pipeline: pipeline,
+ cursor: (batchSize === undefined ? {} : {batchSize: batchSize}),
+ };
+}
+
+// batch size is optional
+function makeCursor(cmdOut, followupBatchSize) {
+ assert.commandWorked(cmdOut);
+ assert.neq(cmdOut.cursor.id, undefined);
+ assert(cmdOut.cursor.id instanceof NumberLong);
+ assert(cmdOut.cursor.firstBatch instanceof Array);
+ return new DBCommandCursor(db.getMongo(), cmdOut, followupBatchSize);
+}
+
+// both batch sizes are optional
+function aggCursor(pipeline, firstBatchSize, followupBatchSize) {
+ var cmdOut = db.runCommand(buildAggCmd(pipeline, firstBatchSize));
+ assert.commandWorked(cmdOut);
+
+ if (firstBatchSize !== undefined)
+ assert.lte(cmdOut.cursor.firstBatch.length, firstBatchSize);
+
+ return makeCursor(cmdOut, followupBatchSize);
+}
+
+//
+// insert data
+//
+
+var bigArray = [];
+for (var i = 0; i < 1000; i++)
+ bigArray.push(i);
+
+var bigStr = Array(1001).toString(); // 1000 bytes of ','
+
+for (var i = 0; i < 100; i++)
+ t.insert({_id: i, bigArray: bigArray, bigStr: bigStr});
+
+//
+// do testing
+//
+
+// successfully handles results > 16MB (bigArray.length * bytes in bigStr * t.count() == 100MB)
+var cursor = aggCursor([{$unwind:'$bigArray'}]); // default settings
+assert.eq(cursor.itcount(), bigArray.length * t.count());
+var cursor = aggCursor([{$unwind:'$bigArray'}], 0); // empty first batch
+assert.eq(cursor.itcount(), bigArray.length * t.count());
+var cursor = aggCursor([{$unwind:'$bigArray'}], 5, 5); // many small batches
+assert.eq(cursor.itcount(), bigArray.length * t.count());
+
+// an empty result set yields cursor.id == 0, unless batchSize is 0
+var res = t.runCommand(buildAggCmd([{$match: {noSuchField: {$exists:true}}}]));
+assert.eq(res.cursor.firstBatch, []);
+assert.eq(res.cursor.id, 0);
+var res = t.runCommand(buildAggCmd([{$match: {noSuchField: {$exists:true}}}], 0));
+assert.eq(res.cursor.firstBatch, []);
+assert.neq(res.cursor.id, 0);
+assert.eq(makeCursor(res).itcount(), 0);
+
+// parse errors are caught before first batch, regardless of size
+var res = t.runCommand(buildAggCmd([{$noSuchStage:1}], 0));
+assert.commandFailed(res);
+
+// data dependent errors can get ok:1 but fail in getMore if they don't fail in first batch
+var res = t.runCommand(buildAggCmd([{$project:{cantAddString: {$add:[1, '$bigStr']}}}], 1));
+assert.commandFailed(res);
+var res = t.runCommand(buildAggCmd([{$project:{cantAddString: {$add:[1, '$bigStr']}}}], 0));
+assert.commandWorked(res);
+assert.throws(function() { makeCursor(res).itcount(); });
+
+// error if collection dropped after first batch
+var cursor = aggCursor([{$unwind:'$bigArray'}]);
+t.drop();
+assert.throws(function() { cursor.itcount(); });
+// DON'T ADD NEW TESTS TO THIS FILE AFTER THIS ONE (unless you reseed the data)
diff --git a/jstests/aggregation/extras/utils.js b/jstests/aggregation/extras/utils.js
index bea13b293dc..a925f47d96a 100644
--- a/jstests/aggregation/extras/utils.js
+++ b/jstests/aggregation/extras/utils.js
@@ -241,12 +241,35 @@ function assertErrorCode(coll, pipe, code, errmsg) {
pipe = [pipe];
}
+ // Test non-cursor
var res = coll.runCommand("aggregate", {pipeline: pipe});
-
if (res.ok || res.code != code)
printjson({pipeline: pipe, result: res});
/* assert failure with proper error code */
assert(!res.ok, errmsg || "failed in assertErrorCode");
assert.eq(res.code, code);
+
+ // Test with cursors
+ if (coll.getDB().isMaster().msg !== "isdbgrid") {
+ // agg cursors aren't supported on sharded clusters yet
+
+ var cmd = {pipeline: pipe};
+ // cmd.cursor = {};
+ cmd.cursor = {batchSize: 0};
+
+ var cursorRes = coll.runCommand("aggregate", cmd);
+ if (cursorRes.ok) {
+ var followupBatchSize = 0; // default
+ var cursor = new DBCommandCursor(coll.getMongo(), cursorRes, followupBatchSize);
+
+ var error = assert.throws(function(){cursor.itcount()}, [], "expected error: " + code);
+ if (error.search(code) === -1) { // search() returns -1 when the code is absent
+ assert(false, "expected error: " + code + " got: " + error);
+ }
+ }
+ else {
+ assert.eq(cursorRes.code, code);
+ }
+ }
}
diff --git a/src/mongo/client/dbclientcursor.h b/src/mongo/client/dbclientcursor.h
index 46ae49c46b1..785734dac63 100644
--- a/src/mongo/client/dbclientcursor.h
+++ b/src/mongo/client/dbclientcursor.h
@@ -129,6 +129,9 @@ namespace mongo {
return (resultFlags & flag) != 0;
}
+ /// Change batchSize after construction. May be changed even after requesting the first batch.
+ void setBatchSize(int newBatchSize) { batchSize = newBatchSize; }
+
DBClientCursor( DBClientBase* client, const string &_ns, BSONObj _query, int _nToReturn,
int _nToSkip, const BSONObj *_fieldsToReturn, int queryOptions , int bs ) :
_client(client),
diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp
index 3d375b6984b..22afbd4fcea 100644
--- a/src/mongo/db/clientcursor.cpp
+++ b/src/mongo/db/clientcursor.cpp
@@ -115,7 +115,7 @@ namespace mongo {
ClientCursor *cc = i.current();
bool shouldDelete = false;
- if ( cc->_db == db ) {
+ if (cc->c()->shouldDestroyOnNSDeletion() && cc->_db == db) {
if (isDB) {
// already checked that db matched above
dassert( str::startsWith(cc->_ns.c_str(), ns) );
@@ -126,7 +126,7 @@ namespace mongo {
shouldDelete = true;
}
}
-
+
if ( shouldDelete ) {
i.deleteAndAdvance();
}
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 56df3369d24..f3dff041ac1 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -30,9 +30,127 @@
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/pipeline_d.h"
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/ops/query.h"
namespace mongo {
+ static bool isCursorCommand(BSONObj cmdObj) {
+ BSONElement cursorElem = cmdObj["cursor"];
+ if (cursorElem.eoo())
+ return false;
+
+ uassert(16954, "cursor field must be missing or an object",
+ cursorElem.type() == Object);
+
+ BSONObj cursor = cursorElem.embeddedObject();
+ BSONElement batchSizeElem = cursor["batchSize"];
+ if (batchSizeElem.eoo()) {
+ uassert(16955, "cursor object can't contain fields other than batchSize",
+ cursor.isEmpty());
+ }
+ else {
+ uassert(16956, "cursor.batchSize must be a number",
+ batchSizeElem.isNumber());
+
+ // This can change in the future, but for now all negatives are reserved.
+ uassert(16957, "Cursor batchSize must not be negative",
+ batchSizeElem.numberLong() >= 0);
+ }
+
+ return true;
+ }
+
+ static void handleCursorCommand(CursorId id, BSONObj& cmdObj, BSONObjBuilder& result) {
+ BSONElement batchSizeElem = cmdObj.getFieldDotted("cursor.batchSize");
+ const long long batchSize = batchSizeElem.isNumber()
+ ? batchSizeElem.numberLong()
+ : 101; // same as query
+
+ // Using limited cursor API that ignores many edge cases. Should be sufficient for commands.
+ ClientCursor::Pin pin(id);
+ ClientCursor* cursor = pin.c();
+
+ massert(16958, "Cursor shouldn't have been deleted",
+ cursor);
+
+ // Make sure this cursor won't disappear on us
+ fassert(16959, !cursor->c()->shouldDestroyOnNSDeletion());
+ fassert(16960, !cursor->c()->requiresLock());
+
+ try {
+ // can't use result BSONObjBuilder directly since it won't handle exceptions correctly.
+ BSONArrayBuilder resultsArray;
+ const int byteLimit = MaxBytesToReturnToClientAtOnce;
+ for (int objs = 0;
+ objs < batchSize && cursor->ok() && resultsArray.len() <= byteLimit;
+ objs++) {
+ // TODO may need special logic if cursor->current() would cause results to be > 16MB
+ resultsArray.append(cursor->current());
+ cursor->advance();
+ }
+
+ // The initial ok() on a cursor may be very expensive so we don't do it when batchSize
+ // is 0 since that indicates a desire for a fast return.
+ if (batchSize != 0 && !cursor->ok()) {
+ // There is no more data. Kill the cursor.
+ pin.release();
+ ClientCursor::erase(id);
+ id = 0;
+ }
+
+ BSONObjBuilder cursorObj(result.subobjStart("cursor"));
+ cursorObj.append("id", id);
+ cursorObj.append("ns", cursor->ns());
+ cursorObj.append("firstBatch", resultsArray.arr());
+ cursorObj.done();
+ }
+ catch (...) {
+ // Clean up cursor on way out of scope.
+ pin.release();
+ ClientCursor::erase(id);
+ throw;
+ }
+ }
+
+
+ class PipelineCursor : public Cursor {
+ public:
+ PipelineCursor(intrusive_ptr<Pipeline> pipeline)
+ : _pipeline(pipeline)
+ {}
+
+ // "core" cursor protocol
+ virtual bool ok() { return !iterator()->eof(); }
+ virtual bool advance() { return iterator()->advance(); }
+ virtual BSONObj current() {
+ BSONObjBuilder builder;
+ iterator()->getCurrent().toBson(&builder);
+ return builder.obj();
+ }
+
+ virtual bool requiresLock() { return false; }
+ virtual bool shouldDestroyOnNSDeletion() { return false; }
+
+ virtual Record* _current() { return NULL; }
+ virtual DiskLoc currLoc() { return DiskLoc(); }
+ virtual DiskLoc refLoc() { return DiskLoc(); }
+ virtual bool supportGetMore() { return true; }
+ virtual bool supportYields() { return false; } // has wrong semantics
+ virtual bool getsetdup(DiskLoc loc) { return false; } // we don't generate dups
+ virtual bool isMultiKey() const { return false; }
+ virtual bool modifiedKeys() const { return false; }
+ virtual string toString() { return "Aggregate_Cursor"; }
+
+ // These probably won't be needed once aggregation supports its own explain.
+ virtual long long nscanned() { return 0; }
+ virtual void explainDetails( BSONObjBuilder& b ) { return; }
+ private:
+ const DocumentSource* iterator() const { return _pipeline->output(); }
+ DocumentSource* iterator() { return _pipeline->output(); }
+
+ intrusive_ptr<Pipeline> _pipeline;
+ };
+
class PipelineCommand :
public Command {
public:
@@ -90,7 +208,26 @@ namespace mongo {
// This does the mongod-specific stuff like creating a cursor
PipelineD::prepareCursorSource(pPipeline, nsToDatabase(ns), pCtx);
- return pPipeline->run(result, errmsg);
+ pPipeline->stitch();
+
+ if (isCursorCommand(cmdObj)) {
+ CursorId id;
+ {
+ // Set up cursor
+ Client::ReadContext ctx(ns);
+ shared_ptr<Cursor> cursor(new PipelineCursor(pPipeline));
+ // cc will be owned by cursor manager
+ ClientCursor* cc = new ClientCursor(0, cursor, ns, cmdObj.getOwned());
+ id = cc->cursorid();
+ }
+
+ handleCursorCommand(id, cmdObj, result);
+ }
+ else {
+ pPipeline->run(result);
+ }
+
+ return true;
}
private:
@@ -152,7 +289,8 @@ namespace mongo {
/* run the shard pipeline */
BSONObjBuilder shardResultBuilder;
string shardErrmsg;
- pShardPipeline->run(shardResultBuilder, shardErrmsg);
+ pShardPipeline->stitch();
+ pShardPipeline->run(shardResultBuilder);
BSONObj shardResult(shardResultBuilder.done());
/* pick out the shard result, and prepare to read it */
@@ -165,12 +303,14 @@ namespace mongo {
if ((strcmp(pFieldName, "result") == 0) ||
(strcmp(pFieldName, "serverPipeline") == 0)) {
pPipeline->addInitialSource(DocumentSourceBsonArray::create(&shardElement, pCtx));
+ pPipeline->stitch();
/*
Connect the output of the shard pipeline with the mongos
pipeline that will merge the results.
*/
- return pPipeline->run(result, errmsg);
+ pPipeline->run(result);
+ return true;
}
}
diff --git a/src/mongo/db/cursor.h b/src/mongo/db/cursor.h
index 9b1a312d1d6..5e71d3cd203 100644
--- a/src/mongo/db/cursor.h
+++ b/src/mongo/db/cursor.h
@@ -209,6 +209,12 @@ namespace mongo {
}
virtual void explainDetails( BSONObjBuilder& b ) { return; }
+
+ /// Should getMore handle locking for you?
+ virtual bool requiresLock() { return true; }
+
+ /// Should this cursor be destroyed when its namespace is deleted?
+ virtual bool shouldDestroyOnNSDeletion() { return true; }
};
// strategy object implementing direction of traversal.
diff --git a/src/mongo/db/ops/query.cpp b/src/mongo/db/ops/query.cpp
index 0f23359983e..38381efcb98 100644
--- a/src/mongo/db/ops/query.cpp
+++ b/src/mongo/db/ops/query.cpp
@@ -107,7 +107,7 @@ namespace mongo {
int start = 0;
int n = 0;
- Client::ReadContext ctx(ns);
+ scoped_ptr<Client::ReadContext> ctx(new Client::ReadContext(ns));
// call this readlocked so state can't change
replVerifyReadsOk();
@@ -140,6 +140,14 @@ namespace mongo {
start = cc->pos();
Cursor *c = cc->c();
+
+ if (!c->requiresLock()) {
+ // make sure it won't be destroyed under us
+ fassert(16952, !c->shouldDestroyOnNSDeletion());
+ fassert(16953, !c->supportYields());
+ ctx.reset(); // unlocks
+ }
+
c->recoverFromYield();
DiskLoc last;
diff --git a/src/mongo/db/pipeline/document_source_match.cpp b/src/mongo/db/pipeline/document_source_match.cpp
index e9944613b3f..920bb8ff369 100644
--- a/src/mongo/db/pipeline/document_source_match.cpp
+++ b/src/mongo/db/pipeline/document_source_match.cpp
@@ -98,6 +98,6 @@ namespace mongo {
const BSONObj &query,
const intrusive_ptr<ExpressionContext> &pExpCtx):
DocumentSourceFilterBase(pExpCtx),
- matcher(query) {
+ matcher(query.getOwned()) {
}
}
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index cadf730238e..2a5a9313f79 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -104,6 +104,11 @@ namespace mongo {
continue;
}
+ // ignore cursor options since they are handled externally.
+ if (str::equals(pFieldName, "cursor")) {
+ continue;
+ }
+
/* look for the aggregation command */
if (!strcmp(pFieldName, commandName)) {
pPipeline->collectionName = cmdElement.String();
@@ -392,7 +397,7 @@ namespace mongo {
}
}
- bool Pipeline::run(BSONObjBuilder &result, string &errmsg) {
+ void Pipeline::stitch() {
massert(16600, "should not have an empty pipeline",
!sources.empty());
@@ -406,7 +411,9 @@ namespace mongo {
pTemp->setSource(prevSource);
prevSource = pTemp.get();
}
+ }
+ void Pipeline::run(BSONObjBuilder& result) {
/*
Iterate through the resulting documents, and add them to the result.
We do this even if we're doing an explain, in order to capture
@@ -425,7 +432,7 @@ namespace mongo {
// can't use subArrayStart() due to error handling
BSONArrayBuilder resultArray;
DocumentSource* finalSource = sources.back().get();
- for(bool hasDoc = !finalSource->eof(); hasDoc; hasDoc = finalSource->advance()) {
+ for (bool hasDoc = !finalSource->eof(); hasDoc; hasDoc = finalSource->advance()) {
Document pDocument(finalSource->getCurrent());
/* add the document to the result set */
@@ -442,8 +449,6 @@ namespace mongo {
resultArray.done();
result.appendArray("result", resultArray.arr());
}
-
- return true;
}
void Pipeline::writeExplainOps(BSONArrayBuilder *pArrayBuilder) const {
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 58f2aeb8f04..74aad4f4980 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -94,14 +94,17 @@ namespace mongo {
*/
void toBson(BSONObjBuilder *pBuilder) const;
+ /** Stitch together the source pointers (by calling setSource) for each source in sources.
+ * Must be called after optimize and addInitialSource but before trying to get results.
+ */
+ void stitch();
+
/**
Run the Pipeline on the given source.
@param result builder to write the result to
- @param errmsg place to put error messages, if any
- @returns true on success, false if an error occurs
*/
- bool run(BSONObjBuilder &result, string &errmsg);
+ void run(BSONObjBuilder& result);
/**
Debugging: should the processing pipeline be split within
@@ -126,6 +129,9 @@ namespace mongo {
/// The initial source is special since it varies between mongos and mongod.
void addInitialSource(intrusive_ptr<DocumentSource> source);
+ /// The source that represents the output. Returns a non-owning pointer.
+ DocumentSource* output() { return sources.back().get(); }
+
/**
The aggregation command name.
*/
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 6693ca222d0..8be28311e8e 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -197,6 +197,10 @@ namespace mongo {
ClientCursor::Holder cursor(
new ClientCursor(QueryOption_NoCursorTimeout, pCursor, fullName));
CursorId cursorId = cursor->cursorid();
+ massert(16917, str::stream()
+ << "cursor " << cursor->c()->toString()
+ << " does its own locking so it can't be used with aggregation",
+ cursor->c()->requiresLock());
// Prepare the cursor for data to change under it when we unlock
if (cursor->c()->supportYields()) {
diff --git a/src/mongo/s/commands_public.cpp b/src/mongo/s/commands_public.cpp
index fff3f7fdeea..964a9c59bb1 100644
--- a/src/mongo/s/commands_public.cpp
+++ b/src/mongo/s/commands_public.cpp
@@ -1800,6 +1800,9 @@ namespace mongo {
BSONObjBuilder &result, bool fromRepl) {
//const string shardedOutputCollection = getTmpName( collection );
+ uassert(16961, "Aggregation in a sharded system doesn't yet support cursors",
+ !cmdObj.hasField("cursor"));
+
intrusive_ptr<ExpressionContext> pExpCtx(
ExpressionContext::create(&InterruptStatusMongos::status));
pExpCtx->setInRouter(true);
@@ -1855,7 +1858,8 @@ namespace mongo {
pPipeline->addInitialSource(DocumentSourceCommandShards::create(shardResults, pExpCtx));
// Combine the shards' output and finish the pipeline
- pPipeline->run(result, errmsg);
+ pPipeline->stitch();
+ pPipeline->run(result);
if (errmsg.length() > 0)
return false;
diff --git a/src/mongo/scripting/v8_db.cpp b/src/mongo/scripting/v8_db.cpp
index e18e9f7d807..c2063ce3d05 100644
--- a/src/mongo/scripting/v8_db.cpp
+++ b/src/mongo/scripting/v8_db.cpp
@@ -100,6 +100,7 @@ namespace mongo {
scope->injectV8Method("update", mongoUpdate, proto);
scope->injectV8Method("auth", mongoAuth, proto);
scope->injectV8Method("logout", mongoLogout, proto);
+ scope->injectV8Method("cursorFromId", mongoCursorFromId, proto);
fassert(16468, _mongoPrototypeManipulatorsFrozen);
for (size_t i = 0; i < _mongoPrototypeManipulators.size(); ++i)
@@ -209,6 +210,27 @@ namespace mongo {
return c;
}
+ v8::Handle<v8::Value> mongoCursorFromId(V8Scope* scope, const v8::Arguments& args) {
+ argumentCheck(args.Length() == 2 || args.Length() == 3, "cursorFromId needs 2 or 3 args")
+ argumentCheck(scope->NumberLongFT()->HasInstance(args[1]), "2nd arg must be a NumberLong")
+ argumentCheck(args[2]->IsUndefined() || args[2]->IsNumber(), "3rd arg must be a js Number")
+
+ DBClientBase* conn = getConnection(scope, args);
+ const string ns = toSTLString(args[0]);
+ long long cursorId = numberLongVal(scope, args[1]->ToObject());
+
+ auto_ptr<mongo::DBClientCursor> cursor(new DBClientCursor(conn, ns, cursorId, 0, 0));
+
+ if (!args[2]->IsUndefined())
+ cursor->setBatchSize(args[2]->Int32Value());
+
+ v8::Handle<v8::Function> cons = scope->InternalCursorFT()->GetFunction();
+ v8::Persistent<v8::Object> c = v8::Persistent<v8::Object>::New(cons->NewInstance());
+ c->SetInternalField(0, v8::External::New(cursor.get()));
+ scope->dbClientCursorTracker.track(c, cursor.release());
+ return c;
+ }
+
v8::Handle<v8::Value> mongoInsert(V8Scope* scope, const v8::Arguments& args) {
argumentCheck(args.Length() == 3 ,"insert needs 3 args")
argumentCheck(args[1]->IsObject() ,"attempted to insert a non-object")
diff --git a/src/mongo/scripting/v8_db.h b/src/mongo/scripting/v8_db.h
index 445ab783681..7bb0f046c0b 100644
--- a/src/mongo/scripting/v8_db.h
+++ b/src/mongo/scripting/v8_db.h
@@ -46,6 +46,7 @@ namespace mongo {
v8::Handle<v8::Value> mongoUpdate(V8Scope* scope, const v8::Arguments& args);
v8::Handle<v8::Value> mongoAuth(V8Scope* scope, const v8::Arguments& args);
v8::Handle<v8::Value> mongoLogout(V8Scope* scope, const v8::Arguments& args);
+ v8::Handle<v8::Value> mongoCursorFromId(V8Scope* scope, const v8::Arguments& args);
// Cursor object
v8::Handle<v8::Value> internalCursorCons(V8Scope* scope, const v8::Arguments& args);
diff --git a/src/mongo/shell/collection.js b/src/mongo/shell/collection.js
index 927b9dc7df6..64b41fa6d0a 100644
--- a/src/mongo/shell/collection.js
+++ b/src/mongo/shell/collection.js
@@ -886,6 +886,31 @@ DBCollection.prototype.distinct = function( keyString , query ){
}
+DBCollection.prototype.aggregateCursor = function(pipeline, extraOpts) {
+ // This function should replace aggregate() in SERVER-10165.
+
+ var cmd = {pipeline: pipeline};
+
+ if (!(pipeline instanceof Array)) {
+ // support varargs form
+ cmd.pipeline = [];
+ for (var i=0; i<arguments.length; i++) {
+ cmd.pipeline.push(arguments[i]);
+ }
+ }
+ else {
+ Object.extend(cmd, extraOpts);
+ }
+
+ if (cmd.cursor === undefined) {
+ cmd.cursor = {};
+ }
+
+ var cursorRes = this.runCommand("aggregate", cmd);
+ assert.commandWorked(cursorRes, "aggregate with cursor failed");
+ return new DBCommandCursor(this._mongo, cursorRes);
+}
+
DBCollection.prototype.aggregate = function( ops ) {
var arr = ops;
@@ -902,6 +927,19 @@ DBCollection.prototype.aggregate = function( ops ) {
printStackTrace();
throw "aggregate failed: " + tojson(res);
}
+
+ if (TestData) {
+ // If we are running in a test, make sure cursor output is the same.
+ // This block should go away with work on SERVER-10165.
+
+ if (this._db.isMaster().msg !== "isdbgrid") {
+ // agg cursors aren't supported on sharded clusters yet
+
+ var cursor = this.aggregateCursor(arr, {cursor: {batchSize: 0}});
+ assert.eq(cursor.toArray(), res.result);
+ }
+ }
+
return res;
}
diff --git a/src/mongo/shell/query.js b/src/mongo/shell/query.js
index b9acfd98549..0c0cbd27464 100644
--- a/src/mongo/shell/query.js
+++ b/src/mongo/shell/query.js
@@ -376,3 +376,42 @@ DBQuery.Option = {
partial: 0x80
};
+function DBCommandCursor(mongo, cmdResult, batchSize) {
+ assert.commandWorked(cmdResult)
+ this._firstBatch = cmdResult.cursor.firstBatch.reverse(); // modifies input to allow popping
+ this._cursor = mongo.cursorFromId(cmdResult.cursor.ns, cmdResult.cursor.id, batchSize);
+}
+
+DBCommandCursor.prototype = {};
+DBCommandCursor.prototype.hasNext = function() {
+ return this._firstBatch.length || this._cursor.hasNext();
+}
+DBCommandCursor.prototype.next = function() {
+ if (this._firstBatch.length) {
+ // $err wouldn't be in _firstBatch since ok was true.
+ return this._firstBatch.pop();
+ }
+ else {
+ var ret = this._cursor.next();
+ if ( ret.$err )
+ throw "error: " + tojson( ret );
+ return ret;
+ }
+}
+DBCommandCursor.prototype.objsLeftInBatch = function() {
+ if (this._firstBatch.length) {
+ return this._firstBatch.length;
+ }
+ else {
+ return this._cursor.objsLeftInBatch();
+ }
+}
+
+// Copy these methods from DBQuery
+DBCommandCursor.prototype.toArray = DBQuery.prototype.toArray
+DBCommandCursor.prototype.forEach = DBQuery.prototype.forEach
+DBCommandCursor.prototype.map = DBQuery.prototype.map
+DBCommandCursor.prototype.itcount = DBQuery.prototype.itcount
+DBCommandCursor.prototype.shellPrint = DBQuery.prototype.shellPrint
+DBCommandCursor.prototype.pretty = DBQuery.prototype.pretty
+
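For completeness, a usage sketch of the shell pieces added above (assumes a
populated collection named "coll"; this example is not part of the patch):

    // low-level: run the command yourself, then wrap the reply
    var res = db.coll.runCommand({aggregate: "coll", pipeline: [{$match: {}}], cursor: {}});
    var cursor = new DBCommandCursor(db.getMongo(), res, 5); // 3rd arg: getMore batch size
    while (cursor.hasNext())
        printjson(cursor.next());

    // high-level: the new helper wraps the command and reply for you
    print(db.coll.aggregateCursor([{$match: {}}], {cursor: {batchSize: 5}}).itcount());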