diff options
Diffstat (limited to 'src/mongo/db/commands/pipeline_command.cpp')
-rw-r--r-- | src/mongo/db/commands/pipeline_command.cpp | 159 |
1 files changed, 94 insertions, 65 deletions
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index 5033dd05da7..de49bc61d1c 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -38,6 +38,7 @@ #include "mongo/db/client.h" #include "mongo/db/curop.h" #include "mongo/db/commands.h" +#include "mongo/db/exec/plan_stage.h" #include "mongo/db/pipeline/accumulator.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" @@ -46,7 +47,7 @@ #include "mongo/db/pipeline/pipeline_d.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/find_constants.h" -#include "mongo/db/query/get_runner.h" +#include "mongo/db/query/get_executor.h" #include "mongo/db/storage_options.h" namespace mongo { @@ -54,33 +55,46 @@ namespace mongo { namespace { /** - * This is a Runner implementation backed by an aggregation pipeline. + * Stage for pulling results out from an aggregation pipeline. + * + * XXX: move this stage to the exec/ directory. */ - class PipelineRunner : public Runner { + class PipelineProxyStage : public PlanStage { public: - PipelineRunner(intrusive_ptr<Pipeline> pipeline, const boost::shared_ptr<Runner>& child) + PipelineProxyStage(intrusive_ptr<Pipeline> pipeline, + const boost::shared_ptr<PlanExecutor>& child, + WorkingSet* ws) : _pipeline(pipeline) , _includeMetaData(_pipeline->getContext()->inShard) // send metadata to merger - , _childRunner(child) + , _childExec(child) + , _ws(ws) {} - virtual RunnerState getNext(BSONObj* objOut, DiskLoc* dlOut) { - if (!objOut || dlOut) - return RUNNER_ERROR; + virtual StageState work(WorkingSetID* out) { + if (!out) { + return PlanStage::FAILURE; + } if (!_stash.empty()) { - *objOut = _stash.back(); + *out = _ws->allocate(); + WorkingSetMember* member = _ws->get(*out); + member->obj = _stash.back(); _stash.pop_back(); - return RUNNER_ADVANCED; + member->state = WorkingSetMember::OWNED_OBJ; + return PlanStage::ADVANCED; } if (boost::optional<BSONObj> next = getNextBson()) { - *objOut = *next; - return RUNNER_ADVANCED; + *out = _ws->allocate(); + WorkingSetMember* member = _ws->get(*out); + member->obj = *next; + member->state = WorkingSetMember::OWNED_OBJ; + return PlanStage::ADVANCED; } - return RUNNER_EOF; + return PlanStage::IS_EOF; } + virtual bool isEOF() { if (!_stash.empty()) return false; @@ -92,41 +106,23 @@ namespace { return true; } - virtual const string& ns() { - return _pipeline->getContext()->ns.ns(); - } - - virtual Status getInfo(TypeExplain** explain, - PlanInfo** planInfo) const { - // This should never get called in practice anyway. - return Status(ErrorCodes::InternalError, - "PipelineCursor doesn't implement getExplainPlan"); - } - // propagate to child runner if still in use + // propagate to child executor if still in use virtual void invalidate(const DiskLoc& dl, InvalidationType type) { - if (boost::shared_ptr<Runner> runner = _childRunner.lock()) { - runner->invalidate(dl, type); - } - } - virtual void kill() { - if (boost::shared_ptr<Runner> runner = _childRunner.lock()) { - runner->kill(); + if (boost::shared_ptr<PlanExecutor> exec = _childExec.lock()) { + exec->invalidate(dl, type); } } - // Manage our OperationContext. We intentionally don't propagate to child Runner as that is - // handled by DocumentSourceCursor as it needs to. - virtual void saveState() { + // Manage our OperationContext. We intentionally don't propagate to the child + // Runner as that is handled by DocumentSourceCursor as it needs to. + virtual void prepareToYield() { _pipeline->getContext()->opCtx = NULL; } - virtual bool restoreState(OperationContext* opCtx) { + virtual void recoverFromYield(OperationContext* opCtx) { _pipeline->getContext()->opCtx = opCtx; - return true; } - virtual const Collection* collection() { return NULL; } - /** * Make obj the next object returned by getNext(). */ @@ -134,6 +130,23 @@ namespace { _stash.push_back(obj); } + // + // These should not be used. + // + + virtual PlanStageStats* getStats() { return NULL; } + virtual CommonStats* getCommonStats() { return NULL; } + virtual SpecificStats* getSpecificStats() { return NULL; } + + // Not used. + virtual std::vector<PlanStage*> getChildren() const { + vector<PlanStage*> empty; + return empty; + } + + // Not used. + virtual StageType stageType() const { return STAGE_PIPELINE_PROXY; } + private: boost::optional<BSONObj> getNextBson() { if (boost::optional<Document> next = _pipeline->output()->getNext()) { @@ -152,7 +165,10 @@ namespace { const intrusive_ptr<Pipeline> _pipeline; vector<BSONObj> _stash; const bool _includeMetaData; - boost::weak_ptr<Runner> _childRunner; + boost::weak_ptr<PlanExecutor> _childExec; + + // Not owned by us. + WorkingSet* _ws; }; } @@ -185,14 +201,14 @@ namespace { static void handleCursorCommand(OperationContext* txn, const string& ns, ClientCursorPin* pin, - PipelineRunner* runner, + PlanExecutor* exec, const BSONObj& cmdObj, BSONObjBuilder& result) { ClientCursor* cursor = pin ? pin->c() : NULL; if (pin) { invariant(cursor); - invariant(cursor->getRunner() == runner); + invariant(cursor->getExecutor() == exec); invariant(cursor->isAggCursor); } @@ -206,32 +222,34 @@ namespace { const int byteLimit = MaxBytesToReturnToClientAtOnce; BSONObj next; for (int objCount = 0; objCount < batchSize; objCount++) { - // The initial getNext() on a PipelineRunner may be very expensive so we don't + // The initial getNext() on a PipelineProxyStage may be very expensive so we don't // do it when batchSize is 0 since that indicates a desire for a fast return. - if (runner->getNext(&next, NULL) != Runner::RUNNER_ADVANCED) { + if (exec->getNext(&next, NULL) != Runner::RUNNER_ADVANCED) { if (pin) pin->deleteUnderlying(); - // make it an obvious error to use cursor or runner after this point + // make it an obvious error to use cursor or executor after this point cursor = NULL; - runner = NULL; + exec = NULL; break; } if (resultsArray.len() + next.objsize() > byteLimit) { + // Get the pipeline proxy stage wrapped by this PlanExecutor. + PipelineProxyStage* proxy = static_cast<PipelineProxyStage*>(exec->getStages()); // too big. next will be the first doc in the second batch - runner->pushBack(next); + proxy->pushBack(next); break; } resultsArray.append(next); } - // NOTE: runner->isEOF() can have side effects such as writing by $out. However, it should + // NOTE: exec->isEOF() can have side effects such as writing by $out. However, it should // be relatively quick since if there was no pin then the input is empty. Also, this // violates the contract for batchSize==0. Sharding requires a cursor to be returned in that // case. This is ok for now however, since you can't have a sharded collection that doesn't // exist. const bool canReturnMoreBatches = pin; - if (!canReturnMoreBatches && runner && !runner->isEOF()) { + if (!canReturnMoreBatches && exec && !exec->isEOF()) { // msgasserting since this shouldn't be possible to trigger from today's aggregation // language. The wording assumes that the only reason pin would be null is if the // collection doesn't exist. @@ -308,13 +326,13 @@ namespace { } #endif - PipelineRunner* runner = NULL; - scoped_ptr<ClientCursorPin> pin; // either this OR the runnerHolder will be non-null - auto_ptr<PipelineRunner> runnerHolder; + PlanExecutor* exec = NULL; + scoped_ptr<ClientCursorPin> pin; // either this OR the execHolder will be non-null + auto_ptr<PlanExecutor> execHolder; { // This will throw if the sharding version for this connection is out of date. The // lock must be held continuously from now until we have we created both the output - // ClientCursor and the input Runner. This ensures that both are using the same + // ClientCursor and the input executor. This ensures that both are using the same // sharding version that we synchronize on here. This is also why we always need to // create a ClientCursor even when we aren't outputting to a cursor. See the comment // on ShardFilterStage for more details. @@ -322,26 +340,37 @@ namespace { Collection* collection = ctx.ctx().db()->getCollection(txn, ns); - // This does mongod-specific stuff like creating the input Runner and adding to the - // front of the pipeline if needed. - boost::shared_ptr<Runner> input = PipelineD::prepareCursorSource(txn, - collection, - pPipeline, - pCtx); + // This does mongod-specific stuff like creating the input PlanExecutor and adding + // it to the front of the pipeline if needed. + boost::shared_ptr<PlanExecutor> input = PipelineD::prepareCursorSource(txn, + collection, + pPipeline, + pCtx); pPipeline->stitch(); - runnerHolder.reset(new PipelineRunner(pPipeline, input)); - runner = runnerHolder.get(); + // Create the PlanExecutor which returns results from the pipeline. The WorkingSet + // ('ws') and the PipelineProxyStage ('proxy') will be owned by the created + // PlanExecutor. + auto_ptr<WorkingSet> ws(new WorkingSet()); + auto_ptr<PipelineProxyStage> proxy( + new PipelineProxyStage(pPipeline, input, ws.get())); + if (NULL == collection) { + execHolder.reset(new PlanExecutor(ws.release(), proxy.release(), ns)); + } + else { + execHolder.reset(new PlanExecutor(ws.release(), proxy.release(), collection)); + } + exec = execHolder.get(); if (!collection && input) { - // If we don't have a collection, we won't be able to register any Runners, so - // make sure that the input Runner (likely an EOFRunner) doesn't need to be - // registered. + // If we don't have a collection, we won't be able to register any executors, so + // make sure that the input PlanExecutor (likely wrapping an EOFStage) doesn't + // need to be registered. invariant(!input->collection()); } if (collection) { - ClientCursor* cursor = new ClientCursor(collection, runnerHolder.release()); + ClientCursor* cursor = new ClientCursor(collection, execHolder.release()); cursor->isAggCursor = true; // enable special locking behavior pin.reset(new ClientCursorPin(collection, cursor->cursorid())); // Don't add any code between here and the start of the try block. @@ -357,7 +386,7 @@ namespace { result << "stages" << Value(pPipeline->writeExplainOps()); } else if (isCursorCommand(cmdObj)) { - handleCursorCommand(txn, ns, pin.get(), runner, cmdObj, result); + handleCursorCommand(txn, ns, pin.get(), exec, cmdObj, result); keepCursor = true; } else { |