diff options
Diffstat (limited to 'src/mongo/db/exec/pipeline_proxy.cpp')
-rw-r--r-- | src/mongo/db/exec/pipeline_proxy.cpp | 175 |
1 files changed, 87 insertions, 88 deletions
diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp index d7082cacdbc..9ca0fe788b5 100644 --- a/src/mongo/db/exec/pipeline_proxy.cpp +++ b/src/mongo/db/exec/pipeline_proxy.cpp @@ -36,107 +36,106 @@ namespace mongo { - using boost::intrusive_ptr; - using std::shared_ptr; - using std::vector; - - const char* PipelineProxyStage::kStageType = "PIPELINE_PROXY"; - - PipelineProxyStage::PipelineProxyStage(intrusive_ptr<Pipeline> pipeline, - const std::shared_ptr<PlanExecutor>& child, - WorkingSet* ws) - : _pipeline(pipeline) - , _includeMetaData(_pipeline->getContext()->inShard) // send metadata to merger - , _childExec(child) - , _ws(ws) - {} - - PlanStage::StageState PipelineProxyStage::work(WorkingSetID* out) { - if (!out) { - return PlanStage::FAILURE; - } - - if (!_stash.empty()) { - *out = _ws->allocate(); - WorkingSetMember* member = _ws->get(*out); - member->obj = Snapshotted<BSONObj>(SnapshotId(), _stash.back()); - _stash.pop_back(); - member->state = WorkingSetMember::OWNED_OBJ; - return PlanStage::ADVANCED; - } - - if (boost::optional<BSONObj> next = getNextBson()) { - *out = _ws->allocate(); - WorkingSetMember* member = _ws->get(*out); - member->obj = Snapshotted<BSONObj>(SnapshotId(), *next); - member->state = WorkingSetMember::OWNED_OBJ; - return PlanStage::ADVANCED; - } - - return PlanStage::IS_EOF; +using boost::intrusive_ptr; +using std::shared_ptr; +using std::vector; + +const char* PipelineProxyStage::kStageType = "PIPELINE_PROXY"; + +PipelineProxyStage::PipelineProxyStage(intrusive_ptr<Pipeline> pipeline, + const std::shared_ptr<PlanExecutor>& child, + WorkingSet* ws) + : _pipeline(pipeline), + _includeMetaData(_pipeline->getContext()->inShard) // send metadata to merger + , + _childExec(child), + _ws(ws) {} + +PlanStage::StageState PipelineProxyStage::work(WorkingSetID* out) { + if (!out) { + return PlanStage::FAILURE; } - bool PipelineProxyStage::isEOF() { - if (!_stash.empty()) - return false; - - if (boost::optional<BSONObj> next = getNextBson()) { - _stash.push_back(*next); - return false; - } - - return true; + if (!_stash.empty()) { + *out = _ws->allocate(); + WorkingSetMember* member = _ws->get(*out); + member->obj = Snapshotted<BSONObj>(SnapshotId(), _stash.back()); + _stash.pop_back(); + member->state = WorkingSetMember::OWNED_OBJ; + return PlanStage::ADVANCED; } - void PipelineProxyStage::invalidate(OperationContext* txn, - const RecordId& dl, - InvalidationType type) { - // propagate to child executor if still in use - if (std::shared_ptr<PlanExecutor> exec = _childExec.lock()) { - exec->invalidate(txn, dl, type); - } + if (boost::optional<BSONObj> next = getNextBson()) { + *out = _ws->allocate(); + WorkingSetMember* member = _ws->get(*out); + member->obj = Snapshotted<BSONObj>(SnapshotId(), *next); + member->state = WorkingSetMember::OWNED_OBJ; + return PlanStage::ADVANCED; } - void PipelineProxyStage::saveState() { - _pipeline->getContext()->opCtx = NULL; - } + return PlanStage::IS_EOF; +} - void PipelineProxyStage::restoreState(OperationContext* opCtx) { - invariant(_pipeline->getContext()->opCtx == NULL); - _pipeline->getContext()->opCtx = opCtx; - } +bool PipelineProxyStage::isEOF() { + if (!_stash.empty()) + return false; - void PipelineProxyStage::pushBack(const BSONObj& obj) { - _stash.push_back(obj); + if (boost::optional<BSONObj> next = getNextBson()) { + _stash.push_back(*next); + return false; } - vector<PlanStage*> PipelineProxyStage::getChildren() const { - vector<PlanStage*> empty; - return empty; - } + return true; +} - PlanStageStats* PipelineProxyStage::getStats() { - std::unique_ptr<PlanStageStats> ret(new PlanStageStats(CommonStats(kStageType), - STAGE_PIPELINE_PROXY)); - ret->specific.reset(new CollectionScanStats()); - return ret.release(); +void PipelineProxyStage::invalidate(OperationContext* txn, + const RecordId& dl, + InvalidationType type) { + // propagate to child executor if still in use + if (std::shared_ptr<PlanExecutor> exec = _childExec.lock()) { + exec->invalidate(txn, dl, type); } - - boost::optional<BSONObj> PipelineProxyStage::getNextBson() { - if (boost::optional<Document> next = _pipeline->output()->getNext()) { - if (_includeMetaData) { - return next->toBsonWithMetaData(); - } - else { - return next->toBson(); - } +} + +void PipelineProxyStage::saveState() { + _pipeline->getContext()->opCtx = NULL; +} + +void PipelineProxyStage::restoreState(OperationContext* opCtx) { + invariant(_pipeline->getContext()->opCtx == NULL); + _pipeline->getContext()->opCtx = opCtx; +} + +void PipelineProxyStage::pushBack(const BSONObj& obj) { + _stash.push_back(obj); +} + +vector<PlanStage*> PipelineProxyStage::getChildren() const { + vector<PlanStage*> empty; + return empty; +} + +PlanStageStats* PipelineProxyStage::getStats() { + std::unique_ptr<PlanStageStats> ret( + new PlanStageStats(CommonStats(kStageType), STAGE_PIPELINE_PROXY)); + ret->specific.reset(new CollectionScanStats()); + return ret.release(); +} + +boost::optional<BSONObj> PipelineProxyStage::getNextBson() { + if (boost::optional<Document> next = _pipeline->output()->getNext()) { + if (_includeMetaData) { + return next->toBsonWithMetaData(); + } else { + return next->toBson(); } - - return boost::none; } - shared_ptr<PlanExecutor> PipelineProxyStage::getChildExecutor() { - return _childExec.lock(); - } + return boost::none; +} + +shared_ptr<PlanExecutor> PipelineProxyStage::getChildExecutor() { + return _childExec.lock(); +} -} // namespace mongo +} // namespace mongo |