From 7349ba70a0e68627dc322113c561afe3a9ed37a1 Mon Sep 17 00:00:00 2001
From: Eliot Horowitz
Date: Fri, 24 Jan 2014 15:47:07 -0500
Subject: SERVER-12392: Move cursor/runner cache into Collection lifecycle via CollectionCursorCache

---
 src/mongo/db/commands/pipeline_command.cpp | 114 +++++++++++++++++++----------
 1 file changed, 75 insertions(+), 39 deletions(-)

(limited to 'src/mongo/db/commands/pipeline_command.cpp')

diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index f249e2f8174..fa3a083a53d 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -33,6 +33,7 @@
 #include "mongo/db/auth/action_set.h"
 #include "mongo/db/auth/action_type.h"
 #include "mongo/db/auth/privilege.h"
+#include "mongo/db/catalog/database.h"
 #include "mongo/db/client.h"
 #include "mongo/db/curop.h"
 #include "mongo/db/commands.h"
@@ -102,9 +103,12 @@ namespace {
         // These are all no-ops for PipelineRunners
         virtual void setYieldPolicy(YieldPolicy policy) {}
         virtual void invalidate(const DiskLoc& dl, InvalidationType type) {}
-        virtual void kill() {}
+        virtual void kill() {
+            _pipeline->output()->kill();
+        }
         virtual void saveState() {}
         virtual bool restoreState() { return true; }
+        virtual const Collection* collection() { return NULL; }
 
         /**
          * Make obj the next object returned by getNext().
@@ -160,45 +164,80 @@ namespace {
         return true;
     }
 
-    static void handleCursorCommand(CursorId id, BSONObj& cmdObj, BSONObjBuilder& result) {
+    static void handleCursorCommand(const string& ns,
+                                    intrusive_ptr<Pipeline>& pPipeline,
+                                    BSONObj& cmdObj,
+                                    BSONObjBuilder& result) {
+
+        scoped_ptr<ClientCursorPin> pin;
+        string cursorNs = ns;
+
+        {
+            // Set up cursor
+            Client::ReadContext ctx(ns);
+            Collection* collection = ctx.ctx().db()->getCollection( ns );
+            if ( collection ) {
+                ClientCursor* cc = new ClientCursor(collection,
+                                                    new PipelineRunner(pPipeline));
+                // enable special locking and ns deletion behavior
+                cc->isAggCursor = true;
+
+                pin.reset( new ClientCursorPin( collection, cc->cursorid() ) );
+
+                // we need this after cursor may have been deleted
+                cursorNs = cc->ns();
+            }
+        }
+
         BSONElement batchSizeElem = cmdObj.getFieldDotted("cursor.batchSize");
         const long long batchSize = batchSizeElem.isNumber()
                                     ? batchSizeElem.numberLong()
                                     : 101; // same as query
 
-        ClientCursorPin pin(id);
-        ClientCursor* cursor = pin.c();
-
-        massert(16958, "Cursor shouldn't have been deleted",
-                cursor);
+        ClientCursor* cursor = NULL;
+        PipelineRunner* runner = NULL;
+        if ( pin ) {
+            cursor = pin->c();
+            massert(16958,
                    "Cursor shouldn't have been deleted",
                    cursor);
+            verify(cursor->isAggCursor);
+
+            runner = dynamic_cast<PipelineRunner*>(cursor->getRunner());
+            verify(runner);
+        }
 
-        verify(cursor->isAggCursor);
-        PipelineRunner* runner = dynamic_cast<PipelineRunner*>(cursor->getRunner());
-        verify(runner);
         try {
-            const string cursorNs = cursor->ns(); // we need this after cursor may have been deleted
-
             // can't use result BSONObjBuilder directly since it won't handle exceptions correctly.
             BSONArrayBuilder resultsArray;
            const int byteLimit = MaxBytesToReturnToClientAtOnce;
            BSONObj next;
-            for (int objCount = 0; objCount < batchSize; objCount++) {
-                // The initial getNext() on a PipelineRunner may be very expensive so we don't do it
-                // when batchSize is 0 since that indicates a desire for a fast return.
-                if (runner->getNext(&next, NULL) != Runner::RUNNER_ADVANCED) {
-                    pin.deleteUnderlying();
-                    id = 0;
-                    cursor = NULL; // make it an obvious error to use cursor after this point
-                    break;
+            if ( runner ) {
+                for (int objCount = 0; objCount < batchSize; objCount++) {
+                    // The initial getNext() on a PipelineRunner may be very expensive so we don't
+                    // do it when batchSize is 0 since that indicates a desire for a fast return.
+                    if (runner->getNext(&next, NULL) != Runner::RUNNER_ADVANCED) {
+                        pin->deleteUnderlying();
+                        cursor = NULL; // make it an obvious error to use cursor after this point
+                        break;
+                    }
+
+                    if (resultsArray.len() + next.objsize() > byteLimit) {
+                        // too big. next will be the first doc in the second batch
+                        runner->pushBack(next);
+                        break;
+                    }
+
+                    resultsArray.append(next);
                 }
-
-                if (resultsArray.len() + next.objsize() > byteLimit) {
-                    // too big. next will be the first doc in the second batch
-                    runner->pushBack(next);
-                    break;
-                }
-
-                resultsArray.append(next);
+            }
+            else {
+                // this is to ensure that side-effects such as $out occur,
+                // and that an empty output set is the correct result of this pipeline
+                invariant( pPipeline.get() );
+                invariant( pPipeline->output() );
+                invariant( !pPipeline->output()->getNext() );
             }
 
             if (cursor) {
@@ -208,14 +247,19 @@ namespace {
             }
 
             BSONObjBuilder cursorObj(result.subobjStart("cursor"));
-            cursorObj.append("id", id);
+            if ( cursor )
+                cursorObj.append("id", cursor->cursorid() );
+            else
+                cursorObj.append("id", 0LL );
             cursorObj.append("ns", cursorNs);
             cursorObj.append("firstBatch", resultsArray.arr());
             cursorObj.done();
         }
         catch (...) {
             // Clean up cursor on way out of scope.
-            pin.deleteUnderlying();
+            if ( pin ) {
+                pin->deleteUnderlying();
+            }
             throw;
         }
     }
@@ -278,6 +322,7 @@ namespace {
 
             // This does the mongod-specific stuff like creating a cursor
             PipelineD::prepareCursorSource(pPipeline, pCtx);
+
             pPipeline->stitch();
 
             if (pPipeline->isExplain()) {
@@ -286,16 +331,7 @@ namespace {
             }
 
             if (isCursorCommand(cmdObj)) {
-                CursorId id;
-                {
-                    // Set up cursor
-                    Client::ReadContext ctx(ns);
-                    ClientCursor* cc = new ClientCursor(new PipelineRunner(pPipeline));
-                    cc->isAggCursor = true; // enable special locking and ns deletion behavior
-                    id = cc->cursorid();
-                }
-
-                handleCursorCommand(id, cmdObj, result);
+                handleCursorCommand(ns, pPipeline, cmdObj, result);
             }
             else {
                 pPipeline->run(result);
--
cgit v1.2.1
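
For reference, below is a standalone C++ sketch (not MongoDB source) of the control flow the patched handleCursorCommand() follows: the cursor pin is constructed only when the collection exists, and when it does not, the command still replies with cursor id 0, the requested namespace, and an empty firstBatch. The names used here (FakeCursor, FakeCursorPin, CursorReply, runCursorCommand) are hypothetical stand-ins; only the optional-pin branching mirrors the patch.

    // Hypothetical stand-ins; only the optional-pin control flow mirrors the patch.
    #include <iostream>
    #include <memory>
    #include <string>
    #include <vector>

    struct FakeCursor {                        // stands in for ClientCursor
        long long id;
        std::string ns;
    };

    struct FakeCursorPin {                     // stands in for ClientCursorPin
        explicit FakeCursorPin(const FakeCursor& c) : cursor(c) {}
        FakeCursor cursor;
    };

    struct CursorReply {                       // what the command reports back
        long long id;
        std::string ns;
        std::vector<std::string> firstBatch;
    };

    CursorReply runCursorCommand(const std::string& ns, bool collectionExists) {
        std::unique_ptr<FakeCursorPin> pin;    // analogous to scoped_ptr<ClientCursorPin>
        std::string cursorNs = ns;

        if (collectionExists) {
            // The pin is created only when there is something to cursor over.
            pin.reset(new FakeCursorPin(FakeCursor{12345 /* hypothetical id */, ns}));
            cursorNs = pin->cursor.ns;         // captured while the cursor is known to exist
        }

        CursorReply reply{0, cursorNs, {}};
        if (pin) {
            // With a live cursor the real code drains up to batchSize documents here.
            reply.firstBatch.push_back("{doc: 1}");
            reply.id = pin->cursor.id;
        }
        // With no pin the batch stays empty and the id stays 0, matching the
        // patch's "collection missing" branch.
        return reply;
    }

    int main() {
        CursorReply present = runCursorCommand("test.coll", true);
        CursorReply missing = runCursorCommand("test.none", false);
        std::cout << "present: id=" << present.id
                  << " docs=" << present.firstBatch.size() << "\n"
                  << "missing: id=" << missing.id
                  << " docs=" << missing.firstBatch.size() << "\n";
        return 0;
    }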