summaryrefslogtreecommitdiff
path: root/src/mongo/db/exec
diff options
context:
space:
mode:
author Hari Khalsa <hkhalsa@10gen.com> 2013-07-08 14:44:32 -0400
committer Hari Khalsa <hkhalsa@10gen.com> 2013-07-09 14:10:54 -0400
commit b8f0ec598013009c56dee527e76429ffa7b8c394 (patch)
tree a7cf31d7fbf6bc525f4b1ded421dad8a97aed9a5 /src/mongo/db/exec
parent fedd312c424fc5f82f7be1dad13f3dd74403c4a4 (diff)
download mongo-b8f0ec598013009c56dee527e76429ffa7b8c394.tar.gz
SERVER-10026 fetch limit skip or
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r-- src/mongo/db/exec/SConscript 51
-rw-r--r-- src/mongo/db/exec/fetch.cpp 171
-rw-r--r-- src/mongo/db/exec/fetch.h 69
-rw-r--r-- src/mongo/db/exec/limit.cpp 52
-rw-r--r-- src/mongo/db/exec/limit.h 52
-rw-r--r-- src/mongo/db/exec/mock_stage.cpp 52
-rw-r--r-- src/mongo/db/exec/mock_stage.h 73
-rw-r--r-- src/mongo/db/exec/or.cpp 111
-rw-r--r-- src/mongo/db/exec/or.h 68
-rw-r--r-- src/mongo/db/exec/plan_stage.h 29
-rw-r--r-- src/mongo/db/exec/simple_plan_runner.h 27
-rw-r--r-- src/mongo/db/exec/skip.cpp 58
-rw-r--r-- src/mongo/db/exec/skip.h 51
-rw-r--r-- src/mongo/db/exec/stagedebug_cmd.cpp 58
14 files changed, 910 insertions, 12 deletions
diff --git a/src/mongo/db/exec/SConscript b/src/mongo/db/exec/SConscript
new file mode 100644
index 00000000000..c0606c4ab1d
--- /dev/null
+++ b/src/mongo/db/exec/SConscript
@@ -0,0 +1,51 @@
+# -*- mode: python -*-
+
+Import("env")
+
+env.StaticLibrary(
+ target = "working_set",
+ source = [
+ "working_set.cpp",
+ ],
+ LIBDEPS = [
+ "$BUILD_DIR/mongo/bson",
+ ],
+)
+
+env.CppUnitTest(
+ target = "working_set_test",
+ source = [
+ "working_set_test.cpp"
+ ],
+ LIBDEPS = [
+ "working_set",
+ ],
+)
+
+env.StaticLibrary(
+ target = "mock_stage",
+ source = [
+ "mock_stage.cpp",
+ ],
+ LIBDEPS = [
+ "working_set",
+ ],
+)
+
+env.StaticLibrary(
+ target = 'exec',
+ source = [
+ "and_hash.cpp",
+ "and_sorted.cpp",
+ "fetch.cpp",
+ "index_scan.cpp",
+ "limit.cpp",
+ "or.cpp",
+ "skip.cpp",
+ "stagedebug_cmd.cpp",
+ "working_set_common.cpp",
+ ],
+ LIBDEPS = [
+ "$BUILD_DIR/mongo/bson"
+ ],
+)
diff --git a/src/mongo/db/exec/fetch.cpp b/src/mongo/db/exec/fetch.cpp
new file mode 100644
index 00000000000..d6f81c04693
--- /dev/null
+++ b/src/mongo/db/exec/fetch.cpp
@@ -0,0 +1,171 @@
+/**
+ * Copyright (C) 2013 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "mongo/db/exec/fetch.h"
+
+#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/pdfile.h"
+#include "mongo/util/fail_point_service.h"
+
+namespace mongo {
+
+ // Some fail points for testing.
+ MONGO_FP_DECLARE(fetchInMemoryFail);
+ MONGO_FP_DECLARE(fetchInMemorySucceed);
+
+    // 'child' and 'matcher' are stored in scoped_ptr members; 'ws' is only borrowed.  No page-in
+    // is pending when the stage is created.
+    FetchStage::FetchStage(WorkingSet* ws, PlanStage* child, Matcher* matcher)
+        : _ws(ws),
+          _child(child),
+          _matcher(matcher),
+          _idBeingPagedIn(WorkingSet::INVALID_ID) {
+    }
+
+    // Nothing to clean up by hand.
+    FetchStage::~FetchStage() {
+    }
+
+    // We are at EOF only when no page-in request is outstanding and the child is at EOF.
+    bool FetchStage::isEOF() {
+        // While a page-in is pending we still owe the caller the result that _idBeingPagedIn
+        // refers to, so the child is not even consulted.
+        const bool pageInPending = (WorkingSet::INVALID_ID != _idBeingPagedIn);
+        return !pageInPending && _child->isEOF();
+    }
+
+    // Returns whether the record data at 'data' is likely to be in physical memory.  The two
+    // test fail points override the real check: fetchInMemoryFail is consulted first and forces
+    // 'false', then fetchInMemorySucceed forces 'true'.
+    bool recordInMemory(const char* data) {
+        if (MONGO_FAIL_POINT(fetchInMemoryFail)) { return false; }
+        if (MONGO_FAIL_POINT(fetchInMemorySucceed)) { return true; }
+
+        // No fail point is active; ask the storage layer.
+        return Record::likelyInPhysicalMemory(data);
+    }
+
+    // Produces at most one result per call.
+    //
+    // Returns:
+    //   IS_EOF      if isEOF() holds on entry.
+    //   ADVANCED    with *out set to a member that has an obj and passed the filter (if any).
+    //   NEED_TIME   if a member was rejected by the filter (it is freed), or if the child
+    //               returned NEED_TIME.
+    //   NEED_FETCH  with *out set to the WSID whose record has to be paged in.  The request
+    //               either originates here (the WSID is remembered in _idBeingPagedIn and the
+    //               fetch is finished on the next call) or is propagated from the child.
+    //   Any other state returned by the child is passed up as is.
+    PlanStage::StageState FetchStage::work(WorkingSetID* out) {
+        if (isEOF()) { return PlanStage::IS_EOF; }
+
+        // If we asked our parent for a page-in last time work(...) was called, finish the fetch.
+        if (WorkingSet::INVALID_ID != _idBeingPagedIn) {
+            return fetchCompleted(out);
+        }
+
+        // If we're here, we're not waiting for a DiskLoc to be fetched.  Get another
+        // to-be-fetched result from our child.
+        WorkingSetID id;
+        StageState status = _child->work(&id);
+
+        if (PlanStage::NEED_FETCH == status) {
+            // Our child wants something paged in.  Fetch semantics require us to propagate the
+            // request, and the runner reads the WSID to fetch from the out parameter, so it
+            // has to be forwarded along with the state.
+            *out = id;
+            return status;
+        }
+
+        if (PlanStage::ADVANCED != status) {
+            // NEED_TIME, FAILURE, IS_EOF: nothing is output, just pass the state up.
+            return status;
+        }
+
+        WorkingSetMember* member = _ws->get(id);
+
+        // If there's an obj there, there is no fetching to perform.
+        if (member->hasObj()) {
+            return returnIfMatches(member, id, out);
+        }
+
+        // We need a valid loc to fetch from and this is the only state that has one.
+        verify(WorkingSetMember::LOC_AND_IDX == member->state);
+        verify(member->hasLoc());
+
+        Record* record = member->loc.rec();
+        const char* data = record->dataNoThrowing();
+
+        if (!recordInMemory(data)) {
+            // member->loc points to a record that's NOT in memory.  Pass a fetch request up and
+            // remember which member we are waiting on.
+            verify(WorkingSet::INVALID_ID == _idBeingPagedIn);
+            _idBeingPagedIn = id;
+            *out = id;
+            return PlanStage::NEED_FETCH;
+        }
+
+        // The record is in memory: build the (unowned) obj right away.  We don't need the index
+        // data anymore once we have an obj.
+        member->keyData.clear();
+        member->obj = BSONObj(data);
+        member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ;
+        return returnIfMatches(member, id, out);
+    }
+
+    // Yield notifications are simply forwarded to the child.
+    void FetchStage::prepareToYield() {
+        _child->prepareToYield();
+    }
+
+    void FetchStage::recoverFromYield() {
+        _child->recoverFromYield();
+    }
+
+    // Forwards the invalidation to the child, then protects the member we may be waiting on.
+    void FetchStage::invalidate(const DiskLoc& dl) {
+        _child->invalidate(dl);
+
+        // Nothing is waiting to be paged in, so we hold nothing that could be affected.
+        if (WorkingSet::INVALID_ID == _idBeingPagedIn) { return; }
+
+        WorkingSetMember* pending = _ws->get(_idBeingPagedIn);
+        verify(pending->hasLoc());
+
+        // Some other DiskLoc is going away; ours is untouched.
+        if (!(pending->loc == dl)) { return; }
+
+        // The DiskLoc we are waiting on is about to perish, so force a fetch of the data now.
+        // This is a fetch inside of a write lock (that somebody else holds) but the other
+        // holder is likely operating on this object so this shouldn't have to hit disk.
+        WorkingSetCommon::fetchAndInvalidateLoc(pending);
+    }
+
+    // Second half of a fetch: called from work(...) when a page-in was requested on the
+    // previous call.  Turns the pending member into one with an obj (unless an invalidation
+    // already did so), clears the pending id and applies the filter.
+    PlanStage::StageState FetchStage::fetchCompleted(WorkingSetID* out) {
+        WorkingSetMember* member = _ws->get(_idBeingPagedIn);
+
+        // An OWNED_OBJ member means the DiskLoc we were waiting on was invalidated and the data
+        // was force-fetched; in that case there is nothing left to read.
+        if (WorkingSetMember::OWNED_OBJ != member->state) {
+            // Assume that the caller has fetched appropriately.
+            // TODO: Do we want to double-check the runner?  Not sure how reliable
+            // likelyInMemory is on all platforms.
+            verify(member->hasLoc());
+            verify(!member->hasObj());
+
+            // Make the (unowned) object straight from the record data.
+            const char* data = member->loc.rec()->dataNoThrowing();
+            member->obj = BSONObj(data);
+
+            // The index data is no longer needed now that we have an obj.
+            member->keyData.clear();
+            member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ;
+            verify(!member->obj.isOwned());
+        }
+
+        // The request is done; forget the id before handing the member to the filter.
+        const WorkingSetID memberID = _idBeingPagedIn;
+        _idBeingPagedIn = WorkingSet::INVALID_ID;
+        return returnIfMatches(member, memberID, out);
+    }
+
+    // Applies the filter (if there is one) to 'member'.  A passing member is output through
+    // 'out' with ADVANCED; a rejected one is freed from the working set and NEED_TIME is
+    // returned.
+    PlanStage::StageState FetchStage::returnIfMatches(WorkingSetMember* member,
+                                                      WorkingSetID memberID,
+                                                      WorkingSetID* out) {
+        const bool passes = (NULL == _matcher) || _matcher->matches(member);
+
+        if (!passes) {
+            _ws->free(memberID);
+            return PlanStage::NEED_TIME;
+        }
+
+        *out = memberID;
+        return PlanStage::ADVANCED;
+    }
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/fetch.h b/src/mongo/db/exec/fetch.h
new file mode 100644
index 00000000000..67b6c026d43
--- /dev/null
+++ b/src/mongo/db/exec/fetch.h
@@ -0,0 +1,69 @@
+/**
+ * Copyright (C) 2013 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "mongo/db/diskloc.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/matcher.h"
+#include "mongo/db/exec/plan_stage.h"
+
+namespace mongo {
+
+ /**
+ * This stage turns a DiskLoc into a BSONObj.
+ *
+ * In WorkingSetMember terms, it transitions from LOC_AND_IDX to LOC_AND_UNOWNED_OBJ by reading
+ * the record at the provided loc. Returns verbatim any data that already has an object.
+ *
+ * If the record is not likely to be in memory, work(...) returns NEED_FETCH for it and the
+ * fetch is completed on the following call to work(...). The filter, when one is given, is
+ * applied to every result before it is returned.
+ *
+ * Preconditions: Valid DiskLoc.
+ */
+ class FetchStage : public PlanStage {
+ public:
+ FetchStage(WorkingSet* ws, PlanStage* child, Matcher* matcher);
+ virtual ~FetchStage();
+
+ virtual bool isEOF();
+ virtual StageState work(WorkingSetID* out);
+
+ virtual void prepareToYield();
+ virtual void recoverFromYield();
+ virtual void invalidate(const DiskLoc& dl);
+
+ private:
+ /**
+ * If the member (with id memberID) passes our filter, set *out to memberID and return
+ * ADVANCED. Otherwise, free memberID and return NEED_TIME. A NULL filter passes everything.
+ */
+ StageState returnIfMatches(WorkingSetMember* member, WorkingSetID memberID,
+ WorkingSetID* out);
+
+ /**
+ * work(...) delegates to this when we're called after requesting a fetch.
+ * Clears _idBeingPagedIn and returns the result of returnIfMatches(...) for that member.
+ */
+ StageState fetchCompleted(WorkingSetID* out);
+
+ // _ws is not owned by us.
+ WorkingSet* _ws;
+ scoped_ptr<PlanStage> _child;
+ scoped_ptr<Matcher> _matcher;
+
+ // If we're fetching a DiskLoc and it points at something that's not in memory, we return
+ // a "please page this in" result and hold on to the WSID until the next call to work(...).
+ WorkingSetID _idBeingPagedIn;
+ };
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/limit.cpp b/src/mongo/db/exec/limit.cpp
new file mode 100644
index 00000000000..9f394e6f508
--- /dev/null
+++ b/src/mongo/db/exec/limit.cpp
@@ -0,0 +1,52 @@
+/**
+ * Copyright (C) 2013 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "mongo/db/exec/limit.h"
+
+namespace mongo {
+
+    // 'child' is stored in a scoped_ptr member; 'limit' is the number of results to pass up.
+    LimitStage::LimitStage(int limit, WorkingSet* ws, PlanStage* child)
+        : _ws(ws),
+          _child(child),
+          _numToReturn(limit) {
+    }
+
+    LimitStage::~LimitStage() {
+    }
+
+    // EOF as soon as the remaining count hits zero, otherwise whenever the child is at EOF.
+    bool LimitStage::isEOF() {
+        if (0 == _numToReturn) { return true; }
+        return _child->isEOF();
+    }
+
+    // Passes the child's results through until the limit is used up.
+    //
+    // Returns:
+    //   IS_EOF      if isEOF() holds on entry (limit reached or child exhausted).
+    //   ADVANCED    with *out set to the child's result; this counts against the limit.
+    //   NEED_FETCH  with *out set to the WSID the child wants paged in.
+    //   Any other state returned by the child is passed up as is, with nothing in *out.
+    PlanStage::StageState LimitStage::work(WorkingSetID* out) {
+        // If we've returned as many results as we're limited to, isEOF will be true.
+        if (isEOF()) { return PlanStage::IS_EOF; }
+
+        WorkingSetID id;
+        StageState status = _child->work(&id);
+
+        if (PlanStage::ADVANCED == status) {
+            *out = id;
+            --_numToReturn;
+            return PlanStage::ADVANCED;
+        }
+        else if (PlanStage::NEED_FETCH == status) {
+            // A fetch request must be propagated together with the WSID to fetch: the plan
+            // runner reads that WSID from the out parameter.  It does not count as a result.
+            *out = id;
+            return status;
+        }
+        else {
+            // NEED_TIME, FAILURE, IS_EOF
+            return status;
+        }
+    }
+
+    // This stage keeps no state that yields or invalidations could affect, so all three
+    // notifications are forwarded to the child.
+    void LimitStage::prepareToYield() {
+        _child->prepareToYield();
+    }
+
+    void LimitStage::recoverFromYield() {
+        _child->recoverFromYield();
+    }
+
+    void LimitStage::invalidate(const DiskLoc& dl) {
+        _child->invalidate(dl);
+    }
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/limit.h b/src/mongo/db/exec/limit.h
new file mode 100644
index 00000000000..eb674d8deed
--- /dev/null
+++ b/src/mongo/db/exec/limit.h
@@ -0,0 +1,52 @@
+/**
+ * Copyright (C) 2013 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "mongo/db/diskloc.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/exec/plan_stage.h"
+
+namespace mongo {
+
+ /**
+ * This stage implements limit functionality. It only returns 'limit' results before EOF.
+ *
+ * Only ADVANCED results from the child count against the limit. The stage reports EOF once
+ * the remaining count is zero or the child is at EOF.
+ *
+ * Sort has a baked-in limit, as it can optimize the sort if it has a limit.
+ *
+ * Preconditions: None.
+ */
+ class LimitStage : public PlanStage {
+ public:
+ LimitStage(int limit, WorkingSet* ws, PlanStage* child);
+ virtual ~LimitStage();
+
+ virtual bool isEOF();
+ virtual StageState work(WorkingSetID* out);
+
+ virtual void prepareToYield();
+ virtual void recoverFromYield();
+ virtual void invalidate(const DiskLoc& dl);
+
+ private:
+ WorkingSet* _ws;
+ scoped_ptr<PlanStage> _child;
+
+ // How many more results we may return. Decremented each time we pass up an ADVANCED.
+ int _numToReturn;
+ };
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/mock_stage.cpp b/src/mongo/db/exec/mock_stage.cpp
new file mode 100644
index 00000000000..128c93e1847
--- /dev/null
+++ b/src/mongo/db/exec/mock_stage.cpp
@@ -0,0 +1,52 @@
+/**
+ * Copyright (C) 2013 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "mongo/db/exec/mock_stage.h"
+
+namespace mongo {
+
+    // 'ws' is borrowed; results produced by work() are allocated in it.
+    MockStage::MockStage(WorkingSet* ws)
+        : _ws(ws) {
+    }
+
+    // Replays the next queued state.  For an ADVANCED state the next queued member is copied
+    // into a freshly allocated working set slot whose id is output through 'out'.
+    PlanStage::StageState MockStage::work(WorkingSetID* out) {
+        if (isEOF()) { return PlanStage::IS_EOF; }
+
+        const StageState next = _results.front();
+        _results.pop();
+
+        // Anything but ADVANCED carries no data; just report the state.
+        if (PlanStage::ADVANCED != next) { return next; }
+
+        // We advanced.  Put the mock obj into the working set.
+        const WorkingSetID id = _ws->allocate();
+        *_ws->get(id) = _members.front();
+        _members.pop();
+        *out = id;
+
+        return next;
+    }
+
+    // The stage is exhausted once every queued state has been handed out.
+    bool MockStage::isEOF() {
+        return _results.empty();
+    }
+
+    // Queue a bare state for work() to return.
+    void MockStage::pushBack(const PlanStage::StageState state) {
+        _results.push(state);
+    }
+
+    // Queue a result: an ADVANCED state plus the member that goes with it.
+    void MockStage::pushBack(const WorkingSetMember& member) {
+        _results.push(PlanStage::ADVANCED);
+        _members.push(member);
+    }
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/mock_stage.h b/src/mongo/db/exec/mock_stage.h
new file mode 100644
index 00000000000..49b44fccf82
--- /dev/null
+++ b/src/mongo/db/exec/mock_stage.h
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2013 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <queue>
+
+#include "mongo/db/exec/plan_stage.h"
+#include "mongo/db/exec/working_set.h"
+
+namespace mongo {
+
+ class DiskLoc;
+
+ /**
+ * MockStage is a data-producing stage that is used for testing. Unlike the other two leaf
+ * stages (CollectionScan and IndexScan) MockStage does not require any underlying storage
+ * layer.
+ *
+ * A MockStage is "programmed" by pushing return values from work() onto its internal queue.
+ * Calls to MockStage::work() pop values off that queue and return them in FIFO order,
+ * annotating the working set with data when appropriate.
+ *
+ * The stage is at EOF when the queue of states is empty.
+ */
+ class MockStage : public PlanStage {
+ public:
+ MockStage(WorkingSet* ws);
+ virtual ~MockStage() { }
+
+ virtual StageState work(WorkingSetID* out);
+
+ virtual bool isEOF();
+
+ // These don't really mean anything here.
+ // Some day we could count the # of calls to the yield functions to check that other stages
+ // have correct yielding behavior.
+ virtual void prepareToYield() { }
+ virtual void recoverFromYield() { }
+ virtual void invalidate(const DiskLoc& dl) { }
+
+ /**
+ * Add a result to the back of the queue. work() goes through the queue.
+ * Either no data is returned (just a state), or...
+ *
+ * NOTE: work() takes a member off the member queue for every ADVANCED state it pops, so
+ * ADVANCED results should be queued with the WorkingSetMember overload below.
+ */
+ void pushBack(const PlanStage::StageState state);
+
+ /**
+ * ...data is returned (and we ADVANCED)
+ * The member is copied into the working set when work() reaches it.
+ */
+ void pushBack(const WorkingSetMember& member);
+
+ private:
+ // We don't own this.
+ WorkingSet* _ws;
+
+ // The data we return: the states in FIFO order, and one member per ADVANCED state.
+ queue<PlanStage::StageState> _results;
+ queue<WorkingSetMember> _members;
+ };
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/or.cpp b/src/mongo/db/exec/or.cpp
new file mode 100644
index 00000000000..a0a678e2b95
--- /dev/null
+++ b/src/mongo/db/exec/or.cpp
@@ -0,0 +1,111 @@
+/**
+ * Copyright (C) 2013 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "mongo/db/exec/or.h"
+
+namespace mongo {
+
+    // 'matcher' is stored in a scoped_ptr member; 'ws' is borrowed.  Children are added
+    // afterwards with addChild(...).
+    OrStage::OrStage(WorkingSet* ws, bool dedup, Matcher* matcher)
+        : _ws(ws),
+          _matcher(matcher),
+          _currentChild(0),
+          _dedup(dedup) {
+    }
+
+    // The children are raw pointers that we own, so they are deleted here.
+    OrStage::~OrStage() {
+        for (vector<PlanStage*>::iterator it = _children.begin(); it != _children.end(); ++it) {
+            delete *it;
+        }
+    }
+
+    // Appends 'child' to the list of children; the destructor deletes it.
+    void OrStage::addChild(PlanStage* child) {
+        _children.push_back(child);
+    }
+
+    // EOF once we have moved past the last child.
+    bool OrStage::isEOF() {
+        return !(_currentChild < _children.size());
+    }
+
+    // Works the current child once.  Children are drained one after the other.
+    //
+    // Returns:
+    //   IS_EOF      if isEOF() holds on entry, or the last child just hit EOF.
+    //   ADVANCED    with *out set to a result that survived dedup (if enabled) and the filter
+    //               (if any).
+    //   NEED_TIME   if a result was dropped (duplicate or filtered out; it is freed), if we
+    //               moved on to the next child, or if the child returned NEED_TIME.
+    //   NEED_FETCH  with *out set to the WSID the child wants paged in.
+    //   Any other state returned by the child is passed up as is, with nothing in *out.
+    PlanStage::StageState OrStage::work(WorkingSetID* out) {
+        if (isEOF()) { return PlanStage::IS_EOF; }
+
+        WorkingSetID id;
+        StageState childStatus = _children[_currentChild]->work(&id);
+
+        if (PlanStage::ADVANCED == childStatus) {
+            WorkingSetMember* member = _ws->get(id);
+            verify(member->hasLoc());
+
+            // If we're deduping...
+            if (_dedup) {
+                // ...and we've seen the DiskLoc before
+                if (_seen.end() != _seen.find(member->loc)) {
+                    // ...drop it.
+                    _ws->free(id);
+                    return PlanStage::NEED_TIME;
+                }
+                else {
+                    // Otherwise, note that we've seen it.
+                    _seen.insert(member->loc);
+                }
+            }
+
+            if (NULL == _matcher || _matcher->matches(member)) {
+                // Match!  return it.
+                *out = id;
+                return PlanStage::ADVANCED;
+            }
+            else {
+                // Does not match, try again.
+                _ws->free(id);
+                return PlanStage::NEED_TIME;
+            }
+        }
+        else if (PlanStage::IS_EOF == childStatus) {
+            // Done with _currentChild, move to the next one.
+            ++_currentChild;
+
+            // Maybe we're out of children.
+            if (isEOF()) {
+                return PlanStage::IS_EOF;
+            }
+            else {
+                return PlanStage::NEED_TIME;
+            }
+        }
+        else if (PlanStage::NEED_FETCH == childStatus) {
+            // A fetch request must be propagated together with the WSID to fetch: the plan
+            // runner reads that WSID from the out parameter.
+            *out = id;
+            return childStatus;
+        }
+        else {
+            // NEED_TIME, FAILURE: pass them up.
+            return childStatus;
+        }
+    }
+
+    // Every child is told about the yield, in the order the children were added.
+    void OrStage::prepareToYield() {
+        for (vector<PlanStage*>::iterator it = _children.begin(); it != _children.end(); ++it) {
+            (*it)->prepareToYield();
+        }
+    }
+
+    // Likewise for coming back from a yield.
+    void OrStage::recoverFromYield() {
+        for (vector<PlanStage*>::iterator it = _children.begin(); it != _children.end(); ++it) {
+            (*it)->recoverFromYield();
+        }
+    }
+
+    // Forwards the invalidation to all children and forgets 'dl' for dedup purposes.  Once we
+    // are at EOF this is a no-op.
+    void OrStage::invalidate(const DiskLoc& dl) {
+        if (isEOF()) { return; }
+
+        for (vector<PlanStage*>::iterator it = _children.begin(); it != _children.end(); ++it) {
+            (*it)->invalidate(dl);
+        }
+
+        if (!_dedup) { return; }
+
+        // If we see DL again it is not the same record as it once was so we still want to
+        // return it.
+        _seen.erase(dl);
+    }
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/or.h b/src/mongo/db/exec/or.h
new file mode 100644
index 00000000000..8df29a0c00b
--- /dev/null
+++ b/src/mongo/db/exec/or.h
@@ -0,0 +1,68 @@
+/**
+ * Copyright (C) 2013 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "mongo/db/diskloc.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/matcher.h"
+#include "mongo/db/exec/plan_stage.h"
+#include "mongo/platform/unordered_set.h"
+
+namespace mongo {
+
+ /**
+ * This stage outputs the union of its children. It optionally deduplicates on DiskLoc.
+ *
+ * Children are worked one at a time, in the order they were added: the next child is only
+ * started once the current one reports EOF. The filter, when one is given, is applied to
+ * every result after deduplication.
+ *
+ * Preconditions: Valid DiskLoc.
+ *
+ * If we're deduping, we may fail to dedup any invalidated DiskLoc properly.
+ */
+ class OrStage : public PlanStage {
+ public:
+ OrStage(WorkingSet* ws, bool dedup, Matcher* matcher);
+ virtual ~OrStage();
+
+ void addChild(PlanStage* child);
+
+ virtual bool isEOF();
+
+ virtual StageState work(WorkingSetID* out);
+
+ virtual void prepareToYield();
+ virtual void recoverFromYield();
+ virtual void invalidate(const DiskLoc& dl);
+
+ private:
+ // Not owned by us.
+ WorkingSet* _ws;
+
+ scoped_ptr<Matcher> _matcher;
+
+ // Owned by us: the destructor deletes every child.
+ vector<PlanStage*> _children;
+
+ // Which of _children are we calling work(...) on now?
+ size_t _currentChild;
+
+ // True if we dedup on DiskLoc, false otherwise.
+ bool _dedup;
+
+ // Which DiskLocs have we seen? A loc is recorded before the filter is applied to it.
+ unordered_set<DiskLoc, DiskLoc::Hasher> _seen;
+ };
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/plan_stage.h b/src/mongo/db/exec/plan_stage.h
index c1a232b3b2c..0532ef1cecf 100644
--- a/src/mongo/db/exec/plan_stage.h
+++ b/src/mongo/db/exec/plan_stage.h
@@ -94,16 +94,33 @@ namespace mongo {
* All possible return values of work(...)
*/
enum StageState {
- // work(...) has returned a new result in its out parameter.
+ // work(...) has returned a new result in its out parameter. The caller must free it
+ // from the working set when done with it.
ADVANCED,
- // work(...) won't do anything more. isEOF() will also be true.
+
+ // work(...) won't do anything more. isEOF() will also be true. There is nothing
+ // output in the out parameter.
IS_EOF,
- // work(...) needs more time to product a result. Call work(...) again.
+
+ // work(...) needs more time to produce a result. Call work(...) again. There is
+ // nothing output in the out parameter.
NEED_TIME,
- // Something has gone unrecoverably wrong. Stop running this query.
+
+ // Something has gone unrecoverably wrong. Stop running this query. There is nothing
+ // output in the out parameter.
FAILURE,
- // Something isn't in memory. Fetch it. TODO: actually support this (forthcoming).
- NEED_YIELD,
+
+ // Something isn't in memory. Fetch it.
+ //
+ // Full fetch semantics:
+ // The fetch-requesting stage populates the out parameter of work(...) with a WSID that
+ // refers to a WSM with a valid loc. Each stage that receives a NEED_FETCH from a child
+ // must propagate the NEED_FETCH up and perform no work. The plan runner is responsible
+ // for paging in the data upon receipt of a NEED_FETCH. The plan runner does NOT free
+ // the WSID of the requested fetch. The stage that requested the fetch holds the WSID
+ // of the loc it wants fetched. On the next call to work() that stage can assume a
+ // fetch was performed on the WSM that the held WSID refers to.
+ NEED_FETCH,
};
/**
diff --git a/src/mongo/db/exec/simple_plan_runner.h b/src/mongo/db/exec/simple_plan_runner.h
index 34ca0b2f29a..797c3c9c9ec 100644
--- a/src/mongo/db/exec/simple_plan_runner.h
+++ b/src/mongo/db/exec/simple_plan_runner.h
@@ -17,6 +17,7 @@
#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/pdfile.h"
namespace mongo {
@@ -26,7 +27,6 @@ namespace mongo {
*
* TODO: Yielding policy
* TODO: Graceful error handling
- * TODO: Fetch
* TODO: Stats, diagnostics, instrumentation, etc.
*/
class SimplePlanRunner {
@@ -59,8 +59,31 @@ namespace mongo {
else if (code == PlanStage::NEED_TIME) {
// TODO: Occasionally yield. For now, we run until we get another result.
}
+ else if (PlanStage::NEED_FETCH == code) {
+ // id has a loc and refers to an obj we need to fetch.
+ WorkingSetMember* member = _workingSet.get(id);
+
+ // This must be true for somebody to request a fetch and can only change when an
+ // invalidation happens, which is when we give up a lock. Don't give up the
+ // lock between receiving the NEED_FETCH and actually fetching(?).
+ verify(member->hasLoc());
+
+ // Actually bring record into memory.
+ Record* record = member->loc.rec();
+ record->touch();
+
+ // Record should be in memory now. Log if it's not.
+ if (!Record::likelyInPhysicalMemory(record->dataNoThrowing())) {
+ OCCASIONALLY {
+ warning() << "Record wasn't in memory immediately after fetch: "
+ << member->loc.toString() << endl;
+ }
+ }
+
+ // Note that we're not freeing id. Fetch semantics say that we shouldn't.
+ }
else {
- // IS_EOF, FAILURE, NEED_YIELD. We just stop here.
+ // IS_EOF, FAILURE. We just stop here.
return false;
}
}
diff --git a/src/mongo/db/exec/skip.cpp b/src/mongo/db/exec/skip.cpp
new file mode 100644
index 00000000000..8dfb4e33df3
--- /dev/null
+++ b/src/mongo/db/exec/skip.cpp
@@ -0,0 +1,58 @@
+/**
+* Copyright (C) 2013 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "mongo/db/exec/skip.h"
+
+namespace mongo {
+
+    // 'child' is stored in a scoped_ptr member; 'toSkip' is the number of results to drop.
+    SkipStage::SkipStage(int toSkip, WorkingSet* ws, PlanStage* child)
+        : _ws(ws),
+          _child(child),
+          _toSkip(toSkip) {
+    }
+
+    SkipStage::~SkipStage() {
+    }
+
+    // We are at EOF exactly when the child is.
+    bool SkipStage::isEOF() {
+        return _child->isEOF();
+    }
+
+    // Drops the first _toSkip results of the child and passes the rest through.
+    //
+    // Returns:
+    //   IS_EOF      if isEOF() holds on entry.
+    //   NEED_TIME   if a result was skipped (it is freed), or if the child returned NEED_TIME.
+    //   ADVANCED    with *out set to the child's result once skipping is over.
+    //   NEED_FETCH  with *out set to the WSID the child wants paged in.
+    //   Any other state returned by the child is passed up as is, with nothing in *out.
+    PlanStage::StageState SkipStage::work(WorkingSetID* out) {
+        if (isEOF()) { return PlanStage::IS_EOF; }
+
+        WorkingSetID id;
+        StageState status = _child->work(&id);
+
+        if (PlanStage::ADVANCED == status) {
+            // If we're still skipping results...
+            if (_toSkip > 0) {
+                // ...drop the result.
+                --_toSkip;
+                _ws->free(id);
+                return PlanStage::NEED_TIME;
+            }
+
+            *out = id;
+            return PlanStage::ADVANCED;
+        }
+        else if (PlanStage::NEED_FETCH == status) {
+            // A fetch request must be propagated together with the WSID to fetch: the plan
+            // runner reads that WSID from the out parameter.  It is not a result, so it does
+            // not count towards the skip.
+            *out = id;
+            return status;
+        }
+        else {
+            // NEED_TIME, FAILURE, IS_EOF
+            return status;
+        }
+    }
+
+    // All three notifications are forwarded to the child unchanged.
+    void SkipStage::prepareToYield() {
+        _child->prepareToYield();
+    }
+
+    void SkipStage::recoverFromYield() {
+        _child->recoverFromYield();
+    }
+
+    void SkipStage::invalidate(const DiskLoc& dl) {
+        _child->invalidate(dl);
+    }
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/skip.h b/src/mongo/db/exec/skip.h
new file mode 100644
index 00000000000..5086e9d66aa
--- /dev/null
+++ b/src/mongo/db/exec/skip.h
@@ -0,0 +1,51 @@
+/**
+ * Copyright (C) 2013 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "mongo/db/diskloc.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/exec/plan_stage.h"
+
+namespace mongo {
+
+ /**
+ * This stage implements skip functionality. It drops the first 'toSkip' results from its child
+ * then returns the rest verbatim.
+ *
+ * Each dropped result is freed from the working set and reported as NEED_TIME. The stage is
+ * at EOF exactly when its child is.
+ *
+ * Preconditions: None.
+ */
+ class SkipStage : public PlanStage {
+ public:
+ SkipStage(int toSkip, WorkingSet* ws, PlanStage* child);
+ virtual ~SkipStage();
+
+ virtual bool isEOF();
+ virtual StageState work(WorkingSetID* out);
+
+ virtual void prepareToYield();
+ virtual void recoverFromYield();
+ virtual void invalidate(const DiskLoc& dl);
+
+ private:
+ WorkingSet* _ws;
+ scoped_ptr<PlanStage> _child;
+
+ // How many results are still to be dropped. Decremented for each result we drop.
+ int _toSkip;
+ };
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/stagedebug_cmd.cpp b/src/mongo/db/exec/stagedebug_cmd.cpp
index e15e8a96489..d55a9dd497b 100644
--- a/src/mongo/db/exec/stagedebug_cmd.cpp
+++ b/src/mongo/db/exec/stagedebug_cmd.cpp
@@ -20,7 +20,11 @@
#include "mongo/db/commands.h"
#include "mongo/db/exec/and_hash.h"
#include "mongo/db/exec/and_sorted.h"
+#include "mongo/db/exec/fetch.h"
#include "mongo/db/exec/index_scan.h"
+#include "mongo/db/exec/limit.h"
+#include "mongo/db/exec/or.h"
+#include "mongo/db/exec/skip.h"
#include "mongo/db/exec/simple_plan_runner.h"
#include "mongo/db/index/catalog_hack.h"
#include "mongo/db/jsobj.h"
@@ -49,16 +53,16 @@ namespace mongo {
*
* node -> {andHash: {filter: {filter}, args: { nodes: [node, node]}}}
* node -> {andSorted: {filter: {filter}, args: { nodes: [node, node]}}}
+ * node -> {or: {filter: {filter}, args: { dedup:bool, nodes:[node, node]}}}
+ * node -> {fetch: {filter: {filter}, args: {node: node}}}
+ * node -> {limit: {args: {node: node, num: posint}}}
+ * node -> {skip: {args: {node: node, num: posint}}}
*
* Forthcoming Nodes:
*
* node -> {cscan: {filter: {filter}, args: {name: "collectionname" }}}
- * node -> {or: {filter: {filter}, args: { dedup:bool, nodes:[node, node]}}}
- * node -> {fetch: {filter: {filter}, args: {node: node}}}
* node -> {sort: {filter: {filter}, args: {node: node, pattern: objWithSortCriterion}}}
* node -> {dedup: {filter: {filter}, args: {node: node, field: field}}}
- * node -> {limit: {filter: filter}, args: {node: node, num: posint}}
- * node -> {skip: {filter: filter}, args: {node: node, num: posint}}
* node -> {unwind: {filter: filter}, args: {node: node, field: field}}
*/
class StageDebugCmd : public Command {
@@ -203,6 +207,52 @@ namespace mongo {
return andStage.release();
}
+ else if ("or" == nodeName) {
+ uassert(16934, "Nodes argument must be provided to AND",
+ nodeArgs["nodes"].isABSONObj());
+ uassert(16935, "Dedup argument must be provided to OR",
+ !nodeArgs["dedup"].eoo());
+ BSONObjIterator it(nodeArgs["nodes"].Obj());
+ auto_ptr<OrStage> orStage(new OrStage(workingSet, nodeArgs["dedup"].Bool(),
+ matcher.release()));
+ while (it.more()) {
+ BSONElement e = it.next();
+ if (!e.isABSONObj()) { return NULL; }
+ PlanStage* subNode = parseQuery(dbname, e.Obj(), workingSet);
+ uassert(16936, "Can't parse sub-node of OR: " + e.Obj().toString(),
+ NULL != subNode);
+ // takes ownership
+ orStage->addChild(subNode);
+ }
+
+ return orStage.release();
+ }
+ else if ("fetch" == nodeName) {
+ uassert(16929, "Node argument must be provided to fetch",
+ nodeArgs["node"].isABSONObj());
+ PlanStage* subNode = parseQuery(dbname, nodeArgs["node"].Obj(), workingSet);
+ return new FetchStage(workingSet, subNode, matcher.release());
+ }
+ else if ("limit" == nodeName) {
+ uassert(16937, "Limit stage doesn't have a filter (put it on the child)",
+ NULL == matcher.get());
+ uassert(16930, "Node argument must be provided to limit",
+ nodeArgs["node"].isABSONObj());
+ uassert(16931, "Num argument must be provided to limit",
+ nodeArgs["num"].isNumber());
+ PlanStage* subNode = parseQuery(dbname, nodeArgs["node"].Obj(), workingSet);
+ return new LimitStage(nodeArgs["num"].numberInt(), workingSet, subNode);
+ }
+ else if ("skip" == nodeName) {
+ uassert(16938, "Skip stage doesn't have a filter (put it on the child)",
+ NULL == matcher.get());
+ uassert(16932, "Node argument must be provided to skip",
+ nodeArgs["node"].isABSONObj());
+ uassert(16933, "Num argument must be provided to skip",
+ nodeArgs["num"].isNumber());
+ PlanStage* subNode = parseQuery(dbname, nodeArgs["node"].Obj(), workingSet);
+ return new SkipStage(nodeArgs["num"].numberInt(), workingSet, subNode);
+ }
else {
return NULL;
}