summaryrefslogtreecommitdiff
path: root/src/mongo/db/exec/pipeline_proxy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/exec/pipeline_proxy.cpp')
-rw-r--r--src/mongo/db/exec/pipeline_proxy.cpp175
1 files changed, 87 insertions, 88 deletions
diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp
index d7082cacdbc..9ca0fe788b5 100644
--- a/src/mongo/db/exec/pipeline_proxy.cpp
+++ b/src/mongo/db/exec/pipeline_proxy.cpp
@@ -36,107 +36,106 @@
namespace mongo {
- using boost::intrusive_ptr;
- using std::shared_ptr;
- using std::vector;
-
- const char* PipelineProxyStage::kStageType = "PIPELINE_PROXY";
-
- PipelineProxyStage::PipelineProxyStage(intrusive_ptr<Pipeline> pipeline,
- const std::shared_ptr<PlanExecutor>& child,
- WorkingSet* ws)
- : _pipeline(pipeline)
- , _includeMetaData(_pipeline->getContext()->inShard) // send metadata to merger
- , _childExec(child)
- , _ws(ws)
- {}
-
- PlanStage::StageState PipelineProxyStage::work(WorkingSetID* out) {
- if (!out) {
- return PlanStage::FAILURE;
- }
-
- if (!_stash.empty()) {
- *out = _ws->allocate();
- WorkingSetMember* member = _ws->get(*out);
- member->obj = Snapshotted<BSONObj>(SnapshotId(), _stash.back());
- _stash.pop_back();
- member->state = WorkingSetMember::OWNED_OBJ;
- return PlanStage::ADVANCED;
- }
-
- if (boost::optional<BSONObj> next = getNextBson()) {
- *out = _ws->allocate();
- WorkingSetMember* member = _ws->get(*out);
- member->obj = Snapshotted<BSONObj>(SnapshotId(), *next);
- member->state = WorkingSetMember::OWNED_OBJ;
- return PlanStage::ADVANCED;
- }
-
- return PlanStage::IS_EOF;
+using boost::intrusive_ptr;
+using std::shared_ptr;
+using std::vector;
+
+const char* PipelineProxyStage::kStageType = "PIPELINE_PROXY";
+
+PipelineProxyStage::PipelineProxyStage(intrusive_ptr<Pipeline> pipeline,
+ const std::shared_ptr<PlanExecutor>& child,
+ WorkingSet* ws)
+ : _pipeline(pipeline),
+ _includeMetaData(_pipeline->getContext()->inShard) // send metadata to merger
+ ,
+ _childExec(child),
+ _ws(ws) {}
+
+PlanStage::StageState PipelineProxyStage::work(WorkingSetID* out) {
+ if (!out) {
+ return PlanStage::FAILURE;
}
- bool PipelineProxyStage::isEOF() {
- if (!_stash.empty())
- return false;
-
- if (boost::optional<BSONObj> next = getNextBson()) {
- _stash.push_back(*next);
- return false;
- }
-
- return true;
+ if (!_stash.empty()) {
+ *out = _ws->allocate();
+ WorkingSetMember* member = _ws->get(*out);
+ member->obj = Snapshotted<BSONObj>(SnapshotId(), _stash.back());
+ _stash.pop_back();
+ member->state = WorkingSetMember::OWNED_OBJ;
+ return PlanStage::ADVANCED;
}
- void PipelineProxyStage::invalidate(OperationContext* txn,
- const RecordId& dl,
- InvalidationType type) {
- // propagate to child executor if still in use
- if (std::shared_ptr<PlanExecutor> exec = _childExec.lock()) {
- exec->invalidate(txn, dl, type);
- }
+ if (boost::optional<BSONObj> next = getNextBson()) {
+ *out = _ws->allocate();
+ WorkingSetMember* member = _ws->get(*out);
+ member->obj = Snapshotted<BSONObj>(SnapshotId(), *next);
+ member->state = WorkingSetMember::OWNED_OBJ;
+ return PlanStage::ADVANCED;
}
- void PipelineProxyStage::saveState() {
- _pipeline->getContext()->opCtx = NULL;
- }
+ return PlanStage::IS_EOF;
+}
- void PipelineProxyStage::restoreState(OperationContext* opCtx) {
- invariant(_pipeline->getContext()->opCtx == NULL);
- _pipeline->getContext()->opCtx = opCtx;
- }
+bool PipelineProxyStage::isEOF() {
+ if (!_stash.empty())
+ return false;
- void PipelineProxyStage::pushBack(const BSONObj& obj) {
- _stash.push_back(obj);
+ if (boost::optional<BSONObj> next = getNextBson()) {
+ _stash.push_back(*next);
+ return false;
}
- vector<PlanStage*> PipelineProxyStage::getChildren() const {
- vector<PlanStage*> empty;
- return empty;
- }
+ return true;
+}
- PlanStageStats* PipelineProxyStage::getStats() {
- std::unique_ptr<PlanStageStats> ret(new PlanStageStats(CommonStats(kStageType),
- STAGE_PIPELINE_PROXY));
- ret->specific.reset(new CollectionScanStats());
- return ret.release();
+void PipelineProxyStage::invalidate(OperationContext* txn,
+ const RecordId& dl,
+ InvalidationType type) {
+ // propagate to child executor if still in use
+ if (std::shared_ptr<PlanExecutor> exec = _childExec.lock()) {
+ exec->invalidate(txn, dl, type);
}
-
- boost::optional<BSONObj> PipelineProxyStage::getNextBson() {
- if (boost::optional<Document> next = _pipeline->output()->getNext()) {
- if (_includeMetaData) {
- return next->toBsonWithMetaData();
- }
- else {
- return next->toBson();
- }
+}
+
+void PipelineProxyStage::saveState() {
+ _pipeline->getContext()->opCtx = NULL;
+}
+
+void PipelineProxyStage::restoreState(OperationContext* opCtx) {
+ invariant(_pipeline->getContext()->opCtx == NULL);
+ _pipeline->getContext()->opCtx = opCtx;
+}
+
+void PipelineProxyStage::pushBack(const BSONObj& obj) {
+ _stash.push_back(obj);
+}
+
+vector<PlanStage*> PipelineProxyStage::getChildren() const {
+ vector<PlanStage*> empty;
+ return empty;
+}
+
+PlanStageStats* PipelineProxyStage::getStats() {
+ std::unique_ptr<PlanStageStats> ret(
+ new PlanStageStats(CommonStats(kStageType), STAGE_PIPELINE_PROXY));
+ ret->specific.reset(new CollectionScanStats());
+ return ret.release();
+}
+
+boost::optional<BSONObj> PipelineProxyStage::getNextBson() {
+ if (boost::optional<Document> next = _pipeline->output()->getNext()) {
+ if (_includeMetaData) {
+ return next->toBsonWithMetaData();
+ } else {
+ return next->toBson();
}
-
- return boost::none;
}
- shared_ptr<PlanExecutor> PipelineProxyStage::getChildExecutor() {
- return _childExec.lock();
- }
+ return boost::none;
+}
+
+shared_ptr<PlanExecutor> PipelineProxyStage::getChildExecutor() {
+ return _childExec.lock();
+}
-} // namespace mongo
+} // namespace mongo