Diffstat (limited to 'src/mongo/db/commands/pipeline_command.cpp')
-rw-r--r-- src/mongo/db/commands/pipeline_command.cpp | 159
1 file changed, 94 insertions(+), 65 deletions(-)
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 5033dd05da7..de49bc61d1c 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/client.h"
#include "mongo/db/curop.h"
#include "mongo/db/commands.h"
+#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/pipeline/accumulator.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
@@ -46,7 +47,7 @@
#include "mongo/db/pipeline/pipeline_d.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/find_constants.h"
-#include "mongo/db/query/get_runner.h"
+#include "mongo/db/query/get_executor.h"
#include "mongo/db/storage_options.h"
namespace mongo {
@@ -54,33 +55,46 @@ namespace mongo {
namespace {
/**
- * This is a Runner implementation backed by an aggregation pipeline.
+ * Stage for pulling results out from an aggregation pipeline.
+ *
+ * XXX: move this stage to the exec/ directory.
*/
- class PipelineRunner : public Runner {
+ class PipelineProxyStage : public PlanStage {
public:
- PipelineRunner(intrusive_ptr<Pipeline> pipeline, const boost::shared_ptr<Runner>& child)
+ PipelineProxyStage(intrusive_ptr<Pipeline> pipeline,
+ const boost::shared_ptr<PlanExecutor>& child,
+ WorkingSet* ws)
: _pipeline(pipeline)
, _includeMetaData(_pipeline->getContext()->inShard) // send metadata to merger
- , _childRunner(child)
+ , _childExec(child)
+ , _ws(ws)
{}
- virtual RunnerState getNext(BSONObj* objOut, DiskLoc* dlOut) {
- if (!objOut || dlOut)
- return RUNNER_ERROR;
+ virtual StageState work(WorkingSetID* out) {
+ if (!out) {
+ return PlanStage::FAILURE;
+ }
if (!_stash.empty()) {
- *objOut = _stash.back();
+ *out = _ws->allocate();
+ WorkingSetMember* member = _ws->get(*out);
+ member->obj = _stash.back();
_stash.pop_back();
- return RUNNER_ADVANCED;
+ member->state = WorkingSetMember::OWNED_OBJ;
+ return PlanStage::ADVANCED;
}
if (boost::optional<BSONObj> next = getNextBson()) {
- *objOut = *next;
- return RUNNER_ADVANCED;
+ *out = _ws->allocate();
+ WorkingSetMember* member = _ws->get(*out);
+ member->obj = *next;
+ member->state = WorkingSetMember::OWNED_OBJ;
+ return PlanStage::ADVANCED;
}
- return RUNNER_EOF;
+ return PlanStage::IS_EOF;
}
+
virtual bool isEOF() {
if (!_stash.empty())
return false;
@@ -92,41 +106,23 @@ namespace {
return true;
}
- virtual const string& ns() {
- return _pipeline->getContext()->ns.ns();
- }
-
- virtual Status getInfo(TypeExplain** explain,
- PlanInfo** planInfo) const {
- // This should never get called in practice anyway.
- return Status(ErrorCodes::InternalError,
- "PipelineCursor doesn't implement getExplainPlan");
- }
- // propagate to child runner if still in use
+ // propagate to child executor if still in use
virtual void invalidate(const DiskLoc& dl, InvalidationType type) {
- if (boost::shared_ptr<Runner> runner = _childRunner.lock()) {
- runner->invalidate(dl, type);
- }
- }
- virtual void kill() {
- if (boost::shared_ptr<Runner> runner = _childRunner.lock()) {
- runner->kill();
+ if (boost::shared_ptr<PlanExecutor> exec = _childExec.lock()) {
+ exec->invalidate(dl, type);
}
}
- // Manage our OperationContext. We intentionally don't propagate to child Runner as that is
- // handled by DocumentSourceCursor as it needs to.
- virtual void saveState() {
+ // Manage our OperationContext. We intentionally don't propagate to the child
+ // Runner as that is handled by DocumentSourceCursor as it needs to.
+ virtual void prepareToYield() {
_pipeline->getContext()->opCtx = NULL;
}
- virtual bool restoreState(OperationContext* opCtx) {
+ virtual void recoverFromYield(OperationContext* opCtx) {
_pipeline->getContext()->opCtx = opCtx;
- return true;
}
- virtual const Collection* collection() { return NULL; }
-
/**
* Make obj the next object returned by getNext().
*/
@@ -134,6 +130,23 @@ namespace {
_stash.push_back(obj);
}
+ //
+ // These should not be used.
+ //
+
+ virtual PlanStageStats* getStats() { return NULL; }
+ virtual CommonStats* getCommonStats() { return NULL; }
+ virtual SpecificStats* getSpecificStats() { return NULL; }
+
+ // Not used.
+ virtual std::vector<PlanStage*> getChildren() const {
+ vector<PlanStage*> empty;
+ return empty;
+ }
+
+ // Not used.
+ virtual StageType stageType() const { return STAGE_PIPELINE_PROXY; }
+
private:
boost::optional<BSONObj> getNextBson() {
if (boost::optional<Document> next = _pipeline->output()->getNext()) {
@@ -152,7 +165,10 @@ namespace {
const intrusive_ptr<Pipeline> _pipeline;
vector<BSONObj> _stash;
const bool _includeMetaData;
- boost::weak_ptr<Runner> _childRunner;
+ boost::weak_ptr<PlanExecutor> _childExec;
+
+ // Not owned by us.
+ WorkingSet* _ws;
};
}
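
The stash handling in PipelineProxyStage::work() above (serve any document previously handed back via pushBack() before pulling a new one from the pipeline) can be sketched in isolation. This is a minimal toy under assumed stand-in types (State, StashingStage, and a deque of strings in place of the pipeline); it is not MongoDB's real PlanStage/WorkingSet API:

    #include <deque>
    #include <string>
    #include <utility>
    #include <vector>

    // Toy stand-ins for PlanStage::StageState and the proxy stage itself;
    // the real classes live under mongo/db/exec/ and are not reproduced here.
    enum class State { ADVANCED, IS_EOF };

    class StashingStage {
    public:
        explicit StashingStage(std::deque<std::string> source)
            : _source(std::move(source)) {}

        // Mirrors PipelineProxyStage::work(): drain the stash first, then the source.
        State work(std::string* out) {
            if (!_stash.empty()) {
                *out = _stash.back();
                _stash.pop_back();
                return State::ADVANCED;
            }
            if (!_source.empty()) {
                *out = _source.front();
                _source.pop_front();
                return State::ADVANCED;
            }
            return State::IS_EOF;
        }

        // Mirrors pushBack(): make obj the next object returned by work().
        void pushBack(const std::string& obj) { _stash.push_back(obj); }

        bool isEOF() const { return _stash.empty() && _source.empty(); }

    private:
        std::deque<std::string> _source;  // stands in for the aggregation pipeline
        std::vector<std::string> _stash;  // documents pushed back by the caller
    };

The real stage additionally allocates a WorkingSetID and copies each document into a WorkingSetMember marked OWNED_OBJ, as the hunk above shows, so the result stays valid independent of storage.
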
@@ -185,14 +201,14 @@ namespace {
static void handleCursorCommand(OperationContext* txn,
const string& ns,
ClientCursorPin* pin,
- PipelineRunner* runner,
+ PlanExecutor* exec,
const BSONObj& cmdObj,
BSONObjBuilder& result) {
ClientCursor* cursor = pin ? pin->c() : NULL;
if (pin) {
invariant(cursor);
- invariant(cursor->getRunner() == runner);
+ invariant(cursor->getExecutor() == exec);
invariant(cursor->isAggCursor);
}
@@ -206,32 +222,34 @@ namespace {
const int byteLimit = MaxBytesToReturnToClientAtOnce;
BSONObj next;
for (int objCount = 0; objCount < batchSize; objCount++) {
- // The initial getNext() on a PipelineRunner may be very expensive so we don't
+ // The initial getNext() on a PipelineProxyStage may be very expensive so we don't
// do it when batchSize is 0 since that indicates a desire for a fast return.
- if (runner->getNext(&next, NULL) != Runner::RUNNER_ADVANCED) {
+ if (exec->getNext(&next, NULL) != Runner::RUNNER_ADVANCED) {
if (pin) pin->deleteUnderlying();
- // make it an obvious error to use cursor or runner after this point
+ // make it an obvious error to use cursor or executor after this point
cursor = NULL;
- runner = NULL;
+ exec = NULL;
break;
}
if (resultsArray.len() + next.objsize() > byteLimit) {
+ // Get the pipeline proxy stage wrapped by this PlanExecutor.
+ PipelineProxyStage* proxy = static_cast<PipelineProxyStage*>(exec->getStages());
// too big. next will be the first doc in the second batch
- runner->pushBack(next);
+ proxy->pushBack(next);
break;
}
resultsArray.append(next);
}
- // NOTE: runner->isEOF() can have side effects such as writing by $out. However, it should
+ // NOTE: exec->isEOF() can have side effects such as writing by $out. However, it should
// be relatively quick since if there was no pin then the input is empty. Also, this
// violates the contract for batchSize==0. Sharding requires a cursor to be returned in that
// case. This is ok for now however, since you can't have a sharded collection that doesn't
// exist.
const bool canReturnMoreBatches = pin;
- if (!canReturnMoreBatches && runner && !runner->isEOF()) {
+ if (!canReturnMoreBatches && exec && !exec->isEOF()) {
// msgasserting since this shouldn't be possible to trigger from today's aggregation
// language. The wording assumes that the only reason pin would be null is if the
// collection doesn't exist.
@@ -308,13 +326,13 @@ namespace {
}
#endif
- PipelineRunner* runner = NULL;
- scoped_ptr<ClientCursorPin> pin; // either this OR the runnerHolder will be non-null
- auto_ptr<PipelineRunner> runnerHolder;
+ PlanExecutor* exec = NULL;
+ scoped_ptr<ClientCursorPin> pin; // either this OR the execHolder will be non-null
+ auto_ptr<PlanExecutor> execHolder;
{
// This will throw if the sharding version for this connection is out of date. The
// lock must be held continuously from now until we have created both the output
- // ClientCursor and the input Runner. This ensures that both are using the same
+ // ClientCursor and the input executor. This ensures that both are using the same
// sharding version that we synchronize on here. This is also why we always need to
// create a ClientCursor even when we aren't outputting to a cursor. See the comment
// on ShardFilterStage for more details.
@@ -322,26 +340,37 @@ namespace {
Collection* collection = ctx.ctx().db()->getCollection(txn, ns);
- // This does mongod-specific stuff like creating the input Runner and adding to the
- // front of the pipeline if needed.
- boost::shared_ptr<Runner> input = PipelineD::prepareCursorSource(txn,
- collection,
- pPipeline,
- pCtx);
+ // This does mongod-specific stuff like creating the input PlanExecutor and adding
+ // it to the front of the pipeline if needed.
+ boost::shared_ptr<PlanExecutor> input = PipelineD::prepareCursorSource(txn,
+ collection,
+ pPipeline,
+ pCtx);
pPipeline->stitch();
- runnerHolder.reset(new PipelineRunner(pPipeline, input));
- runner = runnerHolder.get();
+ // Create the PlanExecutor which returns results from the pipeline. The WorkingSet
+ // ('ws') and the PipelineProxyStage ('proxy') will be owned by the created
+ // PlanExecutor.
+ auto_ptr<WorkingSet> ws(new WorkingSet());
+ auto_ptr<PipelineProxyStage> proxy(
+ new PipelineProxyStage(pPipeline, input, ws.get()));
+ if (NULL == collection) {
+ execHolder.reset(new PlanExecutor(ws.release(), proxy.release(), ns));
+ }
+ else {
+ execHolder.reset(new PlanExecutor(ws.release(), proxy.release(), collection));
+ }
+ exec = execHolder.get();
if (!collection && input) {
- // If we don't have a collection, we won't be able to register any Runners, so
- // make sure that the input Runner (likely an EOFRunner) doesn't need to be
- // registered.
+ // If we don't have a collection, we won't be able to register any executors, so
+ // make sure that the input PlanExecutor (likely wrapping an EOFStage) doesn't
+ // need to be registered.
invariant(!input->collection());
}
if (collection) {
- ClientCursor* cursor = new ClientCursor(collection, runnerHolder.release());
+ ClientCursor* cursor = new ClientCursor(collection, execHolder.release());
cursor->isAggCursor = true; // enable special locking behavior
pin.reset(new ClientCursorPin(collection, cursor->cursorid()));
// Don't add any code between here and the start of the try block.
@@ -357,7 +386,7 @@ namespace {
result << "stages" << Value(pPipeline->writeExplainOps());
}
else if (isCursorCommand(cmdObj)) {
- handleCursorCommand(txn, ns, pin.get(), runner, cmdObj, result);
+ handleCursorCommand(txn, ns, pin.get(), exec, cmdObj, result);
keepCursor = true;
}
else {
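
handleCursorCommand() leans on that push-back behaviour when batching: it appends results until the next document would push the batch over the byte limit, then stashes that document so it opens the following batch. A hedged sketch of that loop, reusing the toy StashingStage from the earlier sketch (byteLimit and the document strings are made-up demo values, not MaxBytesToReturnToClientAtOnce):

    #include <cstddef>
    #include <iostream>
    #include <string>
    #include <vector>

    int main() {
        StashingStage stage({"doc-a", "doc-b", "doc-c", "doc-d"});
        const std::size_t byteLimit = 12;  // deliberately tiny for the demo

        std::vector<std::string> batch;
        std::size_t batchBytes = 0;
        std::string next;
        while (stage.work(&next) == State::ADVANCED) {
            if (batchBytes + next.size() > byteLimit) {
                stage.pushBack(next);  // first document of the next batch
                break;
            }
            batchBytes += next.size();
            batch.push_back(next);
        }

        for (const std::string& doc : batch)
            std::cout << doc << "\n";  // prints doc-a and doc-b; doc-c opens batch two
        return 0;
    }

In the command itself the same idea is expressed through exec->getNext() plus proxy->pushBack() on the stage recovered via exec->getStages(), as the hunk above shows.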