diff options
author | Mathias Stearn <mathias@10gen.com> | 2013-07-08 23:19:53 -0400 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2013-07-10 17:43:58 -0400 |
commit | a51f2688fa05672d999c997170847a3ee29a223b (patch) | |
tree | 87ba82c0a3a0c0ef1ef8c8853b3e4aae775aacf0 | |
parent | 0f43cfcbe063c1f4615ce362e751155218224bec (diff) | |
download | mongo-a51f2688fa05672d999c997170847a3ee29a223b.tar.gz |
SERVER-5932 Aggregation returning a cursor
related tickets:
SERVER-8261 System for commands to return cursors
SERVER-10165 aggregate() helper should return cursor
-rw-r--r-- | jstests/aggregation/bugs/server5932.js | 88 | ||||
-rw-r--r-- | jstests/aggregation/extras/utils.js | 25 | ||||
-rw-r--r-- | src/mongo/client/dbclientcursor.h | 3 | ||||
-rw-r--r-- | src/mongo/db/clientcursor.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/commands/pipeline_command.cpp | 146 | ||||
-rw-r--r-- | src/mongo/db/cursor.h | 6 | ||||
-rw-r--r-- | src/mongo/db/ops/query.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_match.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.h | 12 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/commands_public.cpp | 6 | ||||
-rw-r--r-- | src/mongo/scripting/v8_db.cpp | 22 | ||||
-rw-r--r-- | src/mongo/scripting/v8_db.h | 1 | ||||
-rw-r--r-- | src/mongo/shell/collection.js | 38 | ||||
-rw-r--r-- | src/mongo/shell/query.js | 39 |
16 files changed, 403 insertions, 16 deletions
diff --git a/jstests/aggregation/bugs/server5932.js b/jstests/aggregation/bugs/server5932.js new file mode 100644 index 00000000000..4325197c111 --- /dev/null +++ b/jstests/aggregation/bugs/server5932.js @@ -0,0 +1,88 @@ +// server-5932 Cursor-based aggregation + +var t = db.server5932; +t.drop(); + +// +// define helpers +// + +// batch size is optional +function buildAggCmd(pipeline, batchSize) { + return { + aggregate: t.getName(), + pipeline: pipeline, + cursor: (batchSize === undefined ? {} : {batchSize: batchSize}), + }; +} + +// batch size is optional +function makeCursor(cmdOut, followupBatchSize) { + assert.commandWorked(cmdOut); + assert.neq(cmdOut.cursor.id, undefined); + assert(cmdOut.cursor.id instanceof NumberLong); + assert(cmdOut.cursor.firstBatch instanceof Array); + return new DBCommandCursor(db.getMongo(), cmdOut, followupBatchSize); +} + +// both batch sizes are optional +function aggCursor(pipeline, firstBatchSize, followupBatchSize) { + var cmdOut = db.runCommand(buildAggCmd(pipeline, firstBatchSize)); + assert.commandWorked(cmdOut); + + if (firstBatchSize !== undefined) + assert.lte(cmdOut.cursor.firstBatch.length, firstBatchSize); + + return makeCursor(cmdOut, followupBatchSize); +} + +// +// insert data +// + +var bigArray = []; +for (var i = 0; i < 1000; i++) + bigArray.push(i); + +var bigStr = Array(1001).toString(); // 1000 bytes of ',' + +for (var i = 0; i < 100; i++) + t.insert({_id: i, bigArray: bigArray, bigStr: bigStr}); + +// +// do testing +// + +// successfully handles results > 16MB (bigArray.length * bytes in bigStr * t.count() == 100MB) +var cursor = aggCursor([{$unwind:'$bigArray'}]); // default settings +assert.eq(cursor.itcount(), bigArray.length * t.count()); +var cursor = aggCursor([{$unwind:'$bigArray'}], 0); // empty first batch +assert.eq(cursor.itcount(), bigArray.length * t.count()); +var cursor = aggCursor([{$unwind:'$bigArray'}], 5, 5); // many small batches +assert.eq(cursor.itcount(), bigArray.length * t.count()); + +// empty result set results in cursor.id == 0 unless batchSize is 0; +var res = t.runCommand(buildAggCmd([{$match: {noSuchField: {$exists:true}}}])); +assert.eq(res.cursor.firstBatch, []); +assert.eq(res.cursor.id, 0); +var res = t.runCommand(buildAggCmd([{$match: {noSuchField: {$exists:true}}}], 0)); +assert.eq(res.cursor.firstBatch, []); +assert.neq(res.cursor.id, 0); +assert.eq(makeCursor(res).itcount(), 0); + +// parse errors are caught before first batch, regardless of size +var res = t.runCommand(buildAggCmd([{$noSuchStage:1}], 0)); +assert.commandFailed(res); + +// data dependent errors can get ok:1 but fail in getMore if they don't fail in first batch +var res = t.runCommand(buildAggCmd([{$project:{cantAddString: {$add:[1, '$bigStr']}}}], 1)); +assert.commandFailed(res); +var res = t.runCommand(buildAggCmd([{$project:{cantAddString: {$add:[1, '$bigStr']}}}], 0)); +assert.commandWorked(res); +assert.throws(function() { makeCursor(res).itcount(); }); + +// error if collection dropped after first batch +var cursor = aggCursor([{$unwind:'$bigArray'}]); +t.drop(); +assert.throws(function() { cursor.itcount(); }); +// DON'T ADD NEW TEST TO THIS FILE AFTER THIS ONE (unless you reseed the data) diff --git a/jstests/aggregation/extras/utils.js b/jstests/aggregation/extras/utils.js index bea13b293dc..a925f47d96a 100644 --- a/jstests/aggregation/extras/utils.js +++ b/jstests/aggregation/extras/utils.js @@ -241,12 +241,35 @@ function assertErrorCode(coll, pipe, code, errmsg) { pipe = [pipe]; } + // Test non-cursor var res = coll.runCommand("aggregate", {pipeline: pipe}); - if (res.ok || res.code != code) printjson({pipeline: pipe, result: res}); /* assert failure with proper error code */ assert(!res.ok, errmsg || "failed in assertErrorCode"); assert.eq(res.code, code); + + // Test with cursors + if (coll.getDB().isMaster().msg !== "isdbgrid") { + // agg cursors not supported sharded yet + + var cmd = {pipeline: pipe}; + // cmd.cursor = {}; + cmd.cursor = {batchSize: 0}; + + var cursorRes = coll.runCommand("aggregate", cmd); + if (cursorRes.ok) { + var followupBatchSize = 0; // default + var cursor = new DBCommandCursor(coll.getMongo(), cursorRes, followupBatchSize); + + var error = assert.throws(function(){cursor.itcount()}, [], "expected error: " + code); + if (!error.search(code)) { + assert(false, "expected error: " + code + " got: " + error); + } + } + else { + assert.eq(cursorRes.code, code); + } + } } diff --git a/src/mongo/client/dbclientcursor.h b/src/mongo/client/dbclientcursor.h index 46ae49c46b1..785734dac63 100644 --- a/src/mongo/client/dbclientcursor.h +++ b/src/mongo/client/dbclientcursor.h @@ -129,6 +129,9 @@ namespace mongo { return (resultFlags & flag) != 0; } + /// Change batchSize after construction. Can change after requesting first batch. + void setBatchSize(int newBatchSize) { batchSize = newBatchSize; } + DBClientCursor( DBClientBase* client, const string &_ns, BSONObj _query, int _nToReturn, int _nToSkip, const BSONObj *_fieldsToReturn, int queryOptions , int bs ) : _client(client), diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp index 3d375b6984b..22afbd4fcea 100644 --- a/src/mongo/db/clientcursor.cpp +++ b/src/mongo/db/clientcursor.cpp @@ -115,7 +115,7 @@ namespace mongo { ClientCursor *cc = i.current(); bool shouldDelete = false; - if ( cc->_db == db ) { + if (cc->c()->shouldDestroyOnNSDeletion() && cc->_db == db) { if (isDB) { // already checked that db matched above dassert( str::startsWith(cc->_ns.c_str(), ns) ); @@ -126,7 +126,7 @@ namespace mongo { shouldDelete = true; } } - + if ( shouldDelete ) { i.deleteAndAdvance(); } diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index 56df3369d24..f3dff041ac1 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -30,9 +30,127 @@ #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/pipeline_d.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/ops/query.h" namespace mongo { + static bool isCursorCommand(BSONObj cmdObj) { + BSONElement cursorElem = cmdObj["cursor"]; + if (cursorElem.eoo()) + return false; + + uassert(16954, "cursor field must be missing or an object", + cursorElem.type() == Object); + + BSONObj cursor = cursorElem.embeddedObject(); + BSONElement batchSizeElem = cursor["batchSize"]; + if (batchSizeElem.eoo()) { + uassert(16955, "cursor object can't contain fields other than batchSize", + cursor.isEmpty()); + } + else { + uassert(16956, "cursor.batchSize must be a number", + batchSizeElem.isNumber()); + + // This can change in the future, but for now all negatives are reserved. + uassert(16957, "Cursor batchSize must not be negative", + batchSizeElem.numberLong() >= 0); + } + + return true; + } + + static void handleCursorCommand(CursorId id, BSONObj& cmdObj, BSONObjBuilder& result) { + BSONElement batchSizeElem = cmdObj.getFieldDotted("cursor.batchSize"); + const long long batchSize = batchSizeElem.isNumber() + ? batchSizeElem.numberLong() + : 101; // same as query + + // Using limited cursor API that ignores many edge cases. Should be sufficient for commands. + ClientCursor::Pin pin(id); + ClientCursor* cursor = pin.c(); + + massert(16958, "Cursor shouldn't have been deleted", + cursor); + + // Make sure this cursor won't disappear on us + fassert(16959, !cursor->c()->shouldDestroyOnNSDeletion()); + fassert(16960, !cursor->c()->requiresLock()); + + try { + // can't use result BSONObjBuilder directly since it won't handle exceptions correctly. + BSONArrayBuilder resultsArray; + const int byteLimit = MaxBytesToReturnToClientAtOnce; + for (int objs = 0; + objs < batchSize && cursor->ok() && resultsArray.len() <= byteLimit; + objs++) { + // TODO may need special logic if cursor->current() would cause results to be > 16MB + resultsArray.append(cursor->current()); + cursor->advance(); + } + + // The initial ok() on a cursor may be very expensive so we don't do it when batchSize + // is 0 since that indicates a desire for a fast return. + if (batchSize != 0 && !cursor->ok()) { + // There is no more data. Kill the cursor. + pin.release(); + ClientCursor::erase(id); + id = 0; + } + + BSONObjBuilder cursorObj(result.subobjStart("cursor")); + cursorObj.append("id", id); + cursorObj.append("ns", cursor->ns()); + cursorObj.append("firstBatch", resultsArray.arr()); + cursorObj.done(); + } + catch (...) { + // Clean up cursor on way out of scope. + pin.release(); + ClientCursor::erase(id); + throw; + } + } + + + class PipelineCursor : public Cursor { + public: + PipelineCursor(intrusive_ptr<Pipeline> pipeline) + : _pipeline(pipeline) + {} + + // "core" cursor protocol + virtual bool ok() { return !iterator()->eof(); } + virtual bool advance() { return iterator()->advance(); } + virtual BSONObj current() { + BSONObjBuilder builder; + iterator()->getCurrent().toBson(&builder); + return builder.obj(); + } + + virtual bool requiresLock() { return false; } + virtual bool shouldDestroyOnNSDeletion() { return false; } + + virtual Record* _current() { return NULL; } + virtual DiskLoc currLoc() { return DiskLoc(); } + virtual DiskLoc refLoc() { return DiskLoc(); } + virtual bool supportGetMore() { return true; } + virtual bool supportYields() { return false; } // has wrong semantics + virtual bool getsetdup(DiskLoc loc) { return false; } // we don't generate dups + virtual bool isMultiKey() const { return false; } + virtual bool modifiedKeys() const { return false; } + virtual string toString() { return "Aggregate_Cursor"; } + + // These probably won't be needed once aggregation supports it's own explain. + virtual long long nscanned() { return 0; } + virtual void explainDetails( BSONObjBuilder& b ) { return; } + private: + const DocumentSource* iterator() const { return _pipeline->output(); } + DocumentSource* iterator() { return _pipeline->output(); } + + intrusive_ptr<Pipeline> _pipeline; + }; + class PipelineCommand : public Command { public: @@ -90,7 +208,26 @@ namespace mongo { // This does the mongod-specific stuff like creating a cursor PipelineD::prepareCursorSource(pPipeline, nsToDatabase(ns), pCtx); - return pPipeline->run(result, errmsg); + pPipeline->stitch(); + + if (isCursorCommand(cmdObj)) { + CursorId id; + { + // Set up cursor + Client::ReadContext ctx(ns); + shared_ptr<Cursor> cursor(new PipelineCursor(pPipeline)); + // cc will be owned by cursor manager + ClientCursor* cc = new ClientCursor(0, cursor, ns, cmdObj.getOwned()); + id = cc->cursorid(); + } + + handleCursorCommand(id, cmdObj, result); + } + else { + pPipeline->run(result); + } + + return true; } private: @@ -152,7 +289,8 @@ namespace mongo { /* run the shard pipeline */ BSONObjBuilder shardResultBuilder; string shardErrmsg; - pShardPipeline->run(shardResultBuilder, shardErrmsg); + pShardPipeline->stitch(); + pShardPipeline->run(shardResultBuilder); BSONObj shardResult(shardResultBuilder.done()); /* pick out the shard result, and prepare to read it */ @@ -165,12 +303,14 @@ namespace mongo { if ((strcmp(pFieldName, "result") == 0) || (strcmp(pFieldName, "serverPipeline") == 0)) { pPipeline->addInitialSource(DocumentSourceBsonArray::create(&shardElement, pCtx)); + pPipeline->stitch(); /* Connect the output of the shard pipeline with the mongos pipeline that will merge the results. */ - return pPipeline->run(result, errmsg); + pPipeline->run(result); + return true; } } diff --git a/src/mongo/db/cursor.h b/src/mongo/db/cursor.h index 9b1a312d1d6..5e71d3cd203 100644 --- a/src/mongo/db/cursor.h +++ b/src/mongo/db/cursor.h @@ -209,6 +209,12 @@ namespace mongo { } virtual void explainDetails( BSONObjBuilder& b ) { return; } + + /// Should getmore handle locking for you + virtual bool requiresLock() { return true; } + + /// Should this cursor be destroyed when it's namespace is deleted + virtual bool shouldDestroyOnNSDeletion() { return true; } }; // strategy object implementing direction of traversal. diff --git a/src/mongo/db/ops/query.cpp b/src/mongo/db/ops/query.cpp index 0f23359983e..38381efcb98 100644 --- a/src/mongo/db/ops/query.cpp +++ b/src/mongo/db/ops/query.cpp @@ -107,7 +107,7 @@ namespace mongo { int start = 0; int n = 0; - Client::ReadContext ctx(ns); + scoped_ptr<Client::ReadContext> ctx(new Client::ReadContext(ns)); // call this readlocked so state can't change replVerifyReadsOk(); @@ -140,6 +140,14 @@ namespace mongo { start = cc->pos(); Cursor *c = cc->c(); + + if (!c->requiresLock()) { + // make sure it won't be destroyed under us + fassert(16952, !c->shouldDestroyOnNSDeletion()); + fassert(16953, !c->supportYields()); + ctx.reset(); // unlocks + } + c->recoverFromYield(); DiskLoc last; diff --git a/src/mongo/db/pipeline/document_source_match.cpp b/src/mongo/db/pipeline/document_source_match.cpp index e9944613b3f..920bb8ff369 100644 --- a/src/mongo/db/pipeline/document_source_match.cpp +++ b/src/mongo/db/pipeline/document_source_match.cpp @@ -98,6 +98,6 @@ namespace mongo { const BSONObj &query, const intrusive_ptr<ExpressionContext> &pExpCtx): DocumentSourceFilterBase(pExpCtx), - matcher(query) { + matcher(query.getOwned()) { } } diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index cadf730238e..2a5a9313f79 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -104,6 +104,11 @@ namespace mongo { continue; } + // ignore cursor options since they are handled externally. + if (str::equals(pFieldName, "cursor")) { + continue; + } + /* look for the aggregation command */ if (!strcmp(pFieldName, commandName)) { pPipeline->collectionName = cmdElement.String(); @@ -392,7 +397,7 @@ namespace mongo { } } - bool Pipeline::run(BSONObjBuilder &result, string &errmsg) { + void Pipeline::stitch() { massert(16600, "should not have an empty pipeline", !sources.empty()); @@ -406,7 +411,9 @@ namespace mongo { pTemp->setSource(prevSource); prevSource = pTemp.get(); } + } + void Pipeline::run(BSONObjBuilder& result) { /* Iterate through the resulting documents, and add them to the result. We do this even if we're doing an explain, in order to capture @@ -425,7 +432,7 @@ namespace mongo { // cant use subArrayStart() due to error handling BSONArrayBuilder resultArray; DocumentSource* finalSource = sources.back().get(); - for(bool hasDoc = !finalSource->eof(); hasDoc; hasDoc = finalSource->advance()) { + for (bool hasDoc = !finalSource->eof(); hasDoc; hasDoc = finalSource->advance()) { Document pDocument(finalSource->getCurrent()); /* add the document to the result set */ @@ -442,8 +449,6 @@ namespace mongo { resultArray.done(); result.appendArray("result", resultArray.arr()); } - - return true; } void Pipeline::writeExplainOps(BSONArrayBuilder *pArrayBuilder) const { diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 58f2aeb8f04..74aad4f4980 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -94,14 +94,17 @@ namespace mongo { */ void toBson(BSONObjBuilder *pBuilder) const; + /** Stitch together the source pointers (by calling setSource) for each source in sources. + * Must be called after optimize and addInitialSource but before trying to get results. + */ + void stitch(); + /** Run the Pipeline on the given source. @param result builder to write the result to - @param errmsg place to put error messages, if any - @returns true on success, false if an error occurs */ - bool run(BSONObjBuilder &result, string &errmsg); + void run(BSONObjBuilder& result); /** Debugging: should the processing pipeline be split within @@ -126,6 +129,9 @@ namespace mongo { /// The initial source is special since it varies between mongos and mongod. void addInitialSource(intrusive_ptr<DocumentSource> source); + /// The source that represents the output. Returns a non-owning pointer. + DocumentSource* output() { return sources.back().get(); } + /** The aggregation command name. */ diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 6693ca222d0..8be28311e8e 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -197,6 +197,10 @@ namespace mongo { ClientCursor::Holder cursor( new ClientCursor(QueryOption_NoCursorTimeout, pCursor, fullName)); CursorId cursorId = cursor->cursorid(); + massert(16917, str::stream() + << "cursor " << cursor->c()->toString() + << "does its own locking so it can't be used with aggregation", + cursor->c()->requiresLock()); // Prepare the cursor for data to change under it when we unlock if (cursor->c()->supportYields()) { diff --git a/src/mongo/s/commands_public.cpp b/src/mongo/s/commands_public.cpp index fff3f7fdeea..964a9c59bb1 100644 --- a/src/mongo/s/commands_public.cpp +++ b/src/mongo/s/commands_public.cpp @@ -1800,6 +1800,9 @@ namespace mongo { BSONObjBuilder &result, bool fromRepl) { //const string shardedOutputCollection = getTmpName( collection ); + uassert(16961, "Aggregation in a sharded system doesn't yet support cursors", + !cmdObj.hasField("cursor")); + intrusive_ptr<ExpressionContext> pExpCtx( ExpressionContext::create(&InterruptStatusMongos::status)); pExpCtx->setInRouter(true); @@ -1855,7 +1858,8 @@ namespace mongo { pPipeline->addInitialSource(DocumentSourceCommandShards::create(shardResults, pExpCtx)); // Combine the shards' output and finish the pipeline - pPipeline->run(result, errmsg); + pPipeline->stitch(); + pPipeline->run(result); if (errmsg.length() > 0) return false; diff --git a/src/mongo/scripting/v8_db.cpp b/src/mongo/scripting/v8_db.cpp index e18e9f7d807..c2063ce3d05 100644 --- a/src/mongo/scripting/v8_db.cpp +++ b/src/mongo/scripting/v8_db.cpp @@ -100,6 +100,7 @@ namespace mongo { scope->injectV8Method("update", mongoUpdate, proto); scope->injectV8Method("auth", mongoAuth, proto); scope->injectV8Method("logout", mongoLogout, proto); + scope->injectV8Method("cursorFromId", mongoCursorFromId, proto); fassert(16468, _mongoPrototypeManipulatorsFrozen); for (size_t i = 0; i < _mongoPrototypeManipulators.size(); ++i) @@ -209,6 +210,27 @@ namespace mongo { return c; } + v8::Handle<v8::Value> mongoCursorFromId(V8Scope* scope, const v8::Arguments& args) { + argumentCheck(args.Length() == 2 || args.Length() == 3, "cursorFromId needs 2 or 3 args") + argumentCheck(scope->NumberLongFT()->HasInstance(args[1]), "2nd arg must be a NumberLong") + argumentCheck(args[2]->IsUndefined() || args[2]->IsNumber(), "3rd arg must be a js Number") + + DBClientBase* conn = getConnection(scope, args); + const string ns = toSTLString(args[0]); + long long cursorId = numberLongVal(scope, args[1]->ToObject()); + + auto_ptr<mongo::DBClientCursor> cursor(new DBClientCursor(conn, ns, cursorId, 0, 0)); + + if (!args[2]->IsUndefined()) + cursor->setBatchSize(args[2]->Int32Value()); + + v8::Handle<v8::Function> cons = scope->InternalCursorFT()->GetFunction(); + v8::Persistent<v8::Object> c = v8::Persistent<v8::Object>::New(cons->NewInstance()); + c->SetInternalField(0, v8::External::New(cursor.get())); + scope->dbClientCursorTracker.track(c, cursor.release()); + return c; + } + v8::Handle<v8::Value> mongoInsert(V8Scope* scope, const v8::Arguments& args) { argumentCheck(args.Length() == 3 ,"insert needs 3 args") argumentCheck(args[1]->IsObject() ,"attempted to insert a non-object") diff --git a/src/mongo/scripting/v8_db.h b/src/mongo/scripting/v8_db.h index 445ab783681..7bb0f046c0b 100644 --- a/src/mongo/scripting/v8_db.h +++ b/src/mongo/scripting/v8_db.h @@ -46,6 +46,7 @@ namespace mongo { v8::Handle<v8::Value> mongoUpdate(V8Scope* scope, const v8::Arguments& args); v8::Handle<v8::Value> mongoAuth(V8Scope* scope, const v8::Arguments& args); v8::Handle<v8::Value> mongoLogout(V8Scope* scope, const v8::Arguments& args); + v8::Handle<v8::Value> mongoCursorFromId(V8Scope* scope, const v8::Arguments& args); // Cursor object v8::Handle<v8::Value> internalCursorCons(V8Scope* scope, const v8::Arguments& args); diff --git a/src/mongo/shell/collection.js b/src/mongo/shell/collection.js index 927b9dc7df6..64b41fa6d0a 100644 --- a/src/mongo/shell/collection.js +++ b/src/mongo/shell/collection.js @@ -886,6 +886,31 @@ DBCollection.prototype.distinct = function( keyString , query ){ } +DBCollection.prototype.aggregateCursor = function(pipeline, extraOpts) { + // This function should replace aggregate() in SERVER-10165. + + var cmd = {pipeline: pipeline}; + + if (!(pipeline instanceof Array)) { + // support varargs form + cmd.pipeline = []; + for (var i=0; i<arguments.length; i++) { + cmd.pipeline.push(arguments[i]); + } + } + else { + Object.extend(cmd, extraOpts); + } + + if (cmd.cursor === undefined) { + cmd.cursor = {}; + } + + var cursorRes = this.runCommand("aggregate", cmd); + assert.commandWorked(cursorRes, "aggregate with cursor failed"); + return new DBCommandCursor(this._mongo, cursorRes); +} + DBCollection.prototype.aggregate = function( ops ) { var arr = ops; @@ -902,6 +927,19 @@ DBCollection.prototype.aggregate = function( ops ) { printStackTrace(); throw "aggregate failed: " + tojson(res); } + + if (TestData) { + // If we are running in a test, make sure cursor output is the same. + // This block should go away with work on SERVER-10165. + + if (this._db.isMaster().msg !== "isdbgrid") { + // agg cursors not supported sharded yet + + var cursor = this.aggregateCursor(arr, {cursor: {batchSize: 0}}); + assert.eq(cursor.toArray(), res.result); + } + } + return res; } diff --git a/src/mongo/shell/query.js b/src/mongo/shell/query.js index b9acfd98549..0c0cbd27464 100644 --- a/src/mongo/shell/query.js +++ b/src/mongo/shell/query.js @@ -376,3 +376,42 @@ DBQuery.Option = { partial: 0x80 }; +function DBCommandCursor(mongo, cmdResult, batchSize) { + assert.commandWorked(cmdResult) + this._firstBatch = cmdResult.cursor.firstBatch.reverse(); // modifies input to allow popping + this._cursor = mongo.cursorFromId(cmdResult.cursor.ns, cmdResult.cursor.id, batchSize); +} + +DBCommandCursor.prototype = {}; +DBCommandCursor.prototype.hasNext = function() { + return this._firstBatch.length || this._cursor.hasNext(); +} +DBCommandCursor.prototype.next = function() { + if (this._firstBatch.length) { + // $err wouldn't be in _firstBatch since ok was true. + return this._firstBatch.pop(); + } + else { + var ret = this._cursor.next(); + if ( ret.$err ) + throw "error: " + tojson( ret ); + return ret; + } +} +DBCommandCursor.prototype.objsLeftInBatch = function() { + if (this._firstBatch.length) { + return this._firstBatch.length; + } + else { + return this._cursor.objsLeftInBatch(); + } +} + +// Copy these methods from DBQuery +DBCommandCursor.prototype.toArray = DBQuery.prototype.toArray +DBCommandCursor.prototype.forEach = DBQuery.prototype.forEach +DBCommandCursor.prototype.map = DBQuery.prototype.map +DBCommandCursor.prototype.itcount = DBQuery.prototype.itcount +DBCommandCursor.prototype.shellPrint = DBQuery.prototype.shellPrint +DBCommandCursor.prototype.pretty = DBQuery.prototype.pretty + |