summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/pipeline_command.cpp
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2014-01-24 15:47:07 -0500
committerEliot Horowitz <eliot@10gen.com>2014-01-24 15:47:07 -0500
commit7349ba70a0e68627dc322113c561afe3a9ed37a1 (patch)
treeafe597cf004f191288999d8efad785b42833809d /src/mongo/db/commands/pipeline_command.cpp
parented58b0dfe564253067b4cab11ab75477b7e48388 (diff)
downloadmongo-7349ba70a0e68627dc322113c561afe3a9ed37a1.tar.gz
SERVER-12392: Move cursor/runner cache into Collection lifecycle via CollectionCursorCache
Diffstat (limited to 'src/mongo/db/commands/pipeline_command.cpp')
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp114
1 files changed, 75 insertions, 39 deletions
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index f249e2f8174..fa3a083a53d 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/auth/action_set.h"
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/privilege.h"
+#include "mongo/db/catalog/database.h"
#include "mongo/db/client.h"
#include "mongo/db/curop.h"
#include "mongo/db/commands.h"
@@ -102,9 +103,12 @@ namespace {
// These are all no-ops for PipelineRunners
virtual void setYieldPolicy(YieldPolicy policy) {}
virtual void invalidate(const DiskLoc& dl, InvalidationType type) {}
- virtual void kill() {}
+ virtual void kill() {
+ _pipeline->output()->kill();
+ }
virtual void saveState() {}
virtual bool restoreState() { return true; }
+ virtual const Collection* collection() { return NULL; }
/**
* Make obj the next object returned by getNext().
@@ -160,45 +164,80 @@ namespace {
return true;
}
- static void handleCursorCommand(CursorId id, BSONObj& cmdObj, BSONObjBuilder& result) {
+ static void handleCursorCommand(const string& ns,
+ intrusive_ptr<Pipeline>& pPipeline,
+ BSONObj& cmdObj,
+ BSONObjBuilder& result) {
+
+ scoped_ptr<ClientCursorPin> pin;
+ string cursorNs = ns;
+
+ {
+ // Set up cursor
+ Client::ReadContext ctx(ns);
+ Collection* collection = ctx.ctx().db()->getCollection( ns );
+ if ( collection ) {
+ ClientCursor* cc = new ClientCursor(collection,
+ new PipelineRunner(pPipeline));
+ // enable special locking and ns deletion behavior
+ cc->isAggCursor = true;
+
+ pin.reset( new ClientCursorPin( collection, cc->cursorid() ) );
+
+ // we need this after cursor may have been deleted
+ cursorNs = cc->ns();
+ }
+ }
+
BSONElement batchSizeElem = cmdObj.getFieldDotted("cursor.batchSize");
const long long batchSize = batchSizeElem.isNumber()
? batchSizeElem.numberLong()
: 101; // same as query
- ClientCursorPin pin(id);
- ClientCursor* cursor = pin.c();
-
- massert(16958, "Cursor shouldn't have been deleted",
- cursor);
+ ClientCursor* cursor = NULL;
+ PipelineRunner* runner = NULL;
+ if ( pin ) {
+ cursor = pin->c();
+ massert(16958,
+ "Cursor shouldn't have been deleted",
+ cursor);
+ verify(cursor->isAggCursor);
+
+ runner = dynamic_cast<PipelineRunner*>(cursor->getRunner());
+ verify(runner);
+ }
- verify(cursor->isAggCursor);
- PipelineRunner* runner = dynamic_cast<PipelineRunner*>(cursor->getRunner());
- verify(runner);
try {
- const string cursorNs = cursor->ns(); // we need this after cursor may have been deleted
// can't use result BSONObjBuilder directly since it won't handle exceptions correctly.
BSONArrayBuilder resultsArray;
const int byteLimit = MaxBytesToReturnToClientAtOnce;
BSONObj next;
- for (int objCount = 0; objCount < batchSize; objCount++) {
- // The initial getNext() on a PipelineRunner may be very expensive so we don't do it
- // when batchSize is 0 since that indicates a desire for a fast return.
- if (runner->getNext(&next, NULL) != Runner::RUNNER_ADVANCED) {
- pin.deleteUnderlying();
- id = 0;
- cursor = NULL; // make it an obvious error to use cursor after this point
- break;
+ if ( runner ) {
+ for (int objCount = 0; objCount < batchSize; objCount++) {
+ // The initial getNext() on a PipelineRunner may be very expensive so we don't
+ // do it when batchSize is 0 since that indicates a desire for a fast return.
+ if (runner->getNext(&next, NULL) != Runner::RUNNER_ADVANCED) {
+ pin->deleteUnderlying();
+ cursor = NULL; // make it an obvious error to use cursor after this point
+ break;
+ }
+
+ if (resultsArray.len() + next.objsize() > byteLimit) {
+ // too big. next will be the first doc in the second batch
+ runner->pushBack(next);
+ break;
+ }
+
+ resultsArray.append(next);
}
-
- if (resultsArray.len() + next.objsize() > byteLimit) {
- // too big. next will be the first doc in the second batch
- runner->pushBack(next);
- break;
- }
-
- resultsArray.append(next);
+ }
+ else {
+ // this is to ensure that side-effects such as $out occur,
+ // and that an empty output set is the correct result of this pipeline
+ invariant( pPipeline.get() );
+ invariant( pPipeline->output() );
+ invariant( !pPipeline->output()->getNext() );
}
if (cursor) {
@@ -208,14 +247,19 @@ namespace {
}
BSONObjBuilder cursorObj(result.subobjStart("cursor"));
- cursorObj.append("id", id);
+ if ( cursor )
+ cursorObj.append("id", cursor->cursorid() );
+ else
+ cursorObj.append("id", 0LL );
cursorObj.append("ns", cursorNs);
cursorObj.append("firstBatch", resultsArray.arr());
cursorObj.done();
}
catch (...) {
// Clean up cursor on way out of scope.
- pin.deleteUnderlying();
+ if ( pin ) {
+ pin->deleteUnderlying();
+ }
throw;
}
}
@@ -278,6 +322,7 @@ namespace {
// This does the mongod-specific stuff like creating a cursor
PipelineD::prepareCursorSource(pPipeline, pCtx);
+
pPipeline->stitch();
if (pPipeline->isExplain()) {
@@ -286,16 +331,7 @@ namespace {
}
if (isCursorCommand(cmdObj)) {
- CursorId id;
- {
- // Set up cursor
- Client::ReadContext ctx(ns);
- ClientCursor* cc = new ClientCursor(new PipelineRunner(pPipeline));
- cc->isAggCursor = true; // enable special locking and ns deletion behavior
- id = cc->cursorid();
- }
-
- handleCursorCommand(id, cmdObj, result);
+ handleCursorCommand(ns, pPipeline, cmdObj, result);
}
else {
pPipeline->run(result);